A typical scenario when processing data with Flink is joining two streams. The DataStream API offers three relevant methods: join, coGroup, and intervalJoin.

Join and CoGroup

Usage Example

```java
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

stream.coGroup(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<CoGroupFunction>)
```

Overview

Both operations are window joins: join performs an inner join within a window, while coGroup can also perform an outer join within a window.
Taking join as an example, depending on the window type specified, the following variants are possible:

(figure: Tumbling Window Join)
(figure: Sliding Window Join)
(figure: Session Window Join)

The timestamp of a joined element is the maximum timestamp of the window it was produced in.
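To make the window-join semantics concrete, here is a minimal, Flink-free sketch of what an inner join computes for one fired window: a nested loop over key-matched pairs, with unmatched keys dropped. The class and method names here are made up for illustration.

```java
import java.util.*;

// Sketch of windowed inner-join semantics (no Flink involved): within one
// window, every pair of elements from the two inputs that share a key
// produces one joined result; keys present on only one side are dropped.
public class WindowJoinSketch {

    // Joins the elements of one window: nested loop over key-matched pairs.
    static List<String> joinWindow(Map<String, List<Integer>> left,
                                   Map<String, List<Integer>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : left.entrySet()) {
            List<Integer> rightValues = right.getOrDefault(e.getKey(), Collections.emptyList());
            for (Integer l : e.getValue()) {
                for (Integer r : rightValues) {
                    out.add(e.getKey() + ":" + l + "," + r); // one result per matched pair
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> left = Map.of("a", List.of(1, 2), "b", List.of(3));
        Map<String, List<Integer>> right = Map.of("a", List.of(10), "c", List.of(20));
        // key "a" yields two pairs; "b" and "c" have no partner and are dropped
        System.out.println(joinWindow(left, right)); // prints [a:1,10, a:2,10]
    }
}
```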

Code

```java
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * The join method, called once per joined pair of elements.
     */
    OUT join(IN1 first, IN2 second) throws Exception;
}

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * This method must be implemented to provide a user implementation of a
     * coGroup. It is called for each pair of element groups where the elements share the
     * same key.
     */
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
```
  • JoinFunction operates within a window and is invoked once for every key-matched pair of elements from the two streams, i.e. an inner join.
  • CoGroupFunction receives all elements that share the same key from both streams within a window, so it can implement either an inner join or an outer join.
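Because a coGroup-style function sees all elements for a key from both sides at once, an outer join is just a matter of emitting something for the side that has no partner. Below is a minimal, Flink-free sketch of a left outer join written in coGroup style; the names are illustrative, not Flink API.

```java
import java.util.*;

// Sketch: coGroup hands over the full element groups for one key, so we can
// emit a result even when the right group is empty -- a left outer join.
public class CoGroupOuterJoinSketch {

    // Left outer join for one key's groups: unmatched left elements pair with "null".
    static List<String> leftOuterCoGroup(Iterable<Integer> first, Iterable<String> second) {
        List<String> out = new ArrayList<>();
        for (Integer l : first) {
            boolean matched = false;
            for (String r : second) {
                out.add(l + "," + r);
                matched = true;
            }
            if (!matched) {
                out.add(l + ",null"); // right side empty: keep the left element anyway
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(leftOuterCoGroup(List.of(1, 2), List.of()));      // prints [1,null, 2,null]
        System.out.println(leftOuterCoGroup(List.of(1), List.of("x", "y"))); // prints [1,x, 1,y]
    }
}
```

An inner join falls out of the same shape by simply omitting the `if (!matched)` branch, which is exactly what Flink's JoinCoGroupFunction (shown below) does.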

Let's look at the join code first:

```java
stream.join(otherStream)      // -> JoinedStreams
    .where(<KeySelector>)     // -> JoinedStreams.Where
    .equalTo(<KeySelector>)   // -> JoinedStreams.EqualTo
    .window(<WindowAssigner>) // -> JoinedStreams.WithWindow
    .apply(<JoinFunction/FlatJoinFunction>)
```
  • The core logic lives in JoinedStreams.WithWindow; everything else is simple checking and wrapping.
  • A join is translated into a coGroup for execution.


```java
/**
 * A streaming join operation is evaluated over elements in a window.
 *
 * <p>Note: Right now, the join is being evaluated in memory so you need to ensure that the number
 * of elements per key does not get too high. Otherwise the JVM might crash.
 */
@Public
public class JoinedStreams<T1, T2> {

    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        private final DataStream<T1> input1;
        private final DataStream<T2> input2;
        private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);
            // the join is translated into a coGroup operation
            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);
            // the user's JoinFunction is wrapped in a JoinCoGroupFunction
            return coGroupedWindowedStream
                .apply(new JoinCoGroupFunction<>(function), resultType);
        }
    }
}
```
```java
@Public
public class JoinedStreams<T1, T2> {

    /**
     * CoGroup function that does a nested-loop join to get the join result.
     */
    private static class JoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<JoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {

        public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
    }
}
```

Now let's look at the coGroup code:

Flink first unions the two streams into a single stream (with element type TaggedUnion, which marks whether an element came from the left or the right stream), turns the DataStream into a KeyedStream, and then applies the window on top.

```java
@Internal
public static class TaggedUnion<T1, T2> {
    private final T1 one;
    private final T2 two;
}
```


```java
@Public
public class CoGroupedStreams<T1, T2> {

    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);
            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
            // tag the elements of each stream with their origin
            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                .map(new Input1Tagger<T1, T2>()) // produces TaggedUnion(one, null)
                .setParallelism(input1.getParallelism())
                .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                .map(new Input2Tagger<T1, T2>()) // produces TaggedUnion(null, two)
                .setParallelism(input2.getParallelism())
                .returns(unionType);
            // union the two streams into one
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
            return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }
    }
}
```

When the window fires, its elements are split into a left group and a right group, which are then handed to the CoGroupFunction.

```java
@Public
public class CoGroupedStreams<T1, T2> {

    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {
            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();
            // split the window's elements by originating stream
            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
}
```


IntervalJoin

Usage Example

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
  .keyBy(elem => /* select key */)
  .intervalJoin(greenStream.keyBy(elem => /* select key */))
  .between(Time.milliseconds(-2), Time.milliseconds(1))
  .process(new ProcessJoinFunction[Integer, Integer, String] {
    override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
      out.collect(left + "," + right)
    }
  })
```

Overview

  • Window join and coGroup require that the two records to be matched fall into the same window.
  • To match records based on their relative timestamps across the two streams, use interval join instead:

    orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

  • The timestamp of a joined element is the larger of the two records' timestamps.
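The bound condition can be checked in isolation. This small sketch (the helper names are hypothetical, not Flink API) mirrors the `between(Time.milliseconds(-2), Time.milliseconds(1))` call from the example above: an orange element at ts=5 matches green timestamps in [3, 6], and the joined record carries the larger of the two timestamps.

```java
// Sketch of the interval-join bound check and result-timestamp rule.
public class IntervalBoundsSketch {

    // A green element joins an orange one when its timestamp falls in
    // [orangeTs + lowerBound, orangeTs + upperBound].
    static boolean matches(long orangeTs, long greenTs, long lowerBound, long upperBound) {
        return greenTs >= orangeTs + lowerBound && greenTs <= orangeTs + upperBound;
    }

    // Timestamp of the joined result: the larger of the two input timestamps.
    static long joinedTimestamp(long orangeTs, long greenTs) {
        return Math.max(orangeTs, greenTs);
    }

    public static void main(String[] args) {
        System.out.println(matches(5, 3, -2, 1));  // prints true: 3 is within [3, 6]
        System.out.println(matches(5, 7, -2, 1));  // prints false: 7 > 6
        System.out.println(joinedTimestamp(5, 6)); // prints 6
    }
}
```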


Code

Interval join is implemented on top of ConnectedStreams.

```java
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {

    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
            new IntervalJoinOperator<>(
                lowerBound,
                upperBound,
                lowerBoundInclusive,
                upperBoundInclusive,
                left.getType().createSerializer(left.getExecutionConfig()),
                right.getType().createSerializer(right.getExecutionConfig()),
                cleanedUdf
            );

        return left
            .connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, operator);
    }
}
```


  • A ConnectedStreams can be viewed as a union of two streams, without requiring the two streams to have the same element type.
  • By sharing state between the two streams, complex operations can be implemented across them.


```java
@Public
public class ConnectedStreams<IN1, IN2> {
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
                inputStream2.keyBy(keySelector2));
    }

    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coMapper,
            CoMapFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);
        return map(coMapper, outTypeInfo);
    }
}

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
    OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}
```

IntervalJoinOperator keeps two MapStates (leftBuffer and rightBuffer) that hold the records arriving on each stream; the map key is the record's timestamp.

```java
@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

    private final long lowerBound;
    private final long upperBound;
    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
    private transient InternalTimerService<String> internalTimerService;

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }
}
```


  • When a record arrives on the left stream, it is added to leftBuffer, and rightBuffer is searched for records within the allowed time range; each match is passed to the user-defined ProcessJoinFunction. The same applies symmetrically for the right stream.
  • The timestamp of a joined pair is the larger of the two records' timestamps.
  • To keep the two states from growing without bound, each record registers a timer that removes it from state once time has advanced past its validity range.


```java
@Internal
public class IntervalJoinOperator<K, T1, T2, OUT> {

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {
        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have meaningful timestamps.");
        }
        if (isLate(ourTimestamp)) { // drop the record if its timestamp is below internalTimerService.currentWatermark()
            return;
        }

        // add the record to this side's buffer state
        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        // search the other stream's state for matching records
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp = bucket.getKey();
            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }
            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        // register a timer that will later remove this record from state
        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }
}
```
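The buffering and matching logic above can be re-enacted without Flink: plain HashMaps stand in for the two MapStates, and the mirrored, negated bounds in `processRight` correspond to `processElement2`. This is a sketch of the control flow only, assuming a single key and ignoring watermarks, timers, and state cleanup; all names are illustrative.

```java
import java.util.*;

// Flink-free re-enactment of IntervalJoinOperator.processElement: buffer the
// arriving element by timestamp, then scan the other side's buffer for
// timestamps inside [ts + relativeLowerBound, ts + relativeUpperBound].
public class IntervalJoinSimulation {

    final long lowerBound, upperBound;
    final Map<Long, List<String>> leftBuffer = new HashMap<>();
    final Map<Long, List<String>> rightBuffer = new HashMap<>();
    final List<String> output = new ArrayList<>();

    IntervalJoinSimulation(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        // note the mirrored, negated bounds, exactly as in processElement2
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         Map<Long, List<String>> ours, Map<Long, List<String>> others,
                         long relLower, long relUpper, boolean isLeft) {
        // buffer the arriving element under its timestamp
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // scan the other side's buffer for timestamps inside the interval
        for (Map.Entry<Long, List<String>> bucket : others.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) {
                continue; // outside the interval
            }
            for (String other : bucket.getValue()) {
                // always emit left-side value first, as collect() does
                output.add(isLeft ? value + "," + other : other + "," + value);
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinSimulation j = new IntervalJoinSimulation(-2, 1);
        j.processLeft("L@5", 5);
        j.processRight("R@4", 4);     // 4 is within [5-2, 5+1] -> joins with L@5
        j.processRight("R@9", 9);     // 9 is outside [3, 6]    -> no join
        System.out.println(j.output); // prints [L@5,R@4]
    }
}
```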