• `join` cannot emit unmatched events; `connect` can.

### Splitting streams

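  • Method 1: `split` (deprecated; no longer available as of Flink 1.13)
  • Method 2: side outputs (`OutputTag`) emitted from a `ProcessFunction`

```java
// Method 1 (legacy API): tag each record by temperature, then select each branch by name
SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return value.getTemperature() > 30
                ? Collections.singletonList("high")
                : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highStream = splitStream.select("high");
DataStream<SensorReading> lowStream = splitStream.select("low");
```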

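```java
// Method 2: split a stream with side outputs.
// Event and ClickSource are user-defined classes that are not shown in these notes.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitStreamByOutputTag {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // Create each tag as an anonymous subclass ({}) so the element type survives erasure
        OutputTag<Event> maryOutPut = new OutputTag<Event>("Mary-pv") {};
        OutputTag<Event> bobOutPut = new OutputTag<Event>("Bob-pv") {};

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context context, Collector<Event> collector) throws Exception {
                if ("Mary".equals(event.user)) {
                    context.output(maryOutPut, event);   // side output for Mary
                } else if ("Bob".equals(event.user)) {
                    context.output(bobOutPut, event);    // side output for Bob
                } else {
                    collector.collect(event);            // everything else stays on the main stream
                }
            }
        });

        processedStream.getSideOutput(maryOutPut).print("mary");
        processedStream.getSideOutput(bobOutPut).print("bob");
        processedStream.print("else");

        env.execute();
    }
}
```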

### Merging streams

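  • Method 1: `connect` + `CoMapFunction` / `CoFlatMapFunction` / `CoProcessFunction`
    • After `connect`, the two streams are merely placed in the same `ConnectedStreams`; internally each keeps its own data and type unchanged, and the two streams remain independent of each other.
    • coMap / coFlatMap: apply a separate `map` or `flatMap` function to each of the two streams in the `ConnectedStreams`.
  • Method 2: `union`

  • Differences between `connect` and `union`:

    • With `union`, all streams must have the same type; with `connect` the types may differ and can be unified afterwards in the coMap.
    • `connect` operates on exactly two streams; `union` can merge more than two.

```java
// connect + coMap/coFlatMap
// Tuple2<String, Double> is an assumed element type for warningStream;
// the original type arguments were lost in these notes.
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams =
        warningStream.connect(lowTempStream);

// CoMapFunction takes three type parameters: first stream, second stream, merged output type
DataStream<Object> resultStream = connectedStreams.map(
        new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });

// union
DataStream<SensorReading> unionStream = highStream.union(lowStream, allStream);
```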

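<a name="WqZYW"></a>
### join
<a name="TtmnJ"></a>
#### window join
After windowing, the elements of the two streams that share a key within the same window are joined as a Cartesian product.
- `.where()` takes a `KeySelector` that specifies the key of the first stream;
- `.equalTo()` takes a `KeySelector` that specifies the key of the second stream.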
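
To make the "Cartesian product per key and window" semantics concrete, here is a minimal plain-Java sketch with no Flink dependency. The `Rec` type, the `windowStart` helper and the sample data are hypothetical and exist only for illustration; the sketch assumes tumbling windows and non-negative timestamps, and it ignores watermarks entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of tumbling-window join semantics (not Flink code).
class WindowJoinSketch {

    // A keyed, timestamped record; hypothetical type for this sketch.
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    // Start of the tumbling window that contains ts (non-negative timestamps assumed).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Pairs every left record with every right record of the same key and the same window.
    static List<String> join(List<Rec> left, List<Rec> right, long size) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l + "=>" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = Arrays.asList(new Rec("a", 1000L), new Rec("b", 1000L), new Rec("a", 6000L));
        List<Rec> right = Arrays.asList(new Rec("a", 3000L), new Rec("a", 4000L), new Rec("c", 2000L));
        // Prints (a,1000)=>(a,3000) and (a,1000)=>(a,4000)
        join(left, right, 5000L).forEach(System.out::println);
    }
}
```

Records without a partner in the same window (`b`, `c`, and the later `a` at 6000) produce no output at all, which is why a window join behaves like an inner join.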
```java
// Window-based join
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First stream: (key, event-time timestamp in ms)
        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        // Second stream: same shape, later timestamps
        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        // Join on the key (f0) inside 5-second tumbling event-time windows
        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}
```

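#### window coGroup

  • The first two parameters of `coGroup` are no longer a single matched pair; they are the iterable collections `iter1` and `iter2`. In other words, Flink no longer computes the Cartesian product of the two streams' elements in the window. It passes in everything collected for the key at once, and how the elements are paired is entirely up to you.
  • `coGroup` is more general than the window join: besides an SQL-style inner join, it can also implement left, right, and full outer joins.
  • In fact, the window join is itself implemented on top of `coGroup`.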

        stream1
          .coGroup(stream2)
          .where(r -> r.f0)
          .equalTo(r -> r.f0)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
              @Override
              public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                  collector.collect(iter1 + "=>" + iter2);
              }
          })
          .print();
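
Because `coGroup` hands over both collections, an outer join comes down to deciding what to emit when one side is empty. The following plain-Java sketch (no Flink dependency) shows one way the pairing logic for a left outer join could look. The class and method names are hypothetical, and the `null` placeholder for a missing right side is an assumption of this sketch, not something the notes prescribe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration of left-outer-join pairing, shaped like the body of a coGroup call.
class CoGroupLeftJoinSketch {

    // left and right stand for all elements of one key in one window, as coGroup would receive them.
    static List<String> leftOuterJoin(Iterable<String> left, Iterable<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            boolean matched = false;
            for (String r : right) {
                out.add(l + "=>" + r);   // matched pair, same as an inner join
                matched = true;
            }
            if (!matched) {
                out.add(l + "=>null");   // keep the unmatched left element
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key with elements on both sides: behaves like the inner join
        System.out.println(leftOuterJoin(Arrays.asList("a1", "a2"), Arrays.asList("x")));
        // Key with no right-side elements: the left element is still emitted
        System.out.println(leftOuterJoin(Arrays.asList("b1"), Collections.<String>emptyList()));
    }
}
```

In a real job the same loop would sit inside `CoGroupFunction.coGroup` and write to the `Collector` instead of returning a list; swapping the roles of the two inputs gives a right outer join.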
        

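#### interval join

Taking one stream as the base, each of its events is joined with the events of the other stream (same key) whose timestamps fall within [timestamp + lower bound, timestamp + upper bound]. Only matched pairs are emitted; unmatched events cannot be captured.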
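
The matching condition can be sketched in plain Java without any Flink dependency. The `Rec` type and the sample data are hypothetical; both bounds are treated as inclusive, which matches Flink's default behaviour as far as I know, and watermarks and state cleanup are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the interval-join matching condition (not Flink code).
class IntervalJoinSketch {

    // A keyed, timestamped record; hypothetical type for this sketch.
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    // Emits base => other when keys match and base.ts + lower <= other.ts <= base.ts + upper.
    static List<String> intervalJoin(List<Rec> base, List<Rec> other, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Rec b : base) {
            for (Rec o : other) {
                boolean sameKey = b.key.equals(o.key);
                boolean inInterval = o.ts >= b.ts + lowerMs && o.ts <= b.ts + upperMs;
                if (sameKey && inInterval) {
                    out.add(b + "=>" + o);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> orders = Arrays.asList(new Rec("Alice", 5000L), new Rec("Bob", 20000L));
        List<Rec> clicks = Arrays.asList(
                new Rec("Alice", 3000L), new Rec("Alice", 16000L),
                new Rec("Bob", 30000L), new Rec("Bob", 31000L));
        // Interval of 5 s before to 10 s after each order
        // Prints (Alice,5000)=>(Alice,3000) and (Bob,20000)=>(Bob,30000)
        intervalJoin(orders, clicks, -5000L, 10000L).forEach(System.out::println);
    }
}
```

The click at 30000 sits exactly on Bob's upper bound and still matches because the bound is inclusive; the clicks at 16000 and 31000 fall outside their intervals and are dropped silently.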

  • Override `ProcessJoinFunction`; the two streams are equi-joined on the field used in `keyBy`.

```java
// Interval-based join. Event is a user-defined class that is not shown in these notes.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Orders: (user, order id, event-time timestamp in ms)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        // Clicks: (user, url, event-time timestamp in ms)
        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        // For each order, match the same user's clicks from 5 s before to 10 s after the order
        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " => " + right);
                    }
                })
                .print();

        env.execute();
    }
}
```

Output:

```
(Alice,order-2,5000) => Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
(Alice,order-2,5000) => Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5}
(Bob,order-3,20000) => Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:30.0}
(Bob,order-3,20000) => Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:23.0}
```


### Dimension enrichment (enrich)

https://www.jianshu.com/p/21f60a37b83a
https://www.jianshu.com/p/a62fa483ff54