资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 前沿技术 > 大数据 > 大数据系统

Flink基础系列17-Tranform之多流转换算子

大数据系统 更新时间: 发布时间: 计算机考试归档 最新发布

Flink基础系列17-Tranform之多流转换算子

文章目录
  • 一.多流转换算子概述
    • 1.1 Split和Select
    • 1.2 Connect和CoMap
    • 1.3 Union
  • 二.代码实现
  • 参考:

一.多流转换算子概述

多流转换算子一般包括:
Split和Select (新版已经移除)
Connect和CoMap
Union

1.1 Split和Select

注:新版Flink已经不存在Split和Select这两个API了(至少Flink1.12.1没有!)

Split

DataStream -> SplitStream:根据某些特征把DataStream拆分成SplitStream;
SplitStream虽然看起来像是两个Stream,但是其实它是一个特殊的Stream;

Select

SplitStream -> DataStream:从一个SplitStream中获取一个或者多个DataStream;
我们可以结合split&select将一个DataStream拆分成多个DataStream。

1.2 Connect和CoMap

Connect

DataStream,DataStream -> ConnectedStreams: 连接两个保持他们类型的数据流,两个数据流被Connect 之后,只是被放在了一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap

ConnectedStreams -> DataStream: 作用于ConnectedStreams 上,功能与map和flatMap一样,对ConnectedStreams 中的每一个Stream分别进行map和flatMap操作;

1.3 Union

DataStream -> DataStream:对两个或者两个以上的DataStream进行Union操作,产生一个包含多有DataStream元素的新DataStream。

问题:和Connect的区别?

  1. Connect 的数据类型可以不同,Connect 只能合并两个流;
  2. Union可以合并多条流,Union的数据结构必须是一样的;
二.代码实现

数据准备:
sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

代码:

package org.flink.transform;



import org.flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
        DataStream inputStream = env.readTextFile("C:\Users\Administrator\IdeaProjects\FlinkStudy\src\main\resources\sensor.txt");

        // 转换成SensorReading
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        } );

        // 1. 分流,按照温度值30度为界分为两条流
        SplitStream splitStream = dataStream.split(new OutputSelector() {
            @Override
            public Iterable select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        DataStream highTempStream = splitStream.select("high");
        DataStream lowTempStream = splitStream.select("low");
        DataStream allTempStream = splitStream.select("high", "low");

        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. 合流 connect,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息
        DataStream> warningStream = highTempStream.map(new MapFunction>() {
            @Override
            public Tuple2 map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

        ConnectedStreams, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

        DataStream resultStream = connectedStreams.map(new CoMapFunction, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2 value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });

        resultStream.print();

        // 3. union联合多条流
//        warningStream.union(lowTempStream);
        highTempStream.union(lowTempStream, allTempStream);

        env.execute();
    }
}

 

测试记录:

参考:
  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae
转载请注明:文章转载自 http://www.konglu.com/
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【Flink基础系列17-Tranform之多流转换算子】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2