Back Pressure是流处理系统中,非常经典而常见的问题,它是让流系统能对压力变化能够呈现良好抗压性的关键点所在。各个开源实时处理系统,在中后期,都开始有对背压机制有完善的考虑和设计,基本原理一致,但是实现方式,有依赖于具体系统而各有千秋。今天我们以Spark Streaming和Flink这两个目前最流行的流处理平台为例,剖析一下Back Pressure的原理和实现技巧。
在系统架构设计中,有一个经典的排队理论,其核心的理念是:一个服务中心的服务能力是有限的,完成服务是需要一定时间的。所以为了保障服务中心的服务能正常进行,需要在外面维护一个队列,让到达的消息事件进行排队,直到服务中心完成服务,才能让下一个事件进入服务中心。
体现这种设计理念的经典设计模式之一,就是生产者-消费者模式。生产者产生事件,消费者消费事件,而随着双方数量和处理速度的变化和不均衡,生产者生产速度超过消费者的现象,经常会存在,Queue只能够缓解这种问题,不能根治,而且Queue的长度需要被妥善的设计。
为了解决这个问题,业界提出了Reactive Stream的设计模式。这是一种生产者-消费者
+迭代器
的模式。它的改进点在于,消费者(Subscriber)向生产者(Publisher),指明请求的个数,然后生产者根据该数量,向订阅者推送指定数量的消息。
根据该设计模式,在Java 9中,引入标准的4个接口(Processor[Publisher, Subscriber-Subscription],并应用于RxJava。但是在大数据流式系统中,大部分只是参考其理念,而因为分布式的原因,所以需要做更加精密而复杂的设计。
在真实的生产环境中,流式系统面对的系统压力,在波峰和波谷是完全不一样的,这个时候如果用固定的资源数,会造成很大的浪费,所以在Spark和Flink中,都会有一个动态Executor个数的模型,可以动态调节。但是就算是如此,在调整的过程中,还是会出现突然的压力过大的情况,难以避免。这个时候,如何让系统能够稳健的应对压力,就需要用到背压的概念和设计了。
从上图可以看到,一个流式系统的背压能力,其实需要从输入源开始,到最后的输出,每一个位于上游的子模块/系统,都具备根据下游信号量往下游发送指定数量的消息的能力,只有这样,整个流式系统才能完美背压,不会被系统突增的压力挤垮。
Spark Streaming是Spark的流式模块,它基于Spark Core提供了一套基于micro-batch处理的实时流式处理框架。它的基本理念是将数据流转换为DStream,再通过Spark Engine的RDD机制,进行统一处理。
上图是SparkStreaming的系统核心模块,和背压特性相关的,主要是模块3:数据的产生和导入。
基于前面的排队理论,Spark Streaming每一批次的处理时长(batch_process_time)需要小于批次间隔batch_interval,否则batch_process_time > batch_interval,程序的处理能力不足,积累的数据越来越多,最终会造成Executor的OOM。
为了避免这种情况,Spark Steaming从1.5版本开始,开始引入背压机制,第一个相关Issue是经典的SPARK-7398。其大体的思路是:
通过在Driver端进行速率估算,并将速率更新到Executor端的各个Receiver,从而实现背压
整个特性,主要由三大模块实现:
- 速率控制
- 速率估算
- 速率更新
速率控制
整个背压机制的核心,就是Drvier端的RateContoller,它作为控制核心,继承自StreamingListener,监听Batch的完成情况,记录下它们的关键延迟,然后传递给computeAndPublish方法,遍历Executor并进行估算和更新
速率估算
PIDRateEstimator是目前RateEstimator的唯一官方实现,基本上也没谁去重新实现一个,因为确实好用。PID(Proportional Integral Derivative,比例积分差分控制算法)是工控领域中,经过多次的验证是一种非常有效的工业控制器算法。Spark Streaming将它引入,作为根据最新的Rate,以及比例(Proportional) 积分(Integral)微分(Derivative)这3个变量,来确定最新的Rate,实现简洁明了,也非常好理解。
速率更新
计算完新Rate,就该把它发布出去了。RateController通过ReceiverTracker,利用RPC消息,发布Rate到Receiver所在的节点上,该节点上的ReceiverSupervisorImpl会接收消息,并把速率更新到BlockGenerator上,从而以控制每个批次的数据生成。
仔细阅读这两个类的代码,可以发现它们充分利用了Scala的特性和高性能网络通信库,非常的简洁,一点都不拖泥带水。无论是发送端的UpdateRateLimit的case class消息类构建,还是接收端的receive的偏函数特性,都充分的体现了Scala的代码之美。
同Spark Steaming类似,Flink的基本元素是Stream和Transformations,每个Streaming Dataflow由很多Stream和Operator组成,在这些Stream里,Source是数据源,Sink是数据池。最终会组成一个DAG。
实际DAG被部署到执行集群时,还要考虑并行度的影响,假设并行度是4,同时,该集群有两个TaskManager(执行工作的节点,每个节点可以执行多个任务),假设TaskManager 1执行A.1,A.2,B.1和B.2,TaskManager 2执行A.3,A.4,B.3和B.4。于是只有不在同一台机器的子任务面临节点之间的传输,以A.1,A.2到B.3,B.4为例,如上图。
每个子任务都有自己的本地缓存池,收到的数据以及发出的数据,都会序列化之后,放入到缓冲池里。然后,两个TaskManager之间,只会建立一条物理链路(底层使用Netty通讯),所有子任务之间的通讯,都由这条链路承担。
当任何一个子任务的发送缓存(不管是子任务自己的本地缓存,还是底层传输时Netty的发送缓存)耗尽时,发送方就会被阻塞,产生背压;同样,任何任务接收数据时,如果本地缓存用完了,都会停止从底层Netty那里读取数据,这样很快上游的数据很快就会占满下游的底层接收缓存,从而背压到发送端,形成对上游所有的任务的背压。
很显然,这种思路有个明显的问题,任务一个下游子任务的产生背压,都会影响整条TaskManager之间的链路,导致全链路所有子任务背压。比如上图的B.3子任务,此时还有处理能力,但也无法收到数据。
为了解决上节的单任务背压影响全链路的问题,在Flink 1.5之后,引入了Credit-based Flow Control,基于信用点的流量控制。
这种方法,首先把每个子任务的本地缓存分为两个部分,独占缓存(Exclusive Buffers)和浮动缓存(Floating Buffers);
然后,独占缓存的大小作为信用点发给数据发送方,发送方会按照不同的子任务分别记录信用点,并发送尽可能多数据给接收方,发送后则降低对应信用点的大小;
当信用点为0时,则不再发送,起到背压的作用。在发送数据的同时,发送方还会把队列中暂存排队的数据量,发给接收方,接收方收到后,根据本地缓存的大小,决定是否去浮动缓存里请求更多的缓存来加速队列的处理,起到动态控制流量的作用。整个过程参考上图。
通过这样的设计,就实现了任务级别的背压:任意一个任务产生背压,只会影响这个任务,并不会对TaskManger上的其它任务造成影响。
可以看出,StreamTask算子处理完数据后,会调用RecordWrite将数据写到相应的ResultPartition中,每个ResultPartition会被拆分成一到多个ResultSubpartition,数据实际上,是写到每个ResultSubpartition的buffers里。如图中大图RecordWriter代码。
这里有个关键的一个方法requestNewBufferBuilder,这个方法会到Task的LocalBufferPool里请求内存, 没有就会阻塞,如上小图中代码。
Flink就是通过检测每个任务线程的栈深度来实现背压检测的,如果背压了,就会出现很深的栈深度,因为要在这个方法上等待内存释放。
CreditBasedSequenceNumberingViewReader是TaskManager对应的Netty的通讯服务中的一个部分,当调用ResultSubpartition的flush方法时,实际上调用的是CreditBasedSequenceNumberingViewReader的notifyDataAvailable方法,从而通知Netty服务端的Pipline醒来进行数据读取。
将Reader加入到准备就绪状态,在这之前会先判断reader是否可用,其实是判断Credit够不够,不够就暂时无法发出;
上图为判断Credit够不够;下图为发送数据,并减少Credit。
接收端只要及时将Credit信息发送给发送端,发送端就能根据Credit数量尽快开始数据发送。从代码结构上,和发送端相反,接收端是InputGate间接的监听Netty的读取事件:当CreditBasedPartitionRequestClientHandler正常读取出数据并写入InputChannel后,通知CheckpointBarrierHandle来读取。
只要对端可写的时候,马上尽快把Credit发送过去。
事实上,作为数据的接收方,更多是接收来自发送方的大量数据,发往发送方的Credit小相对少很多,所以一般一有Credit可用,就能马上发送给发送方,保证实时性。
Spark Streaming可以对PID算法的几个变量进行调整,以适应具体的数据流量波形;也可以调整最大处理的消息数,已防止出现OOM;Flink可以调整LocalBuffer和FloatBuffer大小,来适配不同的流量波形。
Netty传输,Flink Credit优先处理;Spark Structured Streaming采用和Flink一致的编程模型,因此也有类似的背压方法。
参考文献:
- Reactive Stream 介绍: http://www.infoq.com/cn/news/2015/12/reactive-streams-introduction
- JDK9揭秘:Reactive Streams:https://www.cnblogs.com/IcanFixIt/p/7245377.html
- Spark Streaming Back-Pressure Design Doc1:点我
- Spark Streaming Back-Pressure Design Doc 2:链接
- Spark Streaming back pressure:链接
- Spark Streaming Backpressure - finding the optimal rate is now done automatically:链接
- Operating on back-pressure in Spark with a PID: 链接
- 深入理解Spark Streaming流量控制及反压机制 链接
- A Deep-Dive into Flink’s Network Stack. 链接
- Handling Overload. 链接
- https://github.com/apache/flink
- https://github.com/lw-lin/CoolplaySpark