背景:2018年12月份的 Flink Forward China,阿里对外宣布将在 2019 年 1 月份开源 Blink。还特别声明 Blink 不会作为另一个开源项目维护,而是会慢慢回馈给 Flink 社区,会在未来合并到 Flink 主分支上。
在此先简单回顾一下阿里巴巴Blink 开源的部分要点:
- Blink 开源的内容主要是阿里巴巴基于开源 Flink 引擎,依托集团内部业务,在流计算和批处理上积累的大量新功能、性能优化、稳定性提升等核心代码。
- Blink 以分支的形式开源,即开源后会成为 Apache Flink项目下的一个分支。
- Blink 开源的目标不是希望成为另一个活跃的项目,而是将Flink 做的更好。通过开源的方式让大家了解所有 Blink 的实现细节,提高 Blink 功能merge进入Flink 的效率,与社区协作更高效。
by 阿里官方
当时看到这一消息,默默看完了其他 Flink 相关的教程,资料,重点是官方上的文档。就在前一个月,Flink 中文社区的网址还被 404 了…
Flink 的未来一定会成为一个卓越的大数据计算引擎。
从 Blink 作为分支开源之后到现在,时间飞快的过了大半年。我也参与了大大小小不少的 Flink Meetup,开发者的热情高涨,都在等待 Flink 1.9 的诞生。
先看一组数据
如图,Flink 1.9 解决的 issue 和 commit 次数已经达到甚至超过前两个版本之和。代码的修改的行数也是前两个版本之和的 3 倍。
那么,Flink 1.9 到底新增了什么功能呢?优化了那些东西呢?接下来,我将简单的给大家介绍一下,更深层次的后面会展开,等不及的小伙伴去官网爽一下吧。
- 架构的优化
Flink 在其分布式流式执行引擎之上,有两套相对独立的 DataStream 和 DataSet API,分别来描述流计算和批处理的作业。在这两个 API之上,则提供了一个流批统一的API,即 Table API 和SQL。用户可以使用相同的Table API 程序或者 SQL 来描述流批作业,只是在运行时需要告诉 Flink 引擎希望以流的形式运行还是以批的流式运行,此时 Table 层的优化器就会将程序优化成 DataStream 作业或者 DataSet 作业。 但是如果我们仔细查看 DataStream 和 DataSet 底层的实现细节,会发现这两个 API 共享的东西其实不多。它们有各自独立的翻译和优化的流程,而且在真正运行的时候,两者也使用了完全不同的 Task。这样的不一致对用户和开发者来讲可能存在问题。
Blink 提出的架构是这样的
DataStream API 提供给用户更多的是一种“所见即所得”的体验,由用户自行描述和编排算子的关系,引擎不会做过多的干涉和优化。而Table API & SQL 则继续保持现在的风格,提供关系表达式API,引擎会根据用户的意图来进行优化,并选择最优的执行计划。值得一提的是,以后这两个 API 都会各自同时提供流计算和批处理的功能。这两个用户 API 之下,在实现层它们都会共享相同的技术栈,比如会用统一的 DAG 数据结构来描述作业,使用统一的 StreamOperator 来编写算子逻辑,包括使用统一的流式分布式执行引擎。
TableAPI & SQL
在开源 Blink 时,Blink 的Table 模块已经使用了 Flink 未来设想的新架构。因此 Flink 1.9 版本中,Table 模块顺理成章的成为了架构调整后第一个吃螃蟹的人。但是,为了尽量不影响之前版本用户的体验,我们还是需要找到一个方式让两种架构能够并存。 基于这个目的,社区的开发人员做了一系列的努力,包括将 Table 模块进行拆分(FLIP-32,FLIP 即 Flink Improvement Proposals,专门记录一些对Flink 做较大修改的提议),对 Java 和 Scala 的 API 进行依赖梳理,并且提出了 Planner 接口以支持多种不同的 Planner 实现。Planner 将负责具体的优化和将 Table 作业翻译成执行图的工作,我们可以将原来的实现全部挪至 Flink Planner 中,然后把对接新架构的代码放在 Blink Planner里。
图中的 Query Processor 就是 Planner 的实现 这样的做法一举两得。不仅让 Table 模块在经过拆分后更加清晰,更重要的是不影响老版本用户的体验。 在 1.9 版本中,我们已经merge 了大部分当初从 Blink 开源出来的 SQL功能。这些都是近几年在阿里内部场景经过千锤百炼而沉淀出来的新功能和性能上的优化,相信能够促使Flink 更上一个台阶!
除了架构升级之外,Table 模块在 1.9 版本还做了几个相对比较大的重构和新功能,包括:
- FLIP-37:重构 Table API 类型系统
- FLIP-29:Table 增加面向多行多列操作的 API
- FLINK-10232:初步的 SQL DDL 支持
- FLIP-30:全新的统一的 Catalog API
FLIP-38:Table API 增加 Python 版本
有了这些新功能加持,再经过后续修复和完善,Flink Table API 和 SQL 在未来将会发挥越来越重要的作用。
批处理改进
Flink的批处理功能在 1.9 版本有了重大进步,在架构调整后,Flink 1.9 加入了好几项对批处理的功能改进。 首当其冲的是优化批处理的错误恢复代价:FLIP-1(Fine Grained Recovery from Task Failures),从这个 FLIP 的编号就可以看出,该优化其实很早就已经提出,1.9 版本终于有机会将 FLIP-1 中未完成的功能进行了收尾。在新版本中,如果批处理作业有错误发生,那么 Flink 首先会去计算这个错误的影响范围,即 Failover Region。因为在批处理作业中,有些节点之间可以通过网络进行Pipeline 的数据传输,但其他一些节点可以通过 Blocking 的方式先把输出数据存下来,然后下游再去读取存储的数据的方式进行数据传输。如果算子输出的数据已经完整的进行了保存,那么就没有必要把这个算子拉起重跑,这样一来就可以把错误恢复控制在一个相对较小的范围里。
如果作业极端一点,在每一个需要Shuffle 的地方都进行数据落盘,那么就和 MapReduce 以及 Spark 的行为类似了。只是 Flink 支持更高级的用法,你可以自行控制每种 Shuffle 是使用网络来直连,还是通过文件落盘来进行。 有了基于文件的Shuffle 之后,大家很容易就会联想到,是不是可以把这个 Shuffle 的实现变成插件化。没错,社区也正在朝这个方向进行改进:FLIP-31(Pluggable Shuffle Service)。比如,我们可以利用 Yarn 的 Auxliary Service 来作为一种 Shuffle 的实现,我们甚至可以去写一个分布式服务来帮助批处理任务进行Shuffle。最近,Facebook 也分享了一些这方面的工作,而且在阿里内部,我们已经使用这样的架构,支持了单作业处理数百TB 量级的规模。Flink 具备了这样的插件机制后,可以轻松的对接这些更加高效灵活的实现,让Shuffle 这个批处理的老大难问题得到较好的解决。流处理改进
流计算毕竟还是 Flink 发迹的主要领域,在 1.9 版本当然也不能忘了在这方面做一些改进。这个版本增加了一个非常实用的功能,即FLIP-43(State Processor API)。Flink 的 State 数据的访问,以及由 State 数据组成的 Savepoint 的访问一直是社区用户呼声比较高的一个功能。在 1.9 之前的版本,Flink 开发了 Queryable State,不过这个功能的使用场景比较有限,使用效果也不太理想,因此用的人一直不多。这次的 State Processor API 则提供了更加灵活的访问手段,也能够让用户完成一些比较黑科技的功能:
- 用户可以使用这个 API 事先从其他外部系统读取数据,把它们转存为 Flink Savepoint 的格式,然后让 Flink 作业从这个 Savepoint 启动。这样一来,就能避免很多冷启动的问题。
- 使用 Flink 的批处理 API 直接分析State 的数据。State 数据一直以来对用户是个黑盒,这里面存储的数据是对是错,是否有异常,用户都无从而知。有了这个 API 之后,用户就可以像分析其他数据一样,来对 State 数据进行分析。
- 脏数据订正。假如有一条脏数据污染了你的 State,用户还可以使用这个 API 对这样的问题进行修复和订正。
状态迁移。当用户修改了作业逻辑,想复用大部分原来作业的 State,但又希望做一些微调。那么就可以使用这个 API 来完成相应的工作。
上面列举的都是流计算领域非常常见的需求和问题,都有机会通过这个灵活的 API 进行解决,因此我个人非常看好这个 API 的应用前景。 说到 Savepoint,这里也提一下社区完成的另外一个实用功能,即FLIP-34(Stop with Savepoint)。大家都知道 Flink 会周期性的进行 Checkpoint,并且维护了一个全局的状态快照。假如我们碰到这种场景:用户在两个Checkpoint 周期中间主动暂停了作业,然后过一会又进行重启。这样,Flink 会自动读取上一次成功保存的全局状态快照,并开始计算上一次全局快照之后的数据。虽然这么做能保证状态数据的不多不少,但是输出到 Sink 的却已经有重复数据了。有了这个功能之后,Flink 会在暂停作业的同时做一次全局快照,并存储到Savepoint。下次启动时,会从这个 Savepoint 启动作业,这样 Sink 就不会收到预期外的重复数据了。不过,这个做法并不能解决作业在运行过程中自动Failover而引起的输出到 Sink 数据重复问题。
Hive集成
Hive一直是 Hadoop 生态中一股不可忽视的重要力量。为了更好的推广 Flink 的批处理功能,和 Hive 的集成必不可少。在 1.9 版本的开发过程中,我们也很开心迎来了两位 Apache Hive PMC 来推进 Flink 和 Hive 的集成工作。 首先要解决的是使用 Flink 读取 Hive 数据的问题。通过 FLIP-30 提出的统一的 Catalog API 的帮助,目前 Flink 已经完整打通了对 Hive Meta Store 的访问。同时,我们也增加了 Hive 的 Connector,目前已支持 CSV, Sequence File, Orc, Parquet 等格式。用户只需要配置 HMS 的访问方式,就可以使用 Flink 直接读取 Hive 的表进行操作。在此基础之上,Flink 还增加了对 Hive 自定义函数的兼容,像 UDF, UDTF和 UDAF,都可以直接运行在Flink SQL里。 在写的支持上,目前Flink 还支持的比较简单,暂时只能 INSERT INTO 一张新表。不过和 Hive 的兼容一直是社区工作中一个高优先级的事情,相信后续的版本会有持续的改善。
总结
Flink1.9.0 版本经过大半年的紧张开发,终于顺利发布。在这过程中,Flink 社区不仅迎来了相当多的中国开发者和用户,还迎来了海量的代码贡献,预示着一个良好的开端。未来,无论是功能还是生态,我们会继续在 Flink 社区加大投入,让 Flink 在整个中国乃至全世界大规模的使用起来。我们也衷心希望有更多的开发者可以加入我们,加入Flink 社区,一起把 Apache Flink 做的越来越好!