• 有关异步I / O实用程序的设计和实现的详细信息,可以在投标和设计文档FLIP-12
  • Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入。
  • 主要目的:是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

image.png
图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。

1. 瓶颈


与数据库的异步交互意味着单个并行函数实例可以同时处理许多请求同时接收响应。这样,等待时间可以与发送其他请求接收响应重叠。至少,等待时间将分摊到多个请求中。在大多数情况下,这导致更高的流吞吐量。

在某些情况下,也可以仅通过将MapFunction缩放到很高的并行度来提高吞吐量,但是通常会付出很高的资源成本:拥有更多并行的MapFunction实例意味着更多的任务,线程,Flink内部网络连接,与数据库的网络连接,缓冲区和一般内部资源开销。

  • 如果客户端 不支持 异步请求 (很少但不排除)

    在没有这样的客户端的情况下,可以尝试通过创建多个客户端并使用线程池处理同步调用,将同步客户端转变为有限的并发客户端。但是,这种方法通常不如适当的异步客户端有效。


2. 使用步骤

假设其中一个具有目标数据库的异步客户端,则需要三个部分来对数据库执行具有异步I / O的流转换:

  1. 实现用来分发请求的AsyncFunction   
  2. 获取操作结果的callback,并将它提交到AsyncCollector中   
  3. 将异步I/O操作转换成DataStream

其中的两个重要的参数:

  • Timeout: 超时定义异步请求在被视为失败之前可能需要花费多长时间。此参数防止无效/失败的请求。
  • Capacity: 超过限制的并发请求数量会产生背压。

    此参数定义了同时可能正在进行多少个异步请求。即使异步I / O方法通常可以带来更好的吞吐量,但操作员仍然可能成为流式应用程序的瓶颈。限制并发请求的数量可确保Operator 不会积累未决请求的不断增长的积压,但是一旦容量用尽,它将触发背压。

使用时 注意:

  • 使用Async I/O,需要外部存储有支持异步请求的客户端。
  • 使用Async I/O,继承RichAsyncFunction(接口AsyncFunction的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可。
  • 使用Async I/O, 最好结合缓存一起使用,可减少请求外部存储的次数,提高效率。
  • Async I/O 提供了Timeout参数来控制请求最长等待时间。默认,异步I/O请求超时时,会引发异常并重启或停止作业。 如果要处理超时,可以重写AsyncFunction#timeout方法。
  • Async I/O 提供了Capacity参数控制请求并发数,一旦Capacity被耗尽,会触发反压机制来抑制上游数据的摄入。
  • Async I/O 输出提供乱序和顺序两种模式:

    • 无序:

      异步请求完成后立即发出结果记录。在异步I / O运算符之后,流中记录的顺序与之前不同。当将处理时间用作基本时间特征时,此模式具有最低的延迟和最低的开销。在此模式下使用AsyncDataStream.**unorderedWait**(…)。

    • 有序:

      在这种情况下,将保留流顺序。结果记录的发射顺序与异步请求的触发顺序相同(Operator 输入记录的顺序)。为此,Operator **对结果记录进行缓冲直到发出其所有先前的记录或将其超时)为止。这通常会带来一些额外的延迟和检查点的开销,因为与无序模式相比,记录或结果在检查点状态下的保存时间更长。在此模式下使用AsyncDataStream.orderedWait**(…)。Event Time


基于 Watermark 事件发送

  • 无序:

    水印不会超过记录,反之亦然,这意味着水印会建立顺序边界记录仅在水印之间无序发出。在某个水印**之后发生的记录仅在该水印被发出之后才被发出。反过来,仅在发出水印之前输入的所有结果记录之后才发出水印。 这意味着在存在水印的情况下,无序模式引入一些与有序模式相同的延迟和管理开销**。该开销的量取决于水印频率。

  • 有序:

    保留记录的水印顺序,就像保留记录之间的顺序一样。与处理时间相比,开销没有明显变化。


原理实现

AsyncDataStream.(un)orderedWait 的主要工作就是创建了一个 AsyncWaitOperatorAsyncWaitOperator是支持异步 IO 访问的算子实现,该算子会运行 AsyncFunction并处理异步返回的结果,其内部原理如下图所示。每个AsyncWaitOperator都由三个主要的部分组成

  • AsyncFunction:执行异步操作的函数,用户需要覆写其asyncInvoke()方法并传入。
  • StreamElementQueue:包含StreamElementQueueEntry的队列,底层由ArrayDeque实现。
  • Emitter:单独的守护线程,将异步调用完成后的结果发送给下游算子。

image.pngimage.png
如图所示,AsyncWaitOperator 主要由两部分组成:StreamElementQueue 和 Emitter。StreamElementQueue 是一个 Promise 队列,所谓 Promise 是一种异步抽象表示将来会有一个值,这个队列是未完成的 Promise 队列,也就是进行中的请求队列。Emitter 是一个单独的线程,负责发送消息(收到的异步回复)给下游。
图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个 “Promise” P5,然后将P5放入队列。最后调用 AsyncFunction 的 ayncInvoke 方法,该方法会向外部服务发起一个异步的请求,并注册回调。该回调会在异步请求成功返回时调用 AsyncCollector.collect 方法将返回的结果交给框架处理。实际上 AsyncCollector 是一个 Promise ,也就是 P5,在调用 collect 的时候会标记 Promise 为完成状态,并通知 Emitter 线程有完成的消息可以发送了。Emitter 就会从队列中拉取完成的 Promise ,并从 Promise 中取出消息发送给下游。
AsyncWaitOperator的机制可以用下面的简图来表示。
image.png

  • 来自上游的StreamElement进入AsyncWaitOperator的StreamElementQueue,并被封装成StreamElementQueueEntry。
  • AsyncWaitOperator调用传入的AsyncFunction的asyncInvoke()方法,该方法异步地与外部系统交互。
  • 异步操作完成后,由asyncInvoke()方法显式地调用ResultFuture.complete()方法,将结果返回;或者调用completeExceptionally()方法表示出现了异常。ResultFuture就是CompletableFuture的代理接口。
  • Emitter线程从StreamElementQueue中拉取那些已经完成了的StreamElementQueueEntry,并输出到下游算子。

