Window算子在Flink中非常重要,要理解Window算子,首先要明白Window的相关机制和原理。本文将从实战的角度讲解API的使用,详细的原理机制建议先阅读官方文档Windows。下面以Tumbling Windows为例讲解一些常见用法,其中基于ProcessingTime的样例同样适用于EventTime。

基于ProcessingTime的基本用法

Window Join

定义

Transformation Description
DataStream,DataStream → DataStream Join two data streams on a given key and a common window.

说明

将两个window的数据进行join

样例

代码

  1. public class WindowJoinDemo {
  2. public static void main(String[] args) throws Exception {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
  6. DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource());
  7. DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(orangeStream, greenStream, 5);
  8. joinedStream.print("join");
  9. env.execute("Windowed Join Demo");
  10. }
  11. public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
  12. DataStream<Tuple2<String, Integer>> grades,
  13. DataStream<Tuple2<String, Integer>> salaries,
  14. long windowSize) {
  15. return grades.join(salaries)
  16. .where(new NameKeySelector())
  17. .equalTo(new NameKeySelector())
  18. .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
  19. .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
  20. @Override
  21. public Tuple3<String, Integer, Integer> join(
  22. Tuple2<String, Integer> first,
  23. Tuple2<String, Integer> second) {
  24. return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
  25. }
  26. });
  27. }
  28. private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
  29. @Override
  30. public String getKey(Tuple2<String, Integer> value) {
  31. return value.f0;
  32. }
  33. }
  34. private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
  35. private volatile boolean running = true;
  36. @Override
  37. public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
  38. int bound = 50;
  39. String[] keys = new String[]{"foo", "bar", "baz"};
  40. final long numElements = RandomUtils.nextLong(10, 20);
  41. int i = 0;
  42. while (running && i < numElements) {
  43. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  44. Tuple2 data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
  45. ctx.collect(data);
  46. System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
  47. i++;
  48. }
  49. }
  50. @Override
  51. public void cancel() {
  52. running = false;
  53. }
  54. }
  55. }

输出结果

  1. 59-sand data:(bar,49)
  2. 58-sand data:(bar,44)
  3. 58-sand data:(foo,2)
  4. 59-sand data:(baz,34)
  5. 58-sand data:(baz,2)
  6. 59-sand data:(baz,29)
  7. join> (baz,34,2)
  8. join> (baz,29,2)

说明

两条流里面的数据类型都是Tuple2,随机生成一些数据,窗口大小设置为5秒,根据两个流数据中的key进行join

Window CoGroup

定义

Transformation Description
DataStream,DataStream → DataStream Cogroups two data streams on a given key and a common window.

说明

coGroup方法的使用与上面的join方法类似,不同的地方在于coGroup方法可以拿到两个窗口的所有数据,所以可以实现更多的场景,例如join就相当于coGroup的特例,也就是两个窗口的数据集根据key取交集。

样例

代码

  1. public class WindowCoGroupDemo {
  2. public static void main(String[] args) throws Exception {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
  6. DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource());
  7. DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowCoGroup(orangeStream, greenStream, 10);
  8. joinedStream.print();
  9. env.execute("Windowed CoGroup Demo");
  10. }
  11. public static DataStream<Tuple3<String, Integer, Integer>> runWindowCoGroup(
  12. DataStream<Tuple2<String, Integer>> orangeStream,
  13. DataStream<Tuple2<String, Integer>> greenStream,
  14. long windowSize) {
  15. return orangeStream.coGroup(greenStream)
  16. .where(new NameKeySelector())
  17. .equalTo(new NameKeySelector())
  18. .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
  19. .apply(new Join());
  20. }
  21. private static class Join implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>{
  22. @Override
  23. public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
  24. first.forEach(x -> {
  25. second.forEach(y -> {
  26. if (x.f0.equals(y.f0)){
  27. out.collect(new Tuple3<>(x.f0, x.f1, y.f1));
  28. }
  29. });
  30. });
  31. }
  32. }
  33. private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
  34. @Override
  35. public String getKey(Tuple2<String, Integer> value) {
  36. return value.f0;
  37. }
  38. }
  39. private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
  40. private volatile boolean running = true;
  41. @Override
  42. public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
  43. int bound = 50;
  44. String[] keys = new String[]{"foo", "bar", "baz"};
  45. final long numElements = RandomUtils.nextLong(10, 20);
  46. int i = 0;
  47. while (running && i < numElements) {
  48. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  49. Tuple2 data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
  50. ctx.collect(data);
  51. System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
  52. i++;
  53. }
  54. }
  55. @Override
  56. public void cancel() {
  57. running = false;
  58. }
  59. }
  60. }

