    在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。

    Async I/O 的使用方式

    在 Flink 中使用 Async I/O 的需要有一个支持异步请求的客户端。以官方文档给出的说明为例:

    /**

    • An implementation of the 'AsyncFunction' that sends requests and sets the callback. / class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { /** The database specific client that can issue concurrent requests with callbacks / lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials) /* The context used for the future callbacks / implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
      // issue the asynchronous request, receive a future for the result
      // 发起异步请求,返回结果是一个 Future
      val resultFutureRequested: Future[String] = client.query(str)
      // set the callback to be executed once the request by the client is complete
      // the callback simply forwards the result to the result future
      // 请求完成时的回调,将结果交给 ResultFuture
      resultFutureRequested.onSuccess {
      case result: String => resultFuture.complete(Iterable((str, result)))
      }
      } } // create the original stream val stream: DataStream[String] = … // 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量 val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

    AsyncDataStream 提供了两种调用方法,分别是 orderedWaitunorderedWait,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。

    Async I/O 的实现

    AsyncDataStream 在运行时被转换为 AsyncWaitOperator 算子,它是 AbstractUdfStreamOperator 的子类。下面我们来看看 AsyncWaitOperator 的实现原理。


    AsyncWaitOperator 算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator 内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue 提供了一种队列的抽象,一个“消费者”线程 Emitter 从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示。
    public class AsyncWaitOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, OperatorActions { / Queue to store the currently in-flight stream elements into. */ private transient StreamElementQueue queue; / Pending stream element which could not yet added to the queue. / private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry; private transient ExecutorService executor; /** Emitter for the completed stream element queue entries. / private transient Emitter emitter; /* Thread running the emitter. / private transient Thread emitterThread; @Override public void processElement(StreamRecord element) throws Exception { final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); //注册一个定时器,在超时时调用 timeout 方法 if (timeout > 0L) { // register a timeout for this AsyncStreamRecordBufferEntry long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer( timeoutTimestamp, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { userFunction.timeout(element.getValue(), streamRecordBufferEntry); } }); // Cancel the timer once we've completed the stream record buffer entry. This will remove // the register trigger task streamRecordBufferEntry.onComplete( (StreamElementQueueEntry> value) -> { timerFuture.cancel(true); }, executor); } //加入队列 addAsyncBufferEntry(streamRecordBufferEntry); //发送异步请求 userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); } //尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞 private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { assert(Thread.holdsLock(checkpointingLock)); pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { // we wait for the emitter to notify us if the queue has space left again checkpointingLock.wait(); } pendingStreamElementQueueEntry = null; } } public class Emitter implements Runnable { @Override public void run() { try { while (running) { //从队列阻塞地获取元素 AsyncResult streamElementEntry = streamElementQueue.peekBlockingly(); output(streamElementEntry); } } }

    3. `AsyncWaitOperator` 可以工作在两种模式下,即 `ORDERED` `UNORDERED`Flink 通过 `StreamElementQueue` 的不同实现实现了这两种模式。
    ### "有序"模式
    在"有序"模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,`StreamElementQueue` 的具体是实现是 `OrderedStreamElementQueue`。`OrderedStreamElementQueue` 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。
    7. |

    public class OrderedStreamElementQueue implements StreamElementQueue { / Capacity of this queue. */ private final int capacity; / Queue for the inserted StreamElementQueueEntries. */ private final ArrayDeque> queue; @Override public AsyncResult peekBlockingly() throws InterruptedException { lock.lockInterruptibly(); try { //只有队列头部的请求完成后才解除阻塞状态 while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } return queue.peek(); } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } notFull.signalAll(); return queue.poll(); } finally { lock.unlock(); } } @Override public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (queue.size() < capacity) { //未达容量上限 addEntry(streamElementQueueEntry); return true; } else { return false; } } finally { lock.unlock(); } } }

    3. <a name="923411a2"></a>
    ### "无序"模式
    在"无序"模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。当然,在使用"事件时间"的情况下,要保证 watermark 语义的正确性。在使用"处理时间"的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 `UnorderedStreamElementQueue` 中巧妙地实现了这两种情况。

从上图中可以看出,在 `UnorderedStreamElementQueue` 内部使用了两个队列,`ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue` 中保存未完成的异步请求计算结果,而 `completedQueue` 中保存已完成的异步请求计算结果。注意,`ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue` 这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, `watermarkSet` 作为一种特殊的集合,其内部只有一个元素,即 `Watermark`,充当了不同散列集合之间的分界。这样就保证了在一个 `Watermark` 之后的异步请求的计算结果不会先于该 `Watermark` 之前进行提交。`firstSet` 中完成异步请求的计算结果会被转移到 `completedQueue` 队列中,`firstSet` 内部的所有异步请求的计算结果都是可以乱序提交的。

如果不使用"事件时间",那么没有 `Watermark` 产生,所有的异步请求都会进入 `firstSet` 中,因而所有的结果都是乱序提交的。

具体代码实现逻辑如下,结合上面的示意图应该不难理解。
    6. |

    public class UnorderedStreamElementQueue implements StreamElementQueue { / Queue of uncompleted stream element queue entries segmented by watermarks. */ private final ArrayDeque>> uncompletedQueue; / Queue of completed stream element queue entries. / private final ArrayDeque> completedQueue; /** First (chronologically oldest) uncompleted set of stream element queue entries. / private Set> firstSet; // Last (chronologically youngest) uncompleted set of stream element queue entries. New // stream element queue entries are inserted into this set. private Set> lastSet; @Override public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (numberEntries < capacity) { addEntry(streamElementQueueEntry); return true; } else { return false; } } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { //等待 completedQueue 中的元素 while (completedQueue.isEmpty()) { hasCompletedEntries.await(); } numberEntries—; notFull.signalAll(); return completedQueue.poll(); } finally { lock.unlock(); } } //异步请求完成的回调 public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { //如果完成的异步请求在 firstSet 中,那么就将 firstSet 中已完成的异步请求转移到 completedQueue 中 if (firstSet.remove(streamElementQueueEntry)) { completedQueue.offer(streamElementQueueEntry); while (firstSet.isEmpty() && firstSet != lastSet) { //如果firset中所有的异步请求都完成了,那么就从 uncompletedQueue 获取下一个集合作为 firstSet firstSet = uncompletedQueue.poll(); Iterator> it = firstSet.iterator(); while (it.hasNext()) { StreamElementQueueEntry<?> bufferEntry = it.next(); if (bufferEntry.isDone()) { completedQueue.offer(bufferEntry); it.remove(); } } } hasCompletedEntries.signalAll(); } } finally { lock.unlock(); } } private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { assert(lock.isHeldByCurrentThread()); if (streamElementQueueEntry.isWatermark()) { //如果是watermark,就要构造一个只包含这个 watermark 的 set 加入到 uncompletedQueue 队列中 lastSet = new HashSet<>(capacity); if (firstSet.isEmpty()) { firstSet.add(streamElementQueueEntry); } else { Set> watermarkSet = new HashSet<>(1); watermarkSet.add(streamElementQueueEntry); uncompletedQueue.offer(watermarkSet); } uncompletedQueue.offer(lastSet); } else { //正常记录,加入lastSet中 lastSet.add(streamElementQueueEntry); } //设置异步请求完成后的回调 streamElementQueueEntry.onComplete( (StreamElementQueueEntry value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { } catch (Throwable t) { operatorActions.failOperator(new Exception(“Could not complete the “ + “stream element queue entry: “ + value + ‘.’, t)); } }, executor); numberEntries++; } }

    ## 容错

在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。
    6. |

    public class AsyncWaitOperator extends AbstractUdfStreamOperator> implements OneInputStreamOperator, OperatorActions { /* Recovered input stream elements. / private transient ListState recoveredStreamElements; @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); recoveredStreamElements = context .getOperatorStateStore() .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); } @Override public void open() throws Exception { super.open();

    1. //......
    2. // 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍
    3. if (recoveredStreamElements != null) {
    4. for (StreamElement element : recoveredStreamElements.get()) {
    5. if (element.isRecord()) {
    6. processElement(element.<IN>asRecord());
    7. }
    8. else if (element.isWatermark()) {
    9. processWatermark(element.asWatermark());
    10. }
    11. else if (element.isLatencyMarker()) {
    12. processLatencyMarker(element.asLatencyMarker());
    13. }
    14. else {
    15. throw new IllegalStateException("Unknown record type " + element.getClass() +
    16. " encountered while opening the operator.");
    17. }
    18. }
    19. recoveredStreamElements = null;
    20. }
    21. }
    22. @Override
    23. public void snapshotState(StateSnapshotContext context) throws Exception {
    24. super.snapshotState(context);
    25. //先清除状态
    26. ListState<StreamElement> partitionableState =
    27. getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    28. partitionableState.clear();
    29. //将所有未完成处理请求对应的消息加入状态中
    30. Collection<StreamElementQueueEntry<?>> values = queue.values();
    31. try {
    32. for (StreamElementQueueEntry<?> value : values) {
    33. partitionableState.add(value.getStreamElement());
    34. }
    35. // add the pending stream element queue entry if the stream element queue is currently full
    36. if (pendingStreamElementQueueEntry != null) {
    37. partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
    38. }
    39. } catch (Exception e) {
    40. partitionableState.clear();
    41. throw new Exception("Could not add stream element queue entries to operator state " +
    42. "backend of operator " + getOperatorName() + '.', e);
    43. }
    44. }

    } ``` | | —- |


    在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。

