Windows

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how programmers can get the most out of the functionality it offers.

The general structure of a windowed Flink program is presented below. The first snippet refers to keyed streams, the second to non-keyed ones. As you can see, the only difference is the keyBy(...) call for keyed streams, and window(...), which becomes windowAll(...) for non-keyed streams. This structure also serves as a roadmap for the rest of the page.

Keyed Windows

    stream
           .keyBy(...)                    <-  keyed versus non-keyed windows
           .window(...)                   <-  required: "assigner"
          [.trigger(...)]                 <-  optional: "trigger" (else default trigger)
          [.evictor(...)]                 <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]         <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)]      <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply() <-  required: "function"
          [.getSideOutput(...)]           <-  optional: "output tag"

Non-Keyed Windows

    stream
           .windowAll(...)                <-  required: "assigner"
          [.trigger(...)]                 <-  optional: "trigger" (else default trigger)
          [.evictor(...)]                 <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]         <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)]      <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply() <-  required: "function"
          [.getSideOutput(...)]           <-  optional: "output tag"

In the snippets above, the calls in square brackets ([...]) are optional. This shows that Flink lets you customize your windowing logic in many different ways, so that it best fits your needs.

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners). For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.
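
The removal rule can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code, and the class and method names are hypothetical. It assumes, as we understand Flink's window operator to work, that the cleanup time is the window's largest contained timestamp (end - 1) plus the allowed lateness, and that the window is dropped once the event-time watermark reaches that time.

```java
// Hypothetical model of when an event-time window is removed; not Flink's API.
final class WindowLifecycleSketch {
    static final long MINUTE = 60_000L;

    private WindowLifecycleSketch() {}

    // Windows are half-open [start, end), so the last timestamp they contain is end - 1.
    static long maxTimestamp(long end) {
        return end - 1;
    }

    // Assumed cleanup time: last contained timestamp plus the allowed lateness.
    static long cleanupTime(long end, long allowedLateness) {
        return maxTimestamp(end) + allowedLateness;
    }

    // The window (contents and metadata) is dropped once the watermark reaches the cleanup time.
    static boolean isRemoved(long watermark, long end, long allowedLateness) {
        return watermark >= cleanupTime(end, allowedLateness);
    }
}
```

Consider the 12:00 - 12:05 window with 1 minute of allowed lateness, with times expressed as milliseconds since midnight. It is still kept when the watermark is at 12:05:30, and it is removed once the watermark reaches 12:06.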

In addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents any time between its creation and removal. Purging in this case only refers to the elements in the window, and not the window metadata. This means that new data can still be added to that window.
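
The following plain-Java sketch shows how the trigger and the function divide the work. It models a single window whose trigger fires once the window holds more than 4 elements and whose function sums the contents. It is a hypothetical model, not Flink's Trigger API: the Decision values only borrow names from Flink's TriggerResult, and CountTriggerSketch is an illustrative class. The sketch also shows that a purge empties the elements while the window itself survives (represented here by its fire counter) and keeps accepting data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window model; the enum only mirrors names used by Flink's TriggerResult.
final class CountTriggerSketch {
    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final int maxElements;
    private final boolean purgeOnFire;
    private final List<Long> contents = new ArrayList<>(); // the window's elements
    private int timesFired = 0;                           // window metadata, survives purges
    private long lastResult = 0;                          // output of the last function call

    CountTriggerSketch(int maxElements, boolean purgeOnFire) {
        this.maxElements = maxElements;
        this.purgeOnFire = purgeOnFire;
    }

    // Adds an element, asks the "trigger" what to do, and applies the "function" on fire.
    Decision onElement(long value) {
        contents.add(value);
        if (contents.size() <= maxElements) {
            return Decision.CONTINUE;
        }
        lastResult = contents.stream().mapToLong(Long::longValue).sum(); // the window function
        timesFired++;
        if (purgeOnFire) {
            contents.clear(); // drop the elements, keep the window
            return Decision.FIRE_AND_PURGE;
        }
        return Decision.FIRE;
    }

    int size() { return contents.size(); }
    int timesFired() { return timesFired; }
    long lastResult() { return lastResult; }
}
```

If purgeOnFire is false, every element after the fifth fires the window again over the growing contents. This is the fire-without-purge case.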

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.
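
Here is a rough plain-Java illustration of an evictor that runs after the trigger fires and before the function is applied. It imitates the "keep the last N elements" behavior of Flink's built-in CountEvictor as we understand it. EvictorSketch, evictBefore, and applyWindowFunction are hypothetical names, and the sum stands in for an arbitrary window function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "trigger fires -> evictor -> window function"; not Flink's Evictor API.
final class EvictorSketch {
    private EvictorSketch() {}

    // Keep only the last maxCount elements, dropping the oldest ones from the front.
    static List<Long> evictBefore(List<Long> windowContents, int maxCount) {
        int from = Math.max(0, windowContents.size() - maxCount);
        return new ArrayList<>(windowContents.subList(from, windowContents.size()));
    }

    // The "window function": sums whatever the evictor left behind.
    static long applyWindowFunction(List<Long> contents) {
        long sum = 0;
        for (long v : contents) {
            sum += v;
        }
        return sum;
    }
}
```

Suppose a window holds 1, 2, 3, 4, 5 and the evictor keeps 3 elements. The function then only sees 3, 4, 5.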

In the following we go into more detail for each of the components above. We start with the required parts in the above snippet (see Keyed vs Non-Keyed Windows, Window Assigner, and Window Function) before moving to the optional ones.

Keyed vs Non-Keyed Windows

The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Calling keyBy(...) splits your infinite stream into logical keyed streams. If keyBy(...) is not called, your stream is not keyed.

In the case of keyed streams, any attribute of your incoming events can be used as a key (more details: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/api_concepts.html#specifying-keys). A keyed stream allows your windowed computation to be performed in parallel by multiple tasks, because each logical keyed stream can be processed independently of the rest. All elements with the same key are sent to the same parallel task.

In the case of non-keyed streams, your original stream will not be split into multiple logical streams, and all the windowing logic will be performed by a single task, i.e., with a parallelism of 1.
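
The "same key, same task" guarantee can be illustrated with a simplified hash-partitioning sketch. The sketch makes a simplifying assumption: as we understand it, Flink actually hashes keys into key groups (using a murmur hash of hashCode() and the configured max parallelism) and then maps key groups to tasks. The property shown here still holds, because routing depends only on the key. KeyRoutingSketch is a hypothetical name, and a non-keyed stream corresponds to parallelism 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for keyBy routing; Flink's real scheme goes through key groups.
final class KeyRoutingSketch {
    private KeyRoutingSketch() {}

    // Deterministic: the same key always maps to the same task index in [0, parallelism).
    static int taskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by the task that would process them.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String key : keys) {
            byTask.computeIfAbsent(taskFor(key, parallelism), t -> new ArrayList<>()).add(key);
        }
        return byTask;
    }
}
```

With parallelism 1, every key lands on task 0, which mirrors the single task used for non-keyed windows.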

Window Assigners

After specifying whether your stream is keyed or not, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. You do this by passing the WindowAssigner of your choice to the window(...) call (for keyed streams) or the windowAll(...) call (for non-keyed streams).

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time. Please take a look at our section on event time to learn about the difference between processing time and event time and how timestamps and watermarks are generated.

Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window. In code, Flink represents time-based windows with TimeWindow. This class has methods for querying the start and end timestamps, plus an additional method, maxTimestamp(), that returns the largest timestamp allowed in a given window.
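
The half-open convention can be captured in a small class. WindowRange below is a hypothetical stand-in for Flink's TimeWindow, not that class itself. Its maxTimestamp() follows the start-inclusive/end-exclusive rule described above, so it returns end - 1.

```java
// Hypothetical stand-in for a time-based window [start, end); not Flink's TimeWindow class.
final class WindowRange {
    final long start; // inclusive
    final long end;   // exclusive

    WindowRange(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("end must be greater than start");
        }
        this.start = start;
        this.end = end;
    }

    // An element belongs to the window if start <= timestamp < end.
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    // Largest timestamp inside the window; end itself belongs to the next window.
    long maxTimestamp() {
        return end - 1;
    }

    // The window size is end - start.
    long size() {
        return end - start;
    }
}
```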

In the following, we show how Flink’s pre-defined window assigners work and how they are used in a DataStream program. The following figures visualize the workings of each assigner. The purple circles represent elements of the stream, which are partitioned by some key (in this case user 1, user 2 and user 3). The x-axis shows the progress of time.

Tumbling Windows

A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.

[Figure 1: tumbling windows]

The following code snippets show how to use tumbling windows.

Java:

    DataStream<T> input = ...;

    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

Scala:

    val input: DataStream[T] = ...

    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)

Time intervals can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

As shown in the last example, tumbling window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without offsets, hourly tumbling windows are aligned with the epoch, so you get windows such as 1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999 and so on. To change that, you can specify an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999 etc. An important use case for offsets is adjusting windows to time zones other than UTC. For example, in China (UTC+8) you would have to specify an offset of Time.hours(-8).
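
The alignment rule comes down to one line of arithmetic. To the best of our knowledge, the sketch below reproduces the formula Flink uses for tumbling window starts (TimeWindow.getWindowStartWithOffset). The class name is hypothetical, and times are plain epoch milliseconds. The sketch requires |offset| < size, which, as far as we know, the Flink assigners also enforce.

```java
// Hypothetical helper reproducing the window-start arithmetic for tumbling windows.
final class TumblingSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    private TumblingSketch() {}

    // Start of the window of length size (shifted by offset) that contains timestamp.
    static long windowStart(long timestamp, long offset, long size) {
        if (size <= 0 || Math.abs(offset) >= size) {
            throw new IllegalArgumentException("need size > 0 and |offset| < size");
        }
        return timestamp - (timestamp - offset + size) % size;
    }
}
```

Take the timestamp 1:30 with hourly windows. Without an offset it falls into the window starting at 1:00, and with a 15-minute offset it falls into the one starting at 1:15. Now take daily windows with Time.hours(-8). Epoch 0 (08:00 in UTC+8) falls into the window that starts 8 hours before the epoch, which is midnight in UTC+8.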

Sliding Windows

The sliding windows assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.

For example, you could have windows of size 10 minutes that slide by 5 minutes. This gives you, every 5 minutes, a window that contains the events that arrived during the last 10 minutes, as depicted by the following figure.
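
The sketch below assigns one element to every sliding window it belongs to. As far as we know, it mirrors the loop in Flink's sliding window assigners. First it finds the latest window start at or before the element, using the same arithmetic as for tumbling windows with the slide as the step. Then it steps back one slide at a time while the window still covers the element. The class name and the long[] {start, end} representation are hypothetical. The offset works as described for tumbling windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding window assignment; windows are {start, end} with end exclusive.
final class SlidingSketch {
    private SlidingSketch() {}

    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        if (size <= 0 || slide <= 0 || Math.abs(offset) >= slide) {
            throw new IllegalArgumentException("need size > 0, slide > 0 and |offset| < slide");
        }
        List<long[]> windows = new ArrayList<>();
        // latest window start at or before the timestamp
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        // walk back one slide at a time while [start, start + size) still covers the element
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

In minutes, assign(12, 10, 5, 0) yields [10, 20) and [5, 15), so each element lands in size / slide = 2 windows. With hourly windows sliding by 30 minutes and a 15-minute offset, the time 2:00 (120 minutes) lands in the windows starting at 1:45 and 1:15.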

[Figure 2: sliding windows]

The following code snippets show how to use sliding windows.

Java:

    DataStream<T> input = ...;

    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

Scala:

    val input: DataStream[T] = ...

    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)

    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)

Time intervals can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

As shown in the last example, sliding window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without offsets, hourly windows sliding by 30 minutes are aligned with the epoch, so you get windows such as 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 and so on. To change that, you can specify an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999 etc. An important use case for offsets is adjusting windows to time zones other than UTC. For example, in China (UTC+8) you would have to specify an offset of Time.hours(-8).

Session Windows

The session windows assigner groups elements by sessions of activity. In contrast to tumbling windows and sliding windows, session windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurs. A session window assigner can be configured with either a static session gap or a session gap extractor function that defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.

[Figure 3: session windows]

The following code snippets show how to use session windows.

Java:

    DataStream<T> input = ...;

    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);

    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);

    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);

Scala:

    val input: DataStream[T] = ...

    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)

    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)

    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)

    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)

Static gaps can be specified by using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

Dynamic gaps are specified by implementing the SessionWindowTimeGapExtractor interface.

Attention: Since session windows do not have a fixed start and end, they are evaluated differently than tumbling and sliding windows. Internally, a session window operator creates a new window for each arriving record and merges windows together if they are closer to each other than the defined gap. To be mergeable, a session window operator requires a merging Trigger and a merging window function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction (FoldFunction cannot merge).
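
The merge-as-you-go behavior can be modeled in plain Java. Each record opens a window [timestamp, timestamp + gap), and windows that overlap or touch are merged. To our understanding, this is the rule Flink applies to session windows. The gap comes from a function, so the same sketch covers static gaps (a constant) and dynamic gaps (the role SessionWindowTimeGapExtractor plays). SessionSketch is a hypothetical name, and records are reduced to their timestamps. The sketch sorts the input up front. That gives the same final sessions as merging incrementally in arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical model of session window merging; windows are {start, end} with end exclusive.
final class SessionSketch {
    private SessionSketch() {}

    static List<long[]> sessions(long[] timestamps, LongUnaryOperator gapFor) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long start = ts;
            long end = ts + gapFor.applyAsLong(ts); // a fresh window per record
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && start <= last[1]) {
                // overlaps or touches the previous session: merge them
                last[1] = Math.max(last[1], end);
            } else {
                merged.add(new long[] {start, end});
            }
        }
        return merged;
    }
}
```

Take records at 1, 2, 3, 20, 21 with a static gap of 5. They form two sessions: [1, 8) and [20, 26).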

Global Windows

A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.

[Figure 4: global windows]

The following code snippets show how to use a global window.

Java:

    DataStream<T> input = ...;

    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>);

Scala:

    val input: DataStream[T] = ...

    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>)

Window Functions

After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing (see triggers for how Flink determines when a window is ready).

The window function can be one of ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction. The first three can be executed more efficiently (see the State Size section) because Flink can incrementally aggregate the elements of each window as they arrive. A ProcessWindowFunction gets an Iterable over all the elements contained in a window, plus additional meta information about the window to which the elements belong.

A windowed transformation with a ProcessWindowFunction cannot be executed as efficiently as the other cases because Flink has to buffer all elements for a window internally before invoking the function. This can be mitigated by combining a ProcessWindowFunction with a ReduceFunction, AggregateFunction, or FoldFunction to get both incremental aggregation of window elements and the additional window metadata that the ProcessWindowFunction receives. We will look at examples for each of these variants.
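
The combination of incremental aggregation and window metadata can be modeled without Flink. A running value is updated per element, so only one value is stored per window. A final step, playing the role of the ProcessWindowFunction, then receives that single pre-aggregated value together with the window bounds. All names below are hypothetical. In Flink, the same pattern is expressed by passing both functions to reduce(...) or aggregate(...).

```java
import java.util.function.LongBinaryOperator;

// Hypothetical model: incremental pre-aggregation followed by a metadata-aware final step.
final class IncrementalWithMetadataSketch {
    private final long windowStart;
    private final long windowEnd;
    private final LongBinaryOperator reducer; // plays the ReduceFunction role
    private boolean empty = true;
    private long accumulated;                  // the only per-window state that is kept

    IncrementalWithMetadataSketch(long windowStart, long windowEnd, LongBinaryOperator reducer) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.reducer = reducer;
    }

    // Called per element: folds it into the running value instead of buffering it.
    void add(long value) {
        accumulated = empty ? value : reducer.applyAsLong(accumulated, value);
        empty = false;
    }

    // Plays the ProcessWindowFunction role: sees one pre-aggregated value plus window metadata.
    String fire() {
        if (empty) {
            throw new IllegalStateException("window has no elements");
        }
        return "window [" + windowStart + ", " + windowEnd + ") result: " + accumulated;
    }
}
```

If you feed 3, 4 and 5 into a window [0, 5000) with Long::sum as the reducer, fire() yields "window [0, 5000) result: 12".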

ReduceFunction

A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.

A ReduceFunction can be defined and used like this:

Java:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
                // keep the first field and add up the second fields
                return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
            }
        });

Scala:

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

The above example sums up the second fields of the tuples for all elements in a window.

AggregateFunction

An AggregateFunction is a generalized version of a ReduceFunction that has three types: an input type (IN), accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input stream and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one accumulator and for extracting an output (of type OUT) from an accumulator. We will see how this works in the example below.

Same as with ReduceFunction, Flink will incrementally aggregate input elements of a window as they arrive.

An AggregateFunction can be defined and used like this:

Java:

    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
      }

      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }

      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
      }

      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
    }

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate());

Scala:

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)

      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)

      // convert to Double before dividing; Long / Long would truncate the average
      override def getResult(accumulator: (Long, Long)): Double =
        accumulator._1.toDouble / accumulator._2

      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate)

The above example computes the average of the second field of the elements in the window.
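
Why does AggregateFunction need merge()? To our understanding, when windows merge (as session windows do), Flink combines the partial accumulators rather than recomputing from scratch. The plain-Java check below uses hypothetical names and long[] {sum, count} accumulators that mirror the Tuple2<Long, Long> accumulator of AverageAggregate. It shows that merging two partial accumulators gives the same average as aggregating everything into a single accumulator.

```java
// Hypothetical plain-Java mirror of AverageAggregate's accumulator logic: {sum, count}.
final class AverageMergeSketch {
    private AverageMergeSketch() {}

    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Floating-point division: dividing the two longs directly would truncate the average.
    static double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    // Convenience helper: adds several values to an accumulator.
    static long[] addAll(long[] acc, long... values) {
        for (long v : values) {
            acc = add(v, acc);
        }
        return acc;
    }
}
```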

FoldFunction

A FoldFunction specifies how an input element of the window is combined with an element of the output type. The FoldFunction is called incrementally for each element added to the window, together with the current output value. The first element is combined with a pre-defined initial value of the output type.

A FoldFunction can be defined and used like this:

Java:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
            @Override
            public String fold(String acc, Tuple2<String, Long> value) {
                // append the second field to the string built so far
                return acc + value.f1;
            }
        });

Scala:

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("") { (acc, v) => acc + v._2 }

The above example appends all input Long values to an initially empty String.

Attention: fold() cannot be used with session windows or other mergeable windows.

ProcessWindowFunction

A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.

The signature of ProcessWindowFunction looks as follows:

Java:

    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

        /**
         * Evaluates the window and outputs none or several elements.
         *
         * @param key The key for which this window is evaluated.
         * @param context The context in which the window is being evaluated.
         * @param elements The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         *
         * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
         */
        public abstract void process(
                KEY key,
                Context context,
                Iterable<IN> elements,
                Collector<OUT> out) throws Exception;

        /**
         * The context holding window metadata.
         */
        public abstract class Context implements java.io.Serializable {
            /**
             * Returns the window that is being evaluated.
             */
            public abstract W window();

            /** Returns the current processing time. */
            public abstract long currentProcessingTime();

            /** Returns the current event-time watermark. */
            public abstract long currentWatermark();

            /**
             * State accessor for per-key and per-window state.
             *
             * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
             * by implementing {@link ProcessWindowFunction#clear(Context)}.
             */
            public abstract KeyedStateStore windowState();

            /**
             * State accessor for per-key global state.
             */
            public abstract KeyedStateStore globalState();
        }

    }

Scala:

    abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

      /**
        * Evaluates the window and outputs none or several elements.
        *
        * @param key      The key for which this window is evaluated.
        * @param context  The context in which the window is being evaluated.
        * @param elements The elements in the window being evaluated.
        * @param out      A collector for emitting elements.
        * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
        */
      def process(
          key: KEY,
          context: Context,
          elements: Iterable[IN],
          out: Collector[OUT])

      /**
        * The context holding window metadata
        */
      abstract class Context {
        /**
          * Returns the window that is being evaluated.
          */
        def window: W

        /**
          * Returns the current processing time.
          */
        def currentProcessingTime: Long

        /**
          * Returns the current event-time watermark.
          */
        def currentWatermark: Long

        /**
          * State accessor for per-key and per-window state.
          */
        def windowState: KeyedStateStore

        /**
          * State accessor for per-key global state.
          */
        def globalState: KeyedStateStore
      }

    }

Note: The key parameter is the key that is extracted via the KeySelector specified in the keyBy() invocation. In the case of tuple-index keys or string-field references, this key type is always Tuple, and you have to manually cast it to a tuple of the correct size to extract the key fields.

A ProcessWindowFunction can be defined and used like this:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(t -> t.f0)
        .timeWindow(Time.minutes(5))
        .process(new MyProcessWindowFunction());

    /* ... */

    public class MyProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
            long count = 0;
            for (Tuple2<String, Long> in: input) {
                count++;
            }
            out.collect("Window: " + context.window() + " count: " + count);
        }
    }

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(_._1)
        .timeWindow(Time.minutes(5))
        .process(new MyProcessWindowFunction())

    /* ... */

    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

      def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
        var count = 0L
        for (in <- input) {
          count = count + 1
        }
        out.collect(s"Window ${context.window} count: $count")
      }
    }

The example shows a ProcessWindowFunction that counts the elements in a window. In addition, the window function adds information about the window to the output.

Attention Note that using ProcessWindowFunction for simple aggregates such as count is quite inefficient, because all elements of the window are buffered until the window is evaluated. The next section shows how a ReduceFunction or AggregateFunction can be combined with a ProcessWindowFunction to get both incremental aggregation and the added information of a ProcessWindowFunction.
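
To make the cost concrete, the following plain-Java sketch models the state a single window keeps under each strategy. It has no Flink dependency, and BufferingWindow and IncrementalCountWindow are hypothetical names. The first strategy buffers every element until the window fires, which is what a lone ProcessWindowFunction requires. The second folds each element into one running value on arrival, which is what a ReduceFunction or AggregateFunction allows. It illustrates the storage difference only and is not how Flink's window operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a window that buffers all elements, like a lone ProcessWindowFunction. */
final class BufferingWindow {
    private final List<Long> elements = new ArrayList<>();

    void add(long value) {
        elements.add(value); // every element stays in state until the window fires
    }

    int storedValues() {
        return elements.size();
    }

    long fireCount() {
        long count = 0;
        for (Long ignored : elements) { // the whole buffer is iterated at firing time
            count++;
        }
        return count;
    }
}

/** Hypothetical model: a window that pre-aggregates on arrival, like a ReduceFunction or AggregateFunction. */
final class IncrementalCountWindow {
    private long count = 0; // the only value kept in state

    void add(long value) {
        count++; // the element is folded into the running count and then discarded
    }

    int storedValues() {
        return 1;
    }

    long fireCount() {
        return count;
    }
}
```

Fed the same 1,000 elements, both models report a count of 1,000. BufferingWindow holds 1,000 values in state, while IncrementalCountWindow holds one.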

ProcessWindowFunction with Incremental Aggregation

A ProcessWindowFunction can be combined with either a ReduceFunction, an AggregateFunction, or a FoldFunction to incrementally aggregate elements as they arrive in the window. When the window is closed, the ProcessWindowFunction will be provided with the aggregated result. This allows it to incrementally compute windows while having access to the additional window meta information of the ProcessWindowFunction.

Note You can also use the legacy WindowFunction instead of ProcessWindowFunction for incremental window aggregation.
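
The combination can be pictured with a small plain-Java model of one window instance. Elements are reduced as they arrive. When the window fires, the final function receives an Iterable containing exactly one pre-aggregated value, together with window metadata (here only the window start). ReduceThenProcess and its constructor parameters are hypothetical: a BinaryOperator stands in for a ReduceFunction, and a BiFunction stands in for the process() call. This is a sketch of the data flow under those assumptions, not Flink's API.

```java
import java.util.Collections;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/**
 * Hypothetical model of one window instance that reduces eagerly and, when it fires,
 * hands the single pre-aggregated value plus window metadata to a final function.
 */
final class ReduceThenProcess<T, R> {
    private final long start;                               // window metadata passed to the final function
    private final BinaryOperator<T> reducer;                // plays the role of a ReduceFunction
    private final BiFunction<Long, Iterable<T>, R> process; // plays the role of process(...)
    private T aggregate;                                    // the only value kept in state

    ReduceThenProcess(long start, BinaryOperator<T> reducer, BiFunction<Long, Iterable<T>, R> process) {
        this.start = start;
        this.reducer = reducer;
        this.process = process;
    }

    /** Each arriving element is combined with the running aggregate immediately. */
    void add(T element) {
        aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
    }

    /** On firing, the final function sees an Iterable with exactly one element. */
    R fire() {
        return process.apply(start, Collections.singletonList(aggregate));
    }
}
```

For example, reducing 7, 3, and 9 with a minimum function in a window that starts at 0, and formatting the result as start:min, yields "0:3". This mirrors the ReduceFunction example that follows.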

Incremental Window Aggregation with ReduceFunction

The following example shows how an incremental ReduceFunction can be combined with a ProcessWindowFunction to return the smallest event in a window along with the start time of the window.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    DataStream<SensorReading> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

    // Function definitions

    private static class MyReduceFunction implements ReduceFunction<SensorReading> {

        public SensorReading reduce(SensorReading r1, SensorReading r2) {
            return r1.value() > r2.value() ? r2 : r1;
        }
    }

    private static class MyProcessWindowFunction
            extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<SensorReading> minReadings,
                            Collector<Tuple2<Long, SensorReading>> out) {
            SensorReading min = minReadings.iterator().next();
            // the window is obtained from the Context
            out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
        }
    }

    val input: DataStream[SensorReading] = ...

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .reduce(
            (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
            ( key: String,
              window: TimeWindow,
              minReadings: Iterable[SensorReading],
              out: Collector[(Long, SensorReading)] ) =>
              {
                val min = minReadings.iterator.next()
                out.collect((window.getStart, min))
              }
        )

Incremental Window Aggregation with AggregateFunction

The following example shows how an incremental AggregateFunction can be combined with a ProcessWindowFunction to compute the average and also emit the key and window along with the average.

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

    // Function definitions

    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
            implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> accumulator) {
            return ((double) accumulator.f0) / accumulator.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    private static class MyProcessWindowFunction
            extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<Double> averages,
                            Collector<Tuple2<String, Double>> out) {
            Double average = averages.iterator().next();
            out.collect(new Tuple2<>(key, average));
        }
    }

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

    // Function definitions

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)

      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)

      // convert to Double first: Long / Long would truncate and would not match the Double result type
      override def getResult(accumulator: (Long, Long)): Double = accumulator._1.toDouble / accumulator._2

      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }

    class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

      def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
        val average = averages.iterator.next()
        out.collect((key, average))
      }
    }

Incremental Window Aggregation with FoldFunction

The following example shows how an incremental FoldFunction can be combined with a ProcessWindowFunction to extract the number of events in the window and also return the key and end time of the window.

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    DataStream<SensorReading> input = ...;

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .fold(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());

    // Function definitions

    private static class MyFoldFunction
            implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {

        public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
            Integer cur = acc.getField(2);
            acc.setField(cur + 1, 2); // increment the count kept in field 2
            return acc;
        }
    }

    private static class MyProcessWindowFunction
            extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

        public void process(String key,
                            Context context,
                            Iterable<Tuple3<String, Long, Integer>> counts,
                            Collector<Tuple3<String, Long, Integer>> out) {
            Integer count = counts.iterator().next().getField(2);
            out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(), count));
        }
    }

    val input: DataStream[SensorReading] = ...

    input
        .keyBy(<key selector>)
        .timeWindow(<duration>)
        .fold (
            ("", 0L, 0),
            (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
            ( key: String,
              window: TimeWindow,
              counts: Iterable[(String, Long, Int)],
              out: Collector[(String, Long, Int)] ) =>
              {
                val count = counts.iterator.next()
                out.collect((key, window.getEnd, count._3))
              }
        )

Using per-window state in ProcessWindowFunction

In addition to accessing keyed state (as any rich function can), a ProcessWindowFunction can also use keyed state that is scoped to the window that the function is currently processing. In this context, it is important to understand which window per-window state refers to. There are different “windows” involved:

  • The window that was defined when specifying the windowed operation: This might be tumbling windows of 1 hour or sliding windows of 2 hours that slide by 1 hour.
  • An actual instance of a defined window for a given key: This might be the time window from 12:00 to 13:00 for user-id xyz. This is based on the window definition, and there will be many window instances, depending on the number of keys that the job is currently processing and on which time slots the events fall into.

Per-window state is tied to the latter of those two. This means that if we process events for 1000 different keys and events for all of them currently fall into the [12:00, 13:00) time window, then there will be 1000 window instances that each have their own keyed per-window state.

There are two methods on the Context object that a process() invocation receives that allow access to the two types of state:

  • globalState(), which allows access to keyed state that is not scoped to a window
  • windowState(), which allows access to keyed state that is also scoped to the window

This feature is helpful if you anticipate multiple firings for the same window, as can happen when you have late firings for data that arrives late or when you have a custom trigger that does speculative early firings. In such a case, you would store information about previous firings or the number of firings in per-window state.

When using windowed state, it is important to also clean up that state when a window is cleared. This should happen in the clear() method of your ProcessWindowFunction.
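
As a rough illustration of the scoping (not of Flink's state backends), the following plain-Java sketch keeps one counter per (key, window instance), standing in for windowState(), and one counter per key, standing in for globalState(). It counts firings, the use case described above. On clear, it drops only the per-window entry. KeyAndWindow, FiringCounter, and the millisecond window bounds are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: one per (key, window instance), e.g. ("xyz", [12:00, 13:00)). */
final class KeyAndWindow {
    final String key;
    final long windowStart;
    final long windowEnd;

    KeyAndWindow(String key, long windowStart, long windowEnd) {
        this.key = key;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeyAndWindow)) return false;
        KeyAndWindow other = (KeyAndWindow) o;
        return key.equals(other.key) && windowStart == other.windowStart && windowEnd == other.windowEnd;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart, windowEnd);
    }
}

/** Hypothetical model of how per-window and global keyed state are scoped. */
final class FiringCounter {
    // stands in for windowState(): one entry per key AND window instance
    final Map<KeyAndWindow, Integer> perWindowFirings = new HashMap<>();
    // stands in for globalState(): one entry per key, shared by all of that key's windows
    final Map<String, Integer> globalFirings = new HashMap<>();

    /** Called on every firing (main or late) of a window; returns this window's firing number. */
    int onFire(String key, long start, long end) {
        globalFirings.merge(key, 1, Integer::sum);
        return perWindowFirings.merge(new KeyAndWindow(key, start, end), 1, Integer::sum);
    }

    /** Counterpart of clear(): drop the window's own state and keep the global state. */
    void clearWindow(String key, long start, long end) {
        perWindowFirings.remove(new KeyAndWindow(key, start, end));
    }
}
```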

WindowFunction (Legacy)

In some places where a ProcessWindowFunction can be used, you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

The signature of a WindowFunction looks as follows:

    public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

        /**
         * Evaluates the window and outputs none or several elements.
         *
         * @param key The key for which this window is evaluated.
         * @param window The window that is being evaluated.
         * @param input The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         *
         * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
         */
        void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    }

    trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

      /**
        * Evaluates the window and outputs none or several elements.
        *
        * @param key The key for which this window is evaluated.
        * @param window The window that is being evaluated.
        * @param input The elements in the window being evaluated.
        * @param out A collector for emitting elements.
        * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
        */
      def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
    }

It can be used like this:

    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new MyWindowFunction());

    val input: DataStream[(String, Long)] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new MyWindowFunction())

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g., when using session windows.
  • Finally, the clear() method performs any action needed upon removal of the corresponding window.

Two things to notice about the above methods are:

  1. The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
      • CONTINUE: do nothing,
      • FIRE: trigger the computation,
      • PURGE: clear the elements in the window, and
      • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
  2. Any of these methods can be used to register processing- or event-time timers for future actions.

Fire and Purge

Once a trigger determines that a window is ready for processing, it fires, i.e., it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction (possibly after passing them to an evictor). Windows with ReduceFunction, AggregateFunction, or FoldFunction simply emit their eagerly aggregated result.

When a trigger fires, it can either FIRE or FIRE_AND_PURGE. While FIRE keeps the contents of the window, FIRE_AND_PURGE removes its content. By default, the pre-implemented triggers simply FIRE without purging the window state.

Attention Purging will simply remove the contents of the window and will leave any potential meta-information about the window and any trigger state intact.
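
The effect of the four trigger results on a window's contents, and the difference between FIRE and FIRE_AND_PURGE in particular, can be modelled in a few lines of plain Java. The Action enum and the ActionWindow class below are hypothetical stand-ins, not Flink's TriggerResult or window operator. A sum plays the role of the window function, and the model tracks only the buffered elements, not trigger state or window metadata.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the four trigger results: each is a (fire, purge) pair. */
enum Action {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;
    final boolean purge;

    Action(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

/** Hypothetical window buffer that reacts to an Action as described above. */
final class ActionWindow {
    private final List<Integer> contents = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>(); // results of the "window function" (a sum here)

    void add(int value) {
        contents.add(value);
    }

    void apply(Action action) {
        if (action.fire) {
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            emitted.add(sum); // compute over the current contents
        }
        if (action.purge) {
            contents.clear(); // purging happens after the computation, as in FIRE_AND_PURGE
        }
    }

    int size() {
        return contents.size();
    }
}
```

With FIRE, a later firing sees the old elements again. With FIRE_AND_PURGE, each firing only sees what arrived since the previous one.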

Default Triggers of WindowAssigners

The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.

Attention The default trigger of the GlobalWindow is the NeverTrigger, which never fires. Consequently, you always have to define a custom trigger when using a GlobalWindow.

Attention By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner. For example, if you specify a CountTrigger for TumblingEventTimeWindows you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count.
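
The decision logic that such a custom "time or count" trigger might implement can be sketched in plain Java, without the Flink Trigger API. CountOrTimeDecider and its methods are hypothetical names. In a real Trigger, the counter would live in partitioned trigger state, and the time-based firing would come from an event-time timer registered for the window's maximum timestamp (end - 1).

```java
/**
 * Hypothetical decision logic for a custom "time OR count" trigger, written as plain Java.
 * It fires early every maxCount elements and fires once more when the watermark reaches
 * the window's maximum timestamp (end - 1).
 */
final class CountOrTimeDecider {
    private final long maxCount;
    private final long windowMaxTimestamp; // window end - 1
    private long count = 0;                // would be per-window trigger state in Flink
    private boolean firedOnTime = false;   // the time-based firing happens only once

    CountOrTimeDecider(long maxCount, long windowEnd) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowEnd - 1;
    }

    /** Counterpart of onElement(): true means "FIRE". */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // start counting again for the next early firing
            return true;
        }
        return false;
    }

    /** Counterpart of an event-time callback for a new watermark: true means "FIRE". */
    boolean onWatermark(long watermark) {
        if (!firedOnTime && watermark >= windowMaxTimestamp) {
            firedOnTime = true;
            return true;
        }
        return false;
    }
}
```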

Built-in and Custom Triggers

Flink comes with a few built-in triggers.

  • The (already mentioned) EventTimeTrigger fires based on the progress of event-time as measured by watermarks.
  • The ProcessingTimeTrigger fires based on processing time.
  • The CountTrigger fires once the number of elements in a window reaches the given limit.
  • The PurgingTrigger takes another trigger as argument and transforms it into a purging one.
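
The behaviour of the last two built-in triggers can be approximated in plain Java. CountingTrigger fires when its element count reaches maxCount and then starts counting again. PurgingWrapper turns every FIRE of the trigger it wraps into FIRE_AND_PURGE. Outcome, ElementTrigger, and both classes are hypothetical simplifications with only an element callback. Flink's actual Trigger class also has time, merge, and clear callbacks, and it keeps its count in partitioned state.

```java
/** Hypothetical, trimmed-down trigger outcome (PURGE omitted for brevity). */
enum Outcome { CONTINUE, FIRE, FIRE_AND_PURGE }

/** Hypothetical element-driven trigger interface. */
interface ElementTrigger {
    Outcome onElement();
}

/** Models the CountTrigger described above: fires when the element count reaches maxCount. */
final class CountingTrigger implements ElementTrigger {
    private final long maxCount;
    private long count = 0;

    CountingTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public Outcome onElement() {
        if (++count >= maxCount) {
            count = 0; // reset so the next maxCount elements trigger the next firing
            return Outcome.FIRE;
        }
        return Outcome.CONTINUE;
    }
}

/** Models PurgingTrigger: wraps another trigger and turns each FIRE into FIRE_AND_PURGE. */
final class PurgingWrapper implements ElementTrigger {
    private final ElementTrigger nested;

    PurgingWrapper(ElementTrigger nested) {
        this.nested = nested;
    }

    @Override
    public Outcome onElement() {
        Outcome o = nested.onElement();
        return o == Outcome.FIRE ? Outcome.FIRE_AND_PURGE : o;
    }
}
```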

If you need to implement a custom trigger, you should check out the abstract Trigger class (https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java). Please note that the API is still evolving and might change in future versions of Flink.

Evictors

Flink’s windowing model allows specifying an optional Evictor in addition to the WindowAssigner and the Trigger. This can be done using the evictor(...) method (shown in the beginning of this document). The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

The evictBefore() contains the eviction logic to be applied before the window function, while the evictAfter() contains the one to be applied after the window function. Elements evicted before the application of the window function will not be processed by it.

Flink comes with three pre-implemented evictors. These are:

  • CountEvictor: keeps up to a user-specified number of elements from the window and discards the remaining ones from the beginning of the window buffer.
  • DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining ones, and removes the ones with a delta greater than or equal to the threshold.
  • TimeEvictor: takes as argument an interval in milliseconds and for a given window, it finds the maximum timestamp max_ts among its elements and removes all the elements with timestamps smaller than max_ts - interval.

Default By default, all the pre-implemented evictors apply their logic before the window function.

Attention Specifying an evictor prevents any pre-aggregation, as all the elements of a window have to be passed to the evictor before applying the computation.

Attention Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last.
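
The three eviction rules can be sketched as plain-Java operations on an in-memory buffer. Stamped and EvictionRules are hypothetical stand-ins for TimestampedValue and the built-in evictors. The boundary comparisons follow the descriptions above and may differ from Flink's own implementations in edge cases. Because of the ordering caveat just mentioned, "the beginning of the buffer" in keepLast is not necessarily the order in which elements arrived.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/** Hypothetical stand-in for a timestamped element in a window buffer. */
final class Stamped {
    final long timestamp;
    final double value;

    Stamped(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

/** Hypothetical plain-Java versions of the three eviction rules, modifying the buffer in place. */
final class EvictionRules {

    /** Like CountEvictor: keep at most maxCount elements, removing from the beginning of the buffer. */
    static void keepLast(List<Stamped> buffer, int maxCount) {
        int toRemove = buffer.size() - maxCount;
        Iterator<Stamped> it = buffer.iterator();
        while (toRemove > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toRemove--;
        }
    }

    /** Like TimeEvictor: remove elements whose timestamp is smaller than maxTs - interval. */
    static void keepRecent(List<Stamped> buffer, long interval) {
        if (buffer.isEmpty()) {
            return;
        }
        long maxTs = Long.MIN_VALUE;
        for (Stamped s : buffer) {
            maxTs = Math.max(maxTs, s.timestamp);
        }
        final long cutoff = maxTs - interval;
        buffer.removeIf(s -> s.timestamp < cutoff);
    }

    /** Like DeltaEvictor: remove elements whose delta to the last element is >= threshold. */
    static void keepClose(List<Stamped> buffer, double threshold,
                          ToDoubleBiFunction<Stamped, Stamped> delta) {
        if (buffer.isEmpty()) {
            return;
        }
        final Stamped last = buffer.get(buffer.size() - 1);
        buffer.removeIf(s -> delta.applyAsDouble(s, last) >= threshold);
    }
}
```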

Allowed Lateness

When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See event time and especially late elements for a more thorough discussion of how Flink deals with event time.

By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.

In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described in the Window Lifecycle section.

Default By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.
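
The rule above can be sketched in plain Java. LatenessCheck and Arrival are hypothetical names, timestamps are in milliseconds, and the window's maximum timestamp is taken to be its end minus 1. An element is on time while the watermark is below that maximum timestamp. It is still added to the window while the watermark is below the maximum timestamp plus the allowed lateness. After that, it is dropped, or routed to the late-data side output if one is configured. The same sum marks the point at which the window's state can be removed.

```java
/** Hypothetical classification of an element for the event-time window it belongs to. */
enum Arrival { ON_TIME, LATE_ACCEPTED, DROPPED }

final class LatenessCheck {
    /**
     * @param windowEnd        exclusive end of the window the element belongs to
     * @param currentWatermark the operator's current event-time watermark
     * @param allowedLateness  allowed lateness in ms (0 is the default)
     */
    static Arrival classify(long windowEnd, long currentWatermark, long allowedLateness) {
        long maxTimestamp = windowEnd - 1;                       // last timestamp inside the window
        if (currentWatermark < maxTimestamp) {
            return Arrival.ON_TIME;                              // the window has not fired yet
        }
        if (currentWatermark < maxTimestamp + allowedLateness) {
            return Arrival.LATE_ACCEPTED;                        // added; may cause a late firing
        }
        return Arrival.DROPPED;                                  // dropped or sent to a side output
    }

    /** Watermark value at which the window and its state can be cleaned up. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return windowEnd - 1 + allowedLateness;
    }
}
```

With the default lateness of 0, the second condition can never hold, so every element that arrives after its window has fired is dropped.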

You can specify an allowed lateness like this:

    DataStream<T> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .<windowed transformation>(<window function>);

    val input: DataStream[T] = ...

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .<windowed transformation>(<window function>)

Note When using the GlobalWindows window assigner, no data is ever considered late, because the end timestamp of the global window is Long.MAX_VALUE.

Getting late data as a side output

Using Flink’s side output feature you can get a stream of the data that was discarded as late.

You first need to specify that you want to get late data using sideOutputLateData(OutputTag) on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation:

    final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

    DataStream<T> input = ...;

    SingleOutputStreamOperator<T> result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>);

    DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

    val lateOutputTag = OutputTag[T]("late-data")

    val input: DataStream[T] = ...

    val result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>)

    val lateStream = result.getSideOutput(lateOutputTag)

Late elements considerations

When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it could trigger another firing for the window. These firings are called late firings, because they are triggered by late events, in contrast to the main firing, which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows, as they may “bridge” the gap between two pre-existing, unmerged windows.
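
To see how a late element can bridge two sessions, here is a plain-Java sketch of event-time session assignment. Each element gets its own window [timestamp, timestamp + gap), and overlapping windows are merged. Session and SessionMerging are hypothetical names, and this sketch also merges windows that merely touch. It is an illustration of the idea, not Flink's session window assigner.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical session window [start, end), where end = last element timestamp + gap. */
final class Session {
    final long start;
    final long end;

    Session(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

final class SessionMerging {
    /** Gives each timestamp its own [ts, ts + gap) window, then merges overlapping windows. */
    static List<Session> sessions(List<Long> timestamps, long gap) {
        List<Session> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Session(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong((Session s) -> s.start));

        List<Session> merged = new ArrayList<>();
        for (Session w : windows) {
            Session last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w.start <= last.end) { // overlapping or touching: merge
                merged.set(merged.size() - 1, new Session(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With a gap of 10, elements at 0 and 18 form two sessions, [0, 10) and [18, 28). A late element at 9 overlaps both and merges them into a single session, [0, 28).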

Attention You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.
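
One way a downstream consumer might account for late firings is to treat results as upserts keyed by key and window, keeping only the most recent one. LatestResultPerWindow is a hypothetical plain-Java illustration of that idea. Joining key and window end into one string is a simplification that assumes keys never contain the @ separator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical downstream "upsert" view: keeps only the latest result per (key, window end). */
final class LatestResultPerWindow {
    private final Map<String, Long> latest = new LinkedHashMap<>();

    /** Each firing (main or late) for the same key and window overwrites the previous result. */
    void accept(String key, long windowEnd, long result) {
        latest.put(key + "@" + windowEnd, result);
    }

    /** Returns the most recent result for the window, or null if none has been seen. */
    Long get(String key, long windowEnd) {
        return latest.get(key + "@" + windowEnd);
    }

    int size() {
        return latest.size();
    }
}
```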

Working with window results

The result of a windowed operation is again a DataStream. No information about the windowed operations is retained in the result elements, so if you want to keep meta-information about the window, you have to manually encode that information in the result elements in your ProcessWindowFunction. The only relevant information that is set on the result elements is the element timestamp. This is set to the maximum allowed timestamp of the processed window, which is end timestamp - 1, since the window-end timestamp is exclusive. Note that this is true for both event-time windows and processing-time windows, i.e., after a windowed operation, elements always have a timestamp, but this can be an event-time timestamp or a processing-time timestamp. For processing-time windows, this has no special implications. For event-time windows, however, this, together with how watermarks interact with windows, enables consecutive windowed operations with the same window sizes. We will cover this after taking a look at how watermarks interact with windows.

Interaction of watermarks and windows

Before continuing in this section you might want to take a look at our section about event time and watermarks.

When watermarks arrive at the window operator, this triggers two things:

  • the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than or equal to the new watermark
  • the watermark is forwarded (as is) to downstream operations

Intuitively, a watermark “flushes” out any windows that would be considered late in downstream operations once they receive that watermark.
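
The first of the two effects can be sketched in plain Java. Given the exclusive end timestamps of the open windows, a watermark flushes every window whose maximum timestamp (end - 1) it has reached. WatermarkFlush is a hypothetical name. The greater-than-or-equal comparison mirrors how event-time timers fire, and the sketch ignores allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of which event-time windows a watermark flushes. */
final class WatermarkFlush {
    /**
     * @param windowEnds exclusive end timestamps of the currently open windows
     * @param watermark  the newly arrived watermark
     * @return the ends of windows whose maximum timestamp (end - 1) the watermark has reached
     */
    static List<Long> windowsToFire(List<Long> windowEnds, long watermark) {
        List<Long> firing = new ArrayList<>();
        for (long end : windowEnds) {
            if (end - 1 <= watermark) {
                firing.add(end);
            }
        }
        return firing;
    }
}
```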

Consecutive windowed operations

As mentioned before, the way the timestamp of windowed results is computed and how watermarks interact with windows allows stringing together consecutive windowed operations. This can be useful when you want to do two consecutive windowed operations where you want to use different keys but still want elements from the same upstream window to end up in the same downstream window. Consider this example:

    DataStream<Integer> input = ...;

    DataStream<Integer> resultsPerKey = input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new Summer());

    DataStream<Integer> globalResults = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new TopKWindowFunction());

    val input: DataStream[Int] = ...

    val resultsPerKey = input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new Summer())

    val globalResults = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new TopKWindowFunction())

In this example, the results for time window [0, 5) from the first operation will also end up in time window [0, 5) in the subsequent windowed operation. This allows calculating a sum per key and then calculating the top-k elements within the same window in the second operation.
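
Why the two windowed operations line up can be checked with a little arithmetic. The plain-Java helpers below use hypothetical names and assume a zero window offset and millisecond timestamps. They compute the tumbling window that contains a timestamp and the timestamp given to a window's result, which is end - 1. Feeding that result timestamp back into the same window computation lands in the same window.

```java
/** Hypothetical helpers for tumbling event-time windows with zero offset. */
final class TumblingMath {
    /** Start of the tumbling window of the given size that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - ((ts % size) + size) % size; // the double modulo also handles negative timestamps
    }

    /** Timestamp assigned to a window's result: the window's maximum timestamp, end - 1. */
    static long resultTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }
}
```

For the 5-second windows in the example, an element at 3 ms falls into [0, 5000), and that window's result is stamped 4999, which again falls into [0, 5000) downstream. If the result were stamped with the exclusive end, 5000, it would fall into the next window, [5000, 10000).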

Useful state size considerations

Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation:

  1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped as late). In contrast, sliding windows create several copies of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

  2. ReduceFunction, AggregateFunction, and FoldFunction can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a ProcessWindowFunction requires accumulating all elements.

  3. Using an Evictor prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see Evictors).
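
The first rule can be quantified by enumerating the sliding windows an element belongs to. The plain-Java sketch below assumes a zero offset and millisecond timestamps, and it follows the common sliding-window assignment in which window starts are multiples of the slide. SlidingCopies is a hypothetical name. For a window size of 1 day and a slide of 1 second, it returns 86,400 windows per element, i.e., 86,400 copies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of sliding-window assignment (zero offset). */
final class SlidingCopies {
    /** Starts of all sliding windows of the given size and slide that contain ts. */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - ((ts % slide) + slide) % slide; // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start); // window [start, start + size) contains ts
        }
        return starts;
    }
}
```

When size equals slide, which is the tumbling case, the loop yields exactly one window, matching the one-copy-per-element statement above.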