在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。
Async I/O 的使用方式
在 Flink 中使用 Async I/O 的需要有一个支持异步请求的客户端。以官方文档给出的说明为例:
| ``` /**
- An implementation of the ‘AsyncFunction’ that sends requests and sets the callback.
/
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
/** The database specific client that can issue concurrent requests with callbacks /
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/* The context used for the future callbacks /
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
} } // create the original stream val stream: DataStream[String] = … // 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量 val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100) ``` | | —- |// issue the asynchronous request, receive a future for the result
// 发起异步请求,返回结果是一个 Future
val resultFutureRequested: Future[String] = client.query(str)
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// 请求完成时的回调,将结果交给 ResultFuture
resultFutureRequested.onSuccess {
case result: String => resultFuture.complete(Iterable((str, result)))
}
AsyncDataStream
提供了两种调用方法,分别是 orderedWait
和 unorderedWait
,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。
由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。
Async I/O 的实现
AsyncDataStream
在运行时被转换为 AsyncWaitOperator
算子,它是 AbstractUdfStreamOperator
的子类。下面我们来看看 AsyncWaitOperator
的实现原理。
基本原理
AsyncWaitOperator
算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator
内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue
提供了一种队列的抽象,一个“消费者”线程 Emitter
从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示。
| ```
public class AsyncWaitOperator
|
| --- |
`AsyncWaitOperator` 可以工作在两种模式下,即 `ORDERED` 和 `UNORDERED`。Flink 通过 `StreamElementQueue` 的不同实现实现了这两种模式。
<a name="3c2b8321"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-async-io/#%E6%9C%89%E5%BA%8F-%E6%A8%A1%E5%BC%8F)“有序”模式
在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,`StreamElementQueue` 的具体是实现是 `OrderedStreamElementQueue`。`OrderedStreamElementQueue` 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。<br />[![](https://cdn.nlark.com/yuque/0/2020/svg/2946520/1608088401246-f1d5a12c-b223-48ad-92ad-74e58f89407d.svg#align=left&display=inline&height=369&margin=%5Bobject%20Object%5D&originHeight=369&originWidth=769&size=0&status=done&style=none&width=769)](https://blog.jrwang.me/img/flink/OrderedStreamElementQueue.svg)
|
public class OrderedStreamElementQueue implements StreamElementQueue {
/ Capacity of this queue. */
private final int capacity;
/ Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque
|
| --- |
<a name="923411a2"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-async-io/#%E6%97%A0%E5%BA%8F-%E6%A8%A1%E5%BC%8F)“无序”模式
在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 `UnorderedStreamElementQueue` 中巧妙地实现了这两种情况。<br />[![](https://cdn.nlark.com/yuque/0/2020/svg/2946520/1608088400971-70d7c846-9592-48b1-96b4-195d69addf3d.svg#align=left&display=inline&height=678&margin=%5Bobject%20Object%5D&originHeight=678&originWidth=1046&size=0&status=done&style=none&width=1046)](https://blog.jrwang.me/img/flink/UnorderedStreamElementQueue.svg)<br />从上图中可以看出,在 `UnorderedStreamElementQueue` 内部使用了两个队列,`ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue` 中保存未完成的异步请求计算结果,而 `completedQueue` 中保存已完成的异步请求计算结果。注意,`ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue` 这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, `watermarkSet` 作为一种特殊的集合,其内部只有一个元素,即 `Watermark`,充当了不同散列集合之间的分界。这样就保证了在一个 `Watermark` 之后的异步请求的计算结果不会先于该 `Watermark` 之前进行提交。`firstSet` 中完成异步请求的计算结果会被转移到 `completedQueue` 队列中,`firstSet` 内部的所有异步请求的计算结果都是可以乱序提交的。<br />如果不使用“事件时间”,那么没有 `Watermark` 产生,所有的异步请求都会进入 `firstSet` 中,因而所有的结果都是乱序提交的。<br />具体代码实现逻辑如下,结合上面的示意图应该不难理解。
|
public class UnorderedStreamElementQueue implements StreamElementQueue {
/ Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque
|
| --- |
<a name="75c106f2"></a>
## [](https://blog.jrwang.me/2019/flink-source-code-async-io/#%E5%AE%B9%E9%94%99)容错
在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。
|
public class AsyncWaitOperator
//......
// 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException("Unknown record type " + element.getClass() +
" encountered while opening the operator.");
}
}
recoveredStreamElements = null;
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
//先清除状态
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
//将所有未完成处理请求对应的消息加入状态中
Collection<StreamElementQueueEntry<?>> values = queue.values();
try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}
// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " +
"backend of operator " + getOperatorName() + '.', e);
}
}
} ``` | | —- |
小结
在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。
参考
-EOF-