消息顺序性

image.png

有序

有序比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括 watermark),都会包装成 Promise 并按到达顺序放入该队列。如下图所示,尽管P4的结果先返回,但并不会发送,只有 P1 (队首)的结果返回了才会触发 Emitter 拉取队首元素进行发送。
image.png

ProcessingTime 无序

ProcessingTime 无序也比较简单,因为没有 watermark,不需要协调 watermark 与消息的顺序性,所以使用两个队列就能实现,一个 uncompletedQueue 一个 completedQueue。所有新进入该算子的元素,同样的包装成 Promise 并放入 uncompletedQueue 队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该 Promise 移到 completedQueue 队列中,并通知 Emitter 消费。如下图所示:
image.png
image.png

EventTime 无序

EventTime 无序类似于有序与 ProcessingTime 无序的结合体。因为有 watermark,需要协调 watermark 与消息之间的顺序性,所以uncompletedQueue中存放的元素从原先的 Promise 变成了 Promise 集合。如果进入算子的是消息元素,则会包装成 Promise 放入队尾的集合中。如果进入算子的是 watermark,也会包装成 Promise 并放到一个独立的集合中,再将该集合加入到 uncompletedQueue 队尾,最后再创建一个空集合加到 uncompletedQueue 队尾。这样,watermark 就成了消息顺序的边界。只有处在队首的集合中的 Promise 返回了数据,才能将该 Promise 移到 completedQueue 队列中,由 Emitter 消费发往下游。只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个 watermark 之前所有的消息都已经被发送了,该 watermark 才能被发送。过程如下图所示:
image.png
image.png

容错保障 快照与恢复

异步I / O operator 提供完全的一次容错保证。它将正在进行的异步请求的记录存储在检查点中,并在从故障中恢复时恢复/重新触发请求。

分布式快照机制是为了保证状态的一致性。我们需要分析哪些状态是需要快照的,哪些是不需要的。首先,已经完成回调并且已经发往下游的元素是不需要快照的。否则,会导致重发,那就不是 exactly-once 了。而已经完成回调且未发往下游的元素,加上未完成回调的元素,就是上述队列中的所有元素。

所以快照的逻辑也非常简单,(1)清空原有的状态存储,(2)遍历队列中的所有 Promise,从中取出 StreamElement(消息或 watermark)并放入状态存储中,(3)执行快照操作。

恢复的时候,从快照中读取所有的元素全部再处理一次,当然包括之前已完成回调的元素。所以在失败恢复后,会有元素重复请求外部服务,但是每个回调的结果只会被发往下游一次。


AsyncFunction不是以多线程方式调用的。仅存在一个AsyncFunction实例,并且在流的各个分区中为每个记录依次调用它。除非asyncInvoke(…)方法快速返回并依赖于(由客户端)回调,否则它将不会导致正确的异步I / O。

  • 禁止使用阻塞

processElement

  1. /**
  2. * Add an AsyncWaitOperator. The order of output stream records may be reordered.
  3. *
  4. * @param in Input {@link DataStream}
  5. * @param func {@link AsyncFunction}
  6. * @param timeout for the asynchronous operation to complete
  7. * @param timeUnit of the given timeout
  8. * @param capacity The max number of async i/o operation that can be triggered
  9. * @param <IN> Type of input record
  10. * @param <OUT> Type of output record
  11. * @return A new {@link SingleOutputStreamOperator}.
  12. */
  13. public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
  14. DataStream<IN> in,
  15. AsyncFunction<IN, OUT> func,
  16. long timeout,
  17. TimeUnit timeUnit,
  18. int capacity) {
  19. return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
  20. }
  1. @Override
  2. public void processElement(StreamRecord<IN> element) throws Exception {
  3. final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
  4. if (timeout > 0L) {
  5. // register a timeout for this AsyncStreamRecordBufferEntry
  6. long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
  7. final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
  8. timeoutTimestamp,
  9. new ProcessingTimeCallback() {
  10. @Override
  11. public void onProcessingTime(long timestamp) throws Exception {
  12. userFunction.timeout(element.getValue(), streamRecordBufferEntry);
  13. }
  14. });
  15. // Cancel the timer once we've completed the stream record buffer entry. This will remove
  16. // the register trigger task
  17. streamRecordBufferEntry.onComplete(
  18. (StreamElementQueueEntry<Collection<OUT>> value) -> {
  19. timerFuture.cancel(true);
  20. },
  21. executor);
  22. }
  23. addAsyncBufferEntry(streamRecordBufferEntry);
  24. userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
  25. }

未处理

  • Flink 1.9 的优化
  • Flink Table 中使用 async I/O
  • 消息顺序性和一致性内部是怎么保证的

参考: