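Goal: use connect to combine two streams of different types, process them with a CoMap function so that a single stream is returned, and compare this with union.
Environment: JDK 8. IDE: IntelliJ IDEA.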
Create a Maven project and add these dependencies to pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code demo
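import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// The class name is arbitrary. Test is a user-defined class that is not shown here.
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First stream: each line is parsed into a Test object
        DataStream<String> inputStream = env.readTextFile("absolute path to file 1");
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });

        // Second stream: raw lines, kept as String
        DataStream<String> inputStream1 = env.readTextFile("absolute path to file 2");

        // Connect the two streams; each keeps its own element type
        ConnectedStreams<Test, String> con = dataStream.connect(inputStream1);

        // Merge into one stream: map1 handles the first stream, map2 the second
        DataStream<Object> resCon = con.map(new CoMapFunction<Test, String, Object>() {
            @Override
            public Object map1(Test value) throws Exception {
                return new Tuple2<>(value.getId(), value.getContent());
            }

            @Override
            public Object map2(String value) throws Exception {
                return new Tuple2<>(value, "test1");
            }
        });

        // Print the merged stream
        resCon.print("con :");
        env.execute();
    }
}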
Run output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
con :> (1,bs)
con :> (2,jj)
con :> (3,jw)
con :> (1,jj)
con :> (java,test1)
con :> (c,test1)
con :> (c++,test1)
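The demo relies on a Test class that is not shown. Below is a minimal sketch of what such a class might look like, inferred only from the constructor call new Test(words[0], Integer.parseInt(words[1]), words[2]) and the getId()/getContent() calls. The wrapper class TestSketch, the field name count, the helper parseLine, and the sample line "1 20 bs" are assumptions made for illustration, not taken from the original.

```java
public class TestSketch {

    // Hypothetical stand-in for the Test class used in the demo.
    public static class Test {
        private String id;
        private int count;      // assumed name; the demo only shows that an int is passed here
        private String content;

        // Public no-argument constructor, so Flink can treat the class as a POJO.
        public Test() {}

        public Test(String id, int count, String content) {
            this.id = id;
            this.count = count;
            this.content = content;
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }

        public String getContent() { return content; }
        public void setContent(String content) { this.content = content; }
    }

    // Same parsing steps as the MapFunction in the demo: split on a single space.
    public static Test parseLine(String line) {
        String[] words = line.split(" ");
        return new Test(words[0], Integer.parseInt(words[1]), words[2]);
    }

    public static void main(String[] args) {
        Test t = parseLine("1 20 bs"); // the sample line is an assumption
        System.out.println("(" + t.getId() + "," + t.getContent() + ")"); // prints (1,bs)
    }
}
```

With a class of this shape, a line such as "1 20 bs" in file 1 would yield the (1,bs) tuple seen in the output, while every line of file 2 is paired with the constant "test1".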
union
DataStream<Type> unionStream = streamA.union(streamB);
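As a hedged illustration of the union call, the sketch below merges three String streams created with env.fromElements. The class name UnionSketch, the helper buildUnion, and the sample elements are made up for this example, and it assumes the same Flink 1.10.1 dependencies listed in the Maven section.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSketch {

    // Builds three streams with the same element type and unions them into one.
    public static DataStream<String> buildUnion(StreamExecutionEnvironment env) {
        DataStream<String> a = env.fromElements("java", "c");
        DataStream<String> b = env.fromElements("c++");
        DataStream<String> c = env.fromElements("go");
        // union takes one or more streams; all must have the caller's element type
        return a.union(b, c);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        buildUnion(env).print("union");
        env.execute();
    }
}
```

No CoMap step is needed here because the result already has a single element type. The order in which elements from the different inputs appear in the merged stream is not something this sketch relies on.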
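Differences between Connect and Union
1. Union requires the streams to have the same element type; Connect allows the two streams to have different types.
2. Connect can only combine two streams; Union can combine more than two.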