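Goal: use connect to combine two streams of different types, process them with a CoMapFunction so that a single stream comes out, and compare this with union.
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3.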

Create a Maven project

Add the dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

Code demo

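    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    // ConnectDemo is a placeholder class name; Test is the author's own class (not shown here).
    public class ConnectDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> inputStream = env.readTextFile("absolute path to file 1");
            // Parse each space-separated line of file 1 into a Test object
            DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
                @Override
                public Test map(String value) throws Exception {
                    String[] words = value.split(" ");
                    return new Test(words[0], Integer.parseInt(words[1]), words[2]);
                }
            });
            DataStream<String> inputStream1 = env.readTextFile("absolute path to file 2");
            // Connect the two streams; each keeps its own element type
            ConnectedStreams<Test, String> con = dataStream.connect(inputStream1);
            // Merge the two streams, defining the output for each input stream separately
            DataStream<Object> resCon = con.map(new CoMapFunction<Test, String, Object>() {
                @Override
                public Object map1(Test value) throws Exception {
                    // Called for elements of the first stream (Test)
                    return new Tuple2<>(value.getId(), value.getContent());
                }
                @Override
                public Object map2(String value) throws Exception {
                    // Called for elements of the second stream (String)
                    return new Tuple2<>(value, "test1");
                }
            });
            // Print the result
            resCon.print("con :");
            env.execute();
        }
    }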
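
The demo relies on a Test class that is not shown in the post. Below is a minimal sketch of what it might look like, assuming the three space-separated fields of each line are an id, a number and a content string. The field name num, the setters and the fromLine helper are hypothetical and exist only for illustration; the sample line "1 18 bs" is made up as well.

```java
// Hypothetical sketch of the Test class used by the demo; the real one is not shown in the post.
// To let Flink treat it as a POJO, declare it public in its own Test.java file.
class Test {
    private String id;
    private int num;        // assumed name for the numeric middle field
    private String content;

    // Flink's POJO rules require a public no-argument constructor
    public Test() {}

    public Test(String id, int num, String content) {
        this.id = id;
        this.num = num;
        this.content = content;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public int getNum() { return num; }
    public void setNum(int num) { this.num = num; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    // Illustrative helper that mirrors the body of the MapFunction in the demo
    static Test fromLine(String line) {
        String[] words = line.split(" ");
        return new Test(words[0], Integer.parseInt(words[1]), words[2]);
    }

    @Override
    public String toString() {
        return "Test{id=" + id + ", num=" + num + ", content=" + content + "}";
    }
}
```

With a class like this, a line such as "1 18 bs" would be parsed into id "1" and content "bs", which map1 then turns into the tuple (1,bs).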

Attached project: 并流算子.rar

Output

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    con :> (1,bs)
    con :> (2,jj)
    con :> (3,jw)
    con :> (1,jj)
    con :> (java,test1)
    con :> (c,test1)
    con :> (c++,test1)


union

    DataStream<T> unionStream = streamA.union(streamB);
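
As a rough illustration of the union call, here is a small self-contained sketch against the same Flink 1.10.1 dependencies. The class name UnionDemo and the sample elements are made up for the example. It merges three String streams in one call, since union accepts a variable number of streams.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// UnionDemo is a hypothetical class name used only for this sketch
class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Three streams with the same element type (sample data invented for the example)
        DataStream<String> streamA = env.fromElements("java", "c");
        DataStream<String> streamB = env.fromElements("c++");
        DataStream<String> streamC = env.fromElements("scala");

        // union takes varargs, so more than two streams can be merged at once;
        // the result has the same element type as the inputs
        DataStream<String> unionStream = streamA.union(streamB, streamC);

        unionStream.print("union :");
        env.execute("union demo");
    }
}
```

The order in which elements from the different inputs appear in the merged stream should not be relied on.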

Differences between Connect and Union

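  1. With Union, the streams must have the same type before they are merged; with Connect, the types can differ.
  2. Connect can only operate on two streams; Union can operate on more than two.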
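
One way to see the first difference in practice: two streams of different types cannot be passed to union directly, but they can be merged once each is mapped to a common type. The sketch below assumes Flink 1.10.1. The class name UnionCommonTypeDemo, the sample elements and the "id" / "test1" tags are invented for the example. Because the map functions are lambdas, the result type is declared explicitly with returns.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// UnionCommonTypeDemo is a hypothetical class name used only for this sketch
class UnionCommonTypeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> ids = env.fromElements(1, 2, 3);
        DataStream<String> langs = env.fromElements("java", "c", "c++");

        // ids.union(langs) would not compile: union requires identical element types.
        // Map both streams to Tuple2<String, String> first.
        DataStream<Tuple2<String, String>> left = ids
                .map(i -> Tuple2.of(String.valueOf(i), "id"))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));

        DataStream<Tuple2<String, String>> right = langs
                .map(s -> Tuple2.of(s, "test1"))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));

        // Now both sides share one type, so union is allowed
        left.union(right).print("union :");
        env.execute("union with a common type");
    }
}
```

Compared with connect plus a CoMapFunction, this needs one map per input and a shared output type up front, whereas connect keeps both input types visible inside a single function.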