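Goal: use connect to combine two streams of different types, process them with a CoMap so that a single stream is returned, and compare this with union.
Environment: JDK 8. Tool: IntelliJ IDEA.
Create a Maven project with the following dependencies: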
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
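Code demo
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// Wrapper class added so the snippet compiles; Test is the author's own class (not shown).
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First stream: each line holds three space-separated fields, parsed into a Test object
        DataStream<String> inputStream = env.readTextFile("absolute path to file 1");
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });

        // Second stream: plain text lines
        DataStream<String> inputStream1 = env.readTextFile("absolute path to file 2");

        // Connect the two streams
        ConnectedStreams<Test, String> con = dataStream.connect(inputStream1);

        // Merge the two streams, defining the returned structure for each input separately
        DataStream<Object> resCon = con.map(new CoMapFunction<Test, String, Object>() {
            @Override
            public Object map1(Test value) throws Exception {
                return new Tuple2<>(value.getId(), value.getContent());
            }

            @Override
            public Object map2(String value) throws Exception {
                return new Tuple2<>(value, "test1");
            }
        });

        // Print the result
        resCon.print("con :");
        env.execute();
    }
}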
Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
con :> (1,bs)
con :> (2,jj)
con :> (3,jw)
con :> (1,jj)
con :> (java,test1)
con :> (c,test1)
con :> (c++,test1)
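The demo relies on a Test class that the post does not show. Below is a minimal sketch of what it might look like, inferred only from how the demo calls it: a (String, int, String) constructor plus getId() and getContent(). The name of the middle field (number) is a guess, and the no-argument constructor and setters are added on the assumption that the class should follow Flink's POJO rules.

```java
// Hypothetical reconstruction of the Test class used in the demo.
// Only the constructor shape, getId() and getContent() come from the post;
// the field name "number" is an assumption.
public class Test {
    private String id;
    private int number;      // assumed name for the integer parsed from words[1]
    private String content;

    // Public no-argument constructor, needed for Flink to treat this as a POJO
    public Test() {
    }

    public Test(String id, int number, String content) {
        this.id = id;
        this.number = number;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Test{id='" + id + "', number=" + number + ", content='" + content + "'}";
    }
}
```

With this shape, an input line such as "1 20 bs" (the 20 is a made-up value) would be parsed into new Test("1", 20, "bs"), and map1 in the demo would emit (1,bs).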
union
DataStream<T> unionStream = streamA.union(streamB);
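To make the union template concrete, here is a small sketch that merges two String streams. The class name UnionDemo, the stream names, and the sample values are made up for illustration; it assumes the same Flink 1.10.1 dependencies as the connect demo.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Two streams with the same element type (String); sample values are illustrative
        DataStream<String> streamA = env.fromElements("java", "c");
        DataStream<String> streamB = env.fromElements("c++", "go");

        // union keeps the element type, so the result is again a DataStream<String>
        DataStream<String> unionStream = streamA.union(streamB);

        unionStream.print("union");
        env.execute();
    }
}
```

Each element is printed with the "union> " prefix. The relative order of elements coming from the two inputs is not guaranteed.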
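Differences between Connect and Union
1. Union requires the two streams to have the same element type; Connect allows different types.
2. Connect operates on exactly two streams; Union can combine more than two.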
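Both differences can be seen in one short sketch. The class name ConnectVsUnionDemo, the stream names, and the sample values are made up for illustration. An Integer stream and a String stream cannot be unioned directly, so they are connected and mapped to a common type first; the result is then unioned with two more String streams in a single call.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectVsUnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Sample values are illustrative
        DataStream<Integer> ids = env.fromElements(1, 2);
        DataStream<String> langs = env.fromElements("java", "c");
        DataStream<String> more = env.fromElements("c++");

        // ids.union(langs) would not compile: union needs identical element types.
        // connect accepts the mismatch, and the CoMapFunction maps each side to String.
        DataStream<String> connected = ids.connect(langs)
                .map(new CoMapFunction<Integer, String, String>() {
                    @Override
                    public String map1(Integer value) {
                        return "id-" + value;
                    }

                    @Override
                    public String map2(String value) {
                        return "lang-" + value;
                    }
                });

        // Once the types match, union can merge several streams in one call
        DataStream<String> all = connected.union(langs, more);

        all.print("all");
        env.execute();
    }
}
```

Note that connect always takes exactly one other stream, while union accepts a variable number of streams of the same type.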
