Joining

Joining

Window Join

A window join joins the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a window assigner and are evaluated on elements from both of the streams. 窗口连接加入共享公共密钥并位于同一窗口中的两个流的元素。这些窗口可以通过使用WindowAssigner来定义,并在两个流中的元素上进行评估。

The elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction where the user can emit results that meet the join criteria. 然后将双方的元素传递给用户定义的`JoinFunction‘或’FlatJoinFunction‘,其中用户可以发出符合联接标准的结果。

The general usage can be summarized as follows: 一般用法可总结如下:

  1. stream.join(otherStream)
  2. .where(<KeySelector>)
  3. .equalTo(<KeySelector>)
  4. .window(<WindowAssigner>)
  5. .apply(<JoinFunction>)

Some notes on semantics: 关于语义的一些注释:

  • The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don’t have a corresponding element from the other stream to be joined with.
  • 两个流的元素的成对组合的创建就像一个内部连接,这意味着如果一个流中的元素没有来自另一个流的对应元素,就不会被释放出来。
  • Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with [5, 10) as its boundaries would result in the joined elements having 9 as their timestamp.
  • 那些确实被加入的元素将以仍然位于各自窗口中的最大时间戳作为它们的时间戳。例如,以`[5,10]‘为边界的窗口将导致连接元素的时间戳为9。

In the following section we are going to give an overview over how different kinds of window joins behave using some exemplary scenarios. 在下一节中,我们将使用一些示例性场景概述不同类型的窗口联接的行为。v

Tumbling Window Join

翻滚的窗户连接起来

When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! 在执行滚动窗口连接时,所有具有公共键和通用滚动窗口的元素都以成对组合的形式连接起来,并传递给“JoinFunction”或“FlatJoinFunction”。因为这就像一个内部连接,一个流的元素在其翻滚的窗口中没有来自另一个流的元素就不会被发出!

Joining - 图1

As illustrated in the figure, we define a tumbling window with the size of 2 milliseconds, which results in windows of the form [0,1], [2,3], .... The image shows the pairwise combinations of all elements in each window which will be passed on to the JoinFunction. Note that in the tumbling window [6,7] nothing is emitted because no elements exist in the green stream to be joined with the orange elements ⑥ and ⑦. 如图所示,我们定义了一个大小为2毫秒的翻滚窗口,这导致窗体“[0,1]、[2,3]等等”的窗口。图像显示每个窗口中所有元素的成对组合,这些元素将被传递到“JoinFunction”。请注意,在翻滚窗口“[6,7]”中,没有发出任何东西,因为绿色流中没有元素与橙色元素和单位连接。

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...
  6. DataStream<Integer> greenStream = ...
  7. orangeStream.join(greenStream)
  8. .where(<KeySelector>)
  9. .equalTo(<KeySelector>)
  10. .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
  11. .apply (new JoinFunction<Integer, Integer, String> (){
  12. @Override
  13. public String join(Integer first, Integer second) {
  14. return first + "," + second;
  15. }
  16. });
  1. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  2. import org.apache.flink.streaming.api.windowing.time.Time;
  3. ...
  4. val orangeStream: DataStream[Integer] = ...
  5. val greenStream: DataStream[Integer] = ...
  6. orangeStream.join(greenStream)
  7. .where(elem => /* select key */)
  8. .equalTo(elem => /* select key */)
  9. .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
  10. .apply { (e1, e2) => e1 + "," + e2 }

Sliding Window Join

滑动窗连接

When performing a sliding window join, all elements with a common key and common sliding window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another! 在执行滑动窗口连接时,具有公共键和公共滑动窗口的所有元素都以成对组合的形式连接起来,并传递给“JoinFunction”或“FlatJoinFunction”。没有从当前滑动窗口中的另一个流中发出元素的一个流的元素不会被发出!请注意,一些元素可能在一个滑动窗口中连接,而不是在另一个滑动窗口中!

Joining - 图2

In this example we are using sliding windows with a size of two milliseconds and slide them by one millisecond, resulting in the sliding windows [-1, 0],[0,1],[1,2],[2,3], …. The joined elements below the x-axis are the ones that are passed to the JoinFunction for each sliding window. Here you can also see how for example the orange ② is joined with the green ③ in the window [2,3], but is not joined with anything in the window [1,2]. 在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,导致滑动窗口“[-1,0]、[0,1]、[1,2]、[2,3]等等。在X轴下方的接合元件是传递给每个滑动窗的“接合功能”的元件。在此,您还可以看到,例如橙色光与窗口“[2,3]”中的绿色“”连接,但与窗口“[1,2]”中的任何内容不连接。

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...
  6. DataStream<Integer> greenStream = ...
  7. orangeStream.join(greenStream)
  8. .where(<KeySelector>)
  9. .equalTo(<KeySelector>)
  10. .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
  11. .apply (new JoinFunction<Integer, Integer, String> (){
  12. @Override
  13. public String join(Integer first, Integer second) {
  14. return first + "," + second;
  15. }
  16. });
  1. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  2. import org.apache.flink.streaming.api.windowing.time.Time;
  3. ...
  4. val orangeStream: DataStream[Integer] = ...
  5. val greenStream: DataStream[Integer] = ...
  6. orangeStream.join(greenStream)
  7. .where(elem => /* select key */)
  8. .equalTo(elem => /* select key */)
  9. .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
  10. .apply { (e1, e2) => e1 + "," + e2 }

