1 概述
流计算系统中经常需要与外部系统进行交互,我们通常的做法如向数据库发送用户a的查询请求,然后等待结果返回,在这之前,我们的程序无法发送用户b的查询请求。这是一种同步访问方式,如下图所示。
Async I/O - 图1

图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。
2 Async I/O原理实现
如下官方示例代码:

  1. // This example implements the asynchronous request and callback with Futures that have the
  2. // interface of Java 8's futures (which is the same one followed by Flink's Future)
  3. /**
  4. * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
  5. */
  6. class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
  7. /** The database specific client that can issue concurrent requests with callbacks */
  8. private transient DatabaseClient client;
  9. @Override
  10. public void open(Configuration parameters) throws Exception {
  11. client = new DatabaseClient(host, post, credentials);
  12. }
  13. @Override
  14. public void close() throws Exception {
  15. client.close();
  16. }
  17. @Override
  18. public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
  19. // issue the asynchronous request, receive a future for result
  20. final Future<String> result = client.query(key);
  21. // set the callback to be executed once the request by the client is complete
  22. // the callback simply forwards the result to the result future
  23. CompletableFuture.supplyAsync(new Supplier<String>() {
  24. @Override
  25. public String get() {
  26. try {
  27. return result.get();
  28. } catch (InterruptedException | ExecutionException e) {
  29. // Normally handled explicitly.
  30. return null;
  31. }
  32. }
  33. }).thenAccept( (String dbResult) -> {
  34. resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
  35. });
  36. }
  37. }
  38. // create the original stream
  39. DataStream<String> stream = ...;
  40. // apply the async I/O transformation
  41. DataStream<Tuple2<String, String>> resultStream =
  42. AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

AsyncDataStream.(un)orderedWait 的主要工作就是创建了一个 AsyncWaitOperator。AsyncWaitOperator 是支持异步 IO 访问的算子实现,该算子会运行 AsyncFunction 并处理异步返回的结果,其内部原理如下图所示。
Async I/O - 图2
如图所示,AsyncWaitOperator 主要由两部分组成:StreamElementQueue 和 Emitter。StreamElementQueue 是一个 Promise 队列,所谓 Promise 是一种异步抽象表示将来会有一个值,这个队列是未完成的 Promise 队列,也就是进行中的请求队列。Emitter 是一个单独的线程,负责发送消息(收到的异步回复)给下游。
图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个 “Promise” P5,然后将P5放入队列。最后调用 AsyncFunction 的 ayncInvoke 方法,该方法会向外部服务发起一个异步的请求,并注册回调。该回调会在异步请求成功返回时调用 AsyncCollector.collect 方法将返回的结果交给框架处理。实际上 AsyncCollector 是一个 Promise ,也就是 P5,在调用 collect 的时候会标记 Promise 为完成状态,并通知 Emitter 线程有完成的消息可以发送了。Emitter 就会从队列中拉取完成的 Promise ,并从 Promise 中取出消息发送给下游。
3 消息顺序性
3.1 有序
有序比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括 watermark),都会包装成 Promise 并按到达顺序放入该队列。如下图所示,尽管P4的结果先返回,但并不会发送,只有 P1 (队首)的结果返回了才会触发 Emitter 拉取队首元素进行发送。
Async I/O - 图3

3.2 无序
使用两个队列就能实现,一个 uncompletedQueue 一个 completedQueue。所有新进入该算子的元素,同样的包装成 Promise 并放入 uncompletedQueue 队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该 Promise 移到 completedQueue 队列中,并通知 Emitter 消费。如下图所示:
Async I/O - 图4

参考

简书:Flink异步I/O
https://www.jianshu.com/p/d8f99d94b761