Stream splitting and merging in Flink: side outputs for splitting; `connect` and `union` for merging; window join, window coGroup, and interval join for joining; dimension enrichment (enrich join). Note that the join operators cannot emit unmatched events, while `connect` (with a CoProcessFunction) can.

### Splitting a stream

- Method 1: `split()` — deprecated, gone as of Flink 1.13.
- Method 2: side outputs via a `ProcessFunction`.

```java
// Method 1: split() / select() -- deprecated, removed in Flink 1.13
SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return value.getTemperature() > 30
                ? Collections.singletonList("high")
                : Collections.singletonList("low");
    }
});
DataStream<SensorReading> highStream = splitStream.select("high");
DataStream<SensorReading> lowStream = splitStream.select("low");
```

```java
// Method 2: side outputs
public class SplitStreamByOutputTag {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // the trailing {} creates an anonymous subclass so Flink can extract the element type
        OutputTag<Event> maryOutPut = new OutputTag<Event>("Mary-pv") {};
        OutputTag<Event> bobOutPut = new OutputTag<Event>("Bob-pv") {};

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context context, Collector<Event> collector) throws Exception {
                if ("Mary".equals(event.user)) {
                    context.output(maryOutPut, event);
                } else if ("Bob".equals(event.user)) {
                    context.output(bobOutPut, event);
                } else {
                    collector.collect(event);
                }
            }
        });

        processedStream.getSideOutput(maryOutPut).print("mary");
        processedStream.getSideOutput(bobOutPut).print("bob");
        processedStream.print("else");

        env.execute();
    }
}
```

### Merging streams

Method 1: `connect` + CoMap / CoFlatMap / CoProcessFunction. After `connect`, the two streams are merely placed into one ConnectedStreams; internally each keeps its own data and type unchanged, and the two streams remain independent. CoMap/CoFlatMap apply a separate map / flatMap to each of the two streams inside the ConnectedStreams.

Method 2: `union`.

Differences between connect and union:

- `union` requires the streams to have the same type; `connect` allows different types, which are then unified in the subsequent CoMap.
- `connect` can only combine two streams; `union` can combine more than two.

```java
// connect + CoMap/CoFlatMap
// (the generic parameters were stripped in the original notes; Tuple2<String, Double>
//  is assumed here as the element type of warningStream)
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams =
        warningStream.connect(lowTempStream);

// CoMapFunction takes three type parameters: first stream, second stream, merged output type
DataStream<Object> resultStream = connectedStreams.map(
        new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });

// union
DataStream<SensorReading> unionStream = highStream.union(lowStream, allStream);
```

### join

#### window join

After windowing, the records inside each window are joined as a per-key Cartesian product.

- `.where()` takes a KeySelector that specifies the key of the first stream;
- `.equalTo()` takes a KeySelector that specifies the key of the second stream.

```java
// window-based join
public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                                return element.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                                return element.f1;
                                            }
                                        }
                                )
                );

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}
```

#### window coGroup

The first two parameters of the `coGroup` method are no longer individual "paired" records but the iterable collections iter1 and iter2. In other words, Flink no longer computes the Cartesian product of the two streams' window contents; it hands over all the collected records at once, and how they are paired is entirely up to you. `coGroup` is therefore more general than the window `join`: besides the SQL-style inner join, it can also express left join, right join and full join. In fact, window `join` is implemented on top of `coGroup`.

```java
stream1
        .coGroup(stream2)
        .where(r -> r.f0)
        .equalTo(r -> r.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Long>> iter1,
                                Iterable<Tuple2<String, Long>> iter2,
                                Collector<String> collector) throws Exception {
                collector.collect(iter1 + "=>" + iter2);
            }
        })
        .print();
```

#### interval join

Takes one stream as the base: each of its events is joined against the other stream's events that fall within the range [timestamp + lower bound, timestamp + upper bound]. Only matched events can be emitted; unmatched events cannot be captured. Override `ProcessJoinFunction`; the join is an equality join on the keyBy field.

```java
// interval-based join
public class IntervalJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " => " + right);
                    }
                })
                .print();

        env.execute();
    }
}
```

Output:

> (Alice,order-2,5000) => Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
> (Alice,order-2,5000) => Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5}
> (Bob,order-3,20000) => Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:30.0}
> (Bob,order-3,20000) => Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:23.0}

### Dimension enrichment (enrich)

- https://www.jianshu.com/p/21f60a37b83a
- https://www.jianshu.com/p/a62fa483ff54
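To make the window join's Cartesian-product semantics concrete, here is a plain-Java sketch with no Flink dependency (class and method names `WindowJoinSketch` / `windowJoin` are made up for illustration): per key and per 5-second tumbling window, every left element is paired with every right element.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of window-join semantics: two elements join when they
// share a key AND fall into the same tumbling window; all such pairs are emitted.
public class WindowJoinSketch {

    public record Rec(String key, long ts) {}

    public static List<String> windowJoin(List<Rec> left, List<Rec> right, long windowSizeMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key().equals(r.key());
                // tumbling windows of size windowSizeMs aligned at 0: same bucket index = same window
                boolean sameWindow = l.ts() / windowSizeMs == r.ts() / windowSizeMs;
                if (sameKey && sameWindow) {
                    out.add("(" + l.key() + "," + l.ts() + ")=>(" + r.key() + "," + r.ts() + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> stream1 = List.of(new Rec("a", 1000L), new Rec("b", 1000L),
                                    new Rec("a", 2000L), new Rec("b", 2000L));
        List<Rec> stream2 = List.of(new Rec("a", 3000L), new Rec("b", 3000L),
                                    new Rec("a", 4000L), new Rec("b", 4000L));
        // All eight sample elements fall into the same [0, 5000) window,
        // so each key contributes 2 x 2 = 4 pairs: 8 pairs in total.
        windowJoin(stream1, stream2, 5000L).forEach(System.out::println);
    }
}
```

With the sample data from WindowJoinTest above, this reproduces the 2 × 2 pairs per key that the Flink job prints.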
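Because coGroup hands you the two complete per-key iterables, the pairing is yours to define. As a sketch of the claim that coGroup can express LEFT JOIN, here is the kind of logic one could write inside `CoGroupFunction.coGroup` (plain Java, no Flink; the name `CoGroupLeftJoinSketch` is hypothetical): pair every left element with every right element, and still emit the left element with `null` when the right group is empty.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of LEFT JOIN pairing logic inside a coGroup call:
// unmatched left rows survive, which the window join's inner-join semantics cannot do.
public class CoGroupLeftJoinSketch {

    public static List<String> leftJoin(Iterable<String> lefts, Iterable<String> rights) {
        List<String> out = new ArrayList<>();
        for (String l : lefts) {
            boolean matched = false;
            for (String r : rights) {
                out.add(l + "=>" + r);
                matched = true;
            }
            if (!matched) {
                out.add(l + "=>null");  // LEFT JOIN: keep the unmatched left row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(leftJoin(List.of("a1", "a2"), List.of()));
        System.out.println(leftJoin(List.of("a1"), List.of("b1", "b2")));
    }
}
```

Swapping the roles of the two iterables gives RIGHT JOIN; running both directions and de-duplicating gives FULL JOIN.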
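The interval-join matching rule can also be checked in plain Java (no Flink; `IntervalJoinSketch` and its names are illustrative): a right-side event matches a left-side event with the same key when its timestamp lies in [left.ts − 5 s, left.ts + 10 s]. Running it over the sample orders and clicks yields exactly the four matches quoted in the output above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of interval-join semantics: a left event at ts matches
// right events of the same key whose timestamp is in [ts + lower, ts + upper].
public class IntervalJoinSketch {

    public record Ev(String key, long ts) {}

    public static List<String> intervalJoin(List<Ev> left, List<Ev> right,
                                            long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Ev l : left) {
            for (Ev r : right) {
                if (l.key().equals(r.key())
                        && r.ts() >= l.ts() + lowerMs
                        && r.ts() <= l.ts() + upperMs) {
                    out.add(l + " => " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ev> orders = List.of(new Ev("Mary", 5000L), new Ev("Alice", 5000L),
                new Ev("Bob", 20000L), new Ev("Alice", 20000L), new Ev("Cary", 51000L));
        List<Ev> clicks = List.of(new Ev("Bob", 2000L), new Ev("Alice", 3000L),
                new Ev("Alice", 3500L), new Ev("Bob", 2500L), new Ev("Alice", 36000L),
                new Ev("Bob", 30000L), new Ev("Bob", 23000L), new Ev("Bob", 33000L));
        // between(Time.seconds(-5), Time.seconds(10)) corresponds to [-5000, +10000]:
        // matches are Alice@5000 x {3000, 3500} and Bob@20000 x {23000, 30000}.
        intervalJoin(orders, clicks, -5000L, 10000L).forEach(System.out::println);
    }
}
```

Note that Mary's and Cary's orders produce nothing here, which illustrates the point above: unmatched events are silently dropped by the interval join.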