https://www.csdn.net/tags/MtjaYgxsNTc5ODEtYmxvZwO0O0OO0O0O.html
当今生活节奏日益加快,企业面对不断增加的海量信息,其信息筛选和处理效率低下的困扰与日俱增。由于用户营销不够细化,企业 App 中许多不合时宜或不合偏好的消息推送很大程度上影响了用户体验,甚至引发了用户流失。在此背景下,友信金服公司推行全域的数据体系战略,通过打通和整合集团各个业务线数据,利用大数据、人工智能等技术构建统一的数据资产,如 ID-Mapping、用户标签等。友信金服用户画像项目正是以此为背景成立,旨在实现“数据驱动业务与运营”的集团战略。目前该系统支持日处理数据量超 10 亿,接入上百种合规数据源。
二、用户画像业务架构
用户画像系统目前为集团线上业务提供用户实时标签数据服务。为此我们的服务需要打通多种数据源,对海量的数字信息进行实时不间断的数据清洗、聚类、分析,从而将它们抽象成标签,并最终为应用方提供高质量的标签服务。在此背景下,我们设计用户画像系统的整体架构如下图所示:
整体架构分为五层:
- 接入层:接入原始数据并对其进行处理,如 Kafka、Hive、文件等。
- 计算层:选用 Flink 作为实时计算框架,对实时数据进行清洗,关联等操作。
- 存储层:对清洗完成的数据进行数据存储,我们对此进行了实时用户画像的模型分层与构建,将不同应用场景的数据分别存储在如Phoenix,HBase,HDFS,Kafka 等。
- 服务层:对外提供统一的数据查询服务,支持从底层明细数据到聚合层数据的多维计算服务。
应用层:以统一查询服务对各个业务线数据场景进行支撑。目前业务主要包含用户兴趣分、用户质量分、用户的事实信息等数据。
通过以上流程分析,我们通过三种方式来提高 Checkpointing 性能。这些方案分别是:
选择合适的 Checkpoint 存储方式
- 合理增加算子(Task)并行度
缩短算子链(Operator Chains)长度
- 选择合适的 Checkpoint 存储方式
CheckPoint 存储方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。
由官方文档可知,不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择 RocksDBStateBackend。
这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。
合理增加算子(Task)并行度
Checkpointing 需要对每个 Task 进行数据状态采集。单个 Task 状态数据越多则 Checkpointing 越慢。所以我们可以通过增加 Task 并行度,减少单个 Task 状态数据的数量来达到缩短 CheckPointing 时间的效果。
缩短算子链(Operator Chains)长度:
Flink 算子链(Operator Chains)越长,Task 也会越多,相应的状态数据也就更多,Checkpointing 也会越慢。通过缩短算子链长度,可以减少 Task 数量,从而减少系统中的状态数据总量,间接的达到优化 Checkpointing 的目的。下面展示了 Flink 算子链的合并规则:
- 上下游的并行度一致
- 下游节点的入度为 1
- 上下游节点都在同一个 Slot Group 中
- 下游节点的 Chain 策略为 ALWAYS
- 上游节点的 Chain 策略为 ALWAYS 或 HEAD
- 两个节点间数据分区方式是 Forward
- 用户没有禁用 Chain
基于以上这些规则,我们在代码层面上合并了相关度较大的一些 Task,使得平均的操作算子链长度至少缩短了 60%~70%。
Flink 背压产生过程分析及解决方案
背压产生过程分析
在 Flink 运行过程中,每一个操作算子都会消费一个中间 / 过渡状态的流,并对它们进行转换,然后生产一个新的流。这种机制可以类比为:Flink 使用阻塞队列作为有界的缓冲区。跟 Java 里阻塞队列一样,一旦队列达到容量上限,处理速度较慢的消费者会阻塞生产者向队列发送新的消息或事件。下图展示了 Flink 中两个操作算子之间的数据传输以及如何感知到背压的:
首先,Source 中的事件进入 Flink 并被操作算子 1 处理且被序列化到 Buffer 中,然后操作算子 2 从这个 Buffer 中读出该事件。当操作算子 2 处理能力不足的时候,操作算子 1 中的数据便无法放入 Buffer,从而形成背压。背压出现的原因可能有以下两点:
- 下游算子处理能力不足;
- 数据发生了倾斜。
背压解决方案:
实践中我们通过以下方式解决背压问题。
首先,缩短算子链会合理的合并算子,节省出资源。
其次缩短算子链也会减少 Task(线程)之间的切换、消息的序列化 / 反序列化以及数据在缓冲区的交换次数,进而提高系统的整体吞吐量。
最后,根据数据特性将不需要或者暂不需要的数据进行过滤,然后根据业务需求将数据分别处理,比如有些数据源需要实时的处理,有些数据是可以延迟的,最后通过使用 keyBy 关键字,控制 Flink 时间窗口大小,在上游算子处理逻辑中尽量合并更多数据来达到降低下游算子的处理压力。
未来工作的思考和展望
端到端的实时流处理
目前用户画像部分数据都是从 Hive 数据仓库拿到的,数据仓库本身是 T+1 模式,数据延时性较大,所以为了提高数据实时性,端到端的实时流处理很有必要。
端到端是指一端采集原始数据,另一端以报表 / 标签 / 接口的方式对这些对数进行呈现与应用,连接两端的是中间实时流。在后续的工作中,我们计划将现有的非实时数据源全部切换到实时数据源,统一经过 Kafka 和 Flink 处理后再导入到 Phoenix/JanusGraph/HBase。强制所有数据源数据进入 Kafka 的一个好处在于它能够提高整体流程的稳定性与可用性:首先 Kafka 作为下游系统的缓冲,可以避免下游系统的异常影响实时流的计算,起到“削峰填谷”的作用;其次,Flink 自 1.4 版本开始正式支持与 Kafka 的端到端精确一次处理语义,在一致性方面上更有保证。