Session Window Join

会话窗口连接

When performing a session window join, all elements with the same key that when “combined” fulfill the session criteria are joined in pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Again this performs an inner join, so if there is a session window that only contains elements from one stream, no output will be emitted! 在执行会话窗口联接时,所有具有与“组合”会话条件相同的键的元素都以成对的组合连接起来,并传递给`JoinFunction‘或’FlatJoinFunction‘。这再次执行内部连接,因此如果会话窗口只包含来自一个流的元素,则不会发出任何输出!

Joining - 图3

Here we define a session window join where each session is divided by a gap of at least 1ms. There are three sessions, and in the first two sessions the joined elements from both streams are passed to the JoinFunction. In the third session there are no elements in the green stream, so ⑧ and ⑨ are not joined! 在这里,我们定义了一个会话窗口连接,其中每个会话被至少1ms的间隙分隔。有三个会话,在前两个会话中,来自两个流的连接元素被传递给`JoinFunction‘。在第三节中,绿色流中没有元素,所以8和9没有连接!

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...
  6. DataStream<Integer> greenStream = ...
  7. orangeStream.join(greenStream)
  8. .where(<KeySelector>)
  9. .equalTo(<KeySelector>)
  10. .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
  11. .apply (new JoinFunction<Integer, Integer, String> (){
  12. @Override
  13. public String join(Integer first, Integer second) {
  14. return first + "," + second;
  15. }
  16. });
  1. import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
  2. import org.apache.flink.streaming.api.windowing.time.Time;
  3. ...
  4. val orangeStream: DataStream[Integer] = ...
  5. val greenStream: DataStream[Integer] = ...
  6. orangeStream.join(greenStream)
  7. .where(elem => /* select key */)
  8. .equalTo(elem => /* select key */)
  9. .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
  10. .apply { (e1, e2) => e1 + "," + e2 }

Interval Join

区间连接

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A. 间隔加入连接两个流的元素(我们现在将使用公共密钥调用它们A&B),并且其中流B的元素具有位于流A中的元素的时间戳的相对时间间隔中的时间戳。

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound &lt;= b.timestamp &lt;= a.timestamp + upperBound 这也可以更正式地表示为“B.时间戳+下限;A.时间戳+上绑定”或“A.时间戳+下限&lt;=B.TimeStamp&lt;=A.TimeStamp+Upperbound”

where a and b are elements of A and B that share a common key. Both the lower and upper bound can be either negative or positive as long as as the lower bound is always smaller or equal to the upper bound. The interval join currently only performs inner joins. 其中,a和b是A和B的元素,它们共享一个公共密钥。上下界可以是负的,也可以是正的,只要下界总是较小或等于上界。Interval联接当前只执行内部连接。

When a pair of elements are passed to the ProcessJoinFunction, they will be assigned with the larger timestamp (which can be accessed via the ProcessJoinFunction.Context) of the two elements. 当一对元素被传递到“ProcessJoinFunction”时,它们将被分配较大的时间戳(可以通过两个元素的“ProcessJoinfunction.context”来访问)。

Note The interval join currently only supports event time. 注意:间隔联接当前仅支持事件时间。

Joining - 图4

In the example above, we join two streams ‘orange’ and ‘green’ with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. Be default, these boundaries are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive can be applied to change the behaviour. 在上面的示例中,我们将两个流“橙色”和“绿色”连接,下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界包含在内,但“.下边界()”和“.pperBoundexclusive”可用于更改行为。

Using the more formal notation again this will translate to 再次使用更正式的表示法,这将转换为

orangeElem.ts + lowerBound &lt;= greenElem.ts &lt;= orangeElem.ts + upperBound

as indicated by the triangles. 如三角形所示。

  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
  3. import org.apache.flink.streaming.api.windowing.time.Time;
  4. ...
  5. DataStream<Integer> orangeStream = ...
  6. DataStream<Integer> greenStream = ...
  7. orangeStream
  8. .keyBy(<KeySelector>)
  9. .intervalJoin(greenStream.keyBy(<KeySelector>))
  10. .between(Time.milliseconds(-2), Time.milliseconds(1))
  11. .process (new ProcessJoinFunction<Integer, Integer, String(){
  12. @Override
  13. public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
  14. out.collect(first + "," + second);
  15. }
  16. });
  1. import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
  2. import org.apache.flink.streaming.api.windowing.time.Time;
  3. ...
  4. val orangeStream: DataStream[Integer] = ...
  5. val greenStream: DataStream[Integer] = ...
  6. orangeStream
  7. .keyBy(elem => /* select key */)
  8. .intervalJoin(greenStream.keyBy(elem => /* select key */))
  9. .between(Time.milliseconds(-2), Time.milliseconds(1))
  10. .process(new ProcessJoinFunction[Integer, Integer, String] {
  11. override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
  12. out.collect(left + "," + right);
  13. }
  14. });
  15. });