输出结果

  1. 59-sand data:(baz,4)
  2. 59-sand data:(foo,48)
  3. 57-sand data:(foo,34)
  4. 57-sand data:(foo,40)
  5. 59-sand data:(baz,24)
  6. 59-sand data:(bar,1)
  7. 57-sand data:(bar,22)
  8. 57-sand data:(bar,41)
  9. (bar,1,22)
  10. (bar,1,41)
  11. (foo,48,34)
  12. (foo,48,40)

说明

样例中对两个窗口的数据进行了类似join的计算

基于EventTime的基本用法

EventTime&Watermark

样例

代码

  1. public class EventTimeWindowDemo {
  2. public static void main(String[] args) throws Exception {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  6. DataStream<Tuple3<String, Integer, Long>> orangeStream = env.addSource(new DataSource("orangeStream"))
  7. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
  8. orangeStream.keyBy(0)
  9. .window(TumblingEventTimeWindows.of(Time.seconds(30)))
  10. .apply(new WindowFunction<Tuple3<String, Integer, Long>, Object, Tuple, TimeWindow>() {
  11. @Override
  12. public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Integer, Long>> input, Collector<Object> out) throws Exception {
  13. System.out.println(window.toString());
  14. out.collect(input);
  15. }
  16. }).name("EventTimeWindow").print("out");
  17. env.execute("EventTime Demo");
  18. }
  19. private static class DataSource extends RichParallelSourceFunction<Tuple3<String, Integer, Long>> {
  20. private volatile boolean running = true;
  21. private volatile String name;
  22. public DataSource(String name) {
  23. this.name = name;
  24. }
  25. @Override
  26. public void run(SourceContext<Tuple3<String, Integer, Long>> ctx) throws Exception {
  27. Random random = new Random();
  28. int bound = 100;
  29. final long numElements = 10;
  30. int i = 0;
  31. while (running && i < numElements) {
  32. Thread.sleep(1500);
  33. Tuple3 data = new Tuple3<>("foo", random.nextInt(bound), getRandomInt(i*10, 60+i*10));
  34. ctx.collect(data);
  35. System.out.println(Thread.currentThread().getId() + "-" + this.name + "-sand data:" + data);
  36. i++;
  37. }
  38. Thread.sleep(5000);
  39. }
  40. @Override
  41. public void cancel() {
  42. running = false;
  43. }
  44. private long getRandomInt(int min, int max){
  45. return 1573441860000L + 1000* RandomUtils.nextInt(min, max);
  46. }
  47. }
  48. private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>> {
  49. private final long maxOutOfOrderness = 10000;
  50. private long currentMaxTimestamp;
  51. @Override
  52. public long extractTimestamp(Tuple3<String, Integer, Long> row, long previousElementTimestamp) {
  53. long timestamp = row.f2;
  54. currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  55. System.out.println(Thread.currentThread().getId() + "-" + row + ",time="+stampToDate(row.f2.toString()) + ",watermark=" + stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
  56. return timestamp;
  57. }
  58. @Override
  59. public Watermark getCurrentWatermark() {
  60. return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  61. }
  62. private static String stampToDate(String s) {
  63. String res;
  64. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  65. long lt = new Long(s);
  66. Date date = new Date(lt);
  67. res = simpleDateFormat.format(date);
  68. return res;
  69. }
  70. }
  71. }

输出结果

wm.m4v (16.45MB)

说明

  1. 时间窗口设置为30s
  2. watermark的计算公式为当前最大时间戳减去10s,也就是最大可容忍延迟10s的数据
  3. 默认采用的是EventTimeTrigger,下面是触发窗口计算的公式,其中window.maxTimestamp()返回的是窗口结束时间-1毫秒
  1. @Override
  2. public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  3. if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
  4. // the current watermark has already reached the window's max timestamp, so fire immediately
  5. return TriggerResult.FIRE;
  6. } else {
  7. ctx.registerEventTimeTimer(window.maxTimestamp()); // otherwise register an event-time timer at the window's max timestamp
  8. return TriggerResult.CONTINUE; // and do not fire for this element
  9. }
  10. }

Interval Join

定义

Transformation Description
KeyedStream,KeyedStream → DataStream Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

说明

  1. 这个算子只支持EventTime
  2. 下图为这个算子的基本原理,watermark是对单个流中数据允许迟到多久进行控制的一个机制,而两个流进行join,就会涉及到两条流中的窗口是否同步的问题,这样就要考虑流和流之间的窗口存在延迟的情况,也就是between要指定的时间

image.png

  3. 上面介绍的Window Join算子(如下图)是基于两个相同时间窗口内所有数据的inner join;而Interval Join是以每个元素为视角,一条流中的元素去另一条流中查找key相同的元素,并且两个元素的时间戳要满足a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image.png

样例

代码

  1. public class IntervalJoinDemo {
  2. public static void main(String[] args) throws Exception {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. //Time-bounded stream joins are only supported in event time
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  7. DataStream<Tuple3<String, Integer, Long>> orangeStream = env.addSource(new DataSource("orangeStream")).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
  8. DataStream<Tuple3<String, Integer, Long>> greenStream = env.addSource(new DataSource("greenStream")).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
  9. orangeStream
  10. .keyBy(0)
  11. .intervalJoin(greenStream.keyBy(0))
  12. .between(Time.seconds(-5), Time.seconds(5))
  13. .process(new ProcessJoinFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Object>() {
  14. @Override
  15. public void processElement(Tuple3<String, Integer, Long> left, Tuple3<String, Integer, Long> right, Context ctx, Collector<Object> out) throws Exception {
  16. out.collect(new Tuple5<>(left.f0, left.f1, left.f2, right.f1, right.f2));
  17. }
  18. }).name("intervalJoin").print("xxxxxx");
  19. env.execute("Interval Join Demo");
  20. }
  21. private static class DataSource extends RichParallelSourceFunction<Tuple3<String, Integer, Long>> {
  22. private volatile boolean running = true;
  23. private volatile String name;
  24. public DataSource(String name) {
  25. this.name = name;
  26. }
  27. @Override
  28. public void run(SourceContext<Tuple3<String, Integer, Long>> ctx) throws Exception {
  29. Random random = new Random();
  30. int bound = 100;
  31. Tuple3[] data = new Tuple3[]{
  32. new Tuple3<>("foo", random.nextInt(bound), getRandomInt(50, 70)), new Tuple3<>("foo", random.nextInt(bound), getRandomInt(40, 60))};
  33. final long numElements = data.length;
  34. int i = 0;
  35. while (running && i < numElements) {
  36. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  37. ctx.collect(data[i]);
  38. System.out.println(Thread.currentThread().getId() + "-" + this.name + "-sand data:" + data[i]);
  39. i++;
  40. }
  41. }
  42. @Override
  43. public void cancel() {
  44. running = false;
  45. }
  46. private long getRandomInt(int min, int max){
  47. return 1573441870000L + 1000*(new Random().nextInt(max-min+1)+min);
  48. }
  49. }
  50. private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>> {
  51. private final long maxOutOfOrderness = 10000;
  52. private long currentMaxTimestamp;
  53. @Override
  54. public long extractTimestamp(Tuple3<String, Integer, Long> row, long previousElementTimestamp) {
  55. System.out.println(Thread.currentThread().getId() + "-" + row + ",time="+stampToDate(row.f2.toString()));
  56. long timestamp = row.f2;
  57. currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  58. System.out.println(Thread.currentThread().getId() + "-watermark:" + stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
  59. return timestamp;
  60. }
  61. @Override
  62. public Watermark getCurrentWatermark() {
  63. return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  64. }
  65. private static String stampToDate(String s) {
  66. String res;
  67. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  68. long lt = new Long(s);
  69. Date date = new Date(lt);
  70. res = simpleDateFormat.format(date);
  71. return res;
  72. }
  73. }
  74. }