本文主要介绍以下2大部分内容

  • 实时数仓概述
  • 基于flink实现典型ETL场景

作者: 买蓉 · 美团点评 / 高级技术专家
整理: 赵阳(Apache Flink China 社区志愿者)
校对: 苗浩冲(Apache Flink China 社区志愿者)

1.实时数仓的相关概述

1.1实时数仓产生背景

  1. 我们先来回顾一下数据仓库的概念。<br />![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_05.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590635716914-59bdf004-4276-4dd3-a120-58a4be537d52.png#align=left&display=inline&height=1440&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_05.png&originHeight=1440&originWidth=2560&size=243178&status=done&style=none&width=2560)<br /> 其实数据仓库是在90年代被提出的时候,更多的是与原来传统的OLTP数据库去做一个对比,所以数据仓库相关的4个核心点,我们要结合着OLTP当时的一个状态去理解。
  • 1.面向主题的:对于数据仓库来说,它与OLTP面向事务处理不同。因为数据仓库是面向分析决策的,所以它的组织方式会有一些比如说分析场景或者是分析对象等按照主题去组织,即在第1点组织方式是不同的。
  • 2.集成的:对于数据仓库来说,它经常需要去集合多个分散的、异构的数据源,然后去做一些清洗和ETL处理,整合成一块数据仓库,因此和传统OLTP也是不同的。
  • 3.相对稳定的:OLTP数据库一般都是面向业务的,它主要的作用是要去能够把当前的业务状态精准的反映出来,所以它需要做很大量的增、删、改的操作。但是对于数据仓库来说,只要是入仓的存下来的数据,大部分的场景都是查询,因此这部分数据是相对稳定的。
  • 4.反映历史变化:数据仓库是反映历史变化的数据集合,可以理解成它会将历史的一些数据的快照存下来。而对于OLTP来说,我们只要反映当时的我现在最新的一个状态就可以了。

    以上这4个点是数据仓库的一个核心的定义。我们也可以看出对比实时数据仓库来说,其传统数据仓库,也就是离线数据仓库,有一些定义可能是会被弱化,比如说在反应历史变化这一块。介绍了一下数据仓库的基本概念,另外我们还会在数据仓库建模这块会用到一些经典的建模方法,主要由范式建模、维度建模和Data Vault。在互联网大数据场景下,用的最多的是维度建模方法。

    然后先看一下离线数仓的一个经典的架构。如下图:
    【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_06.png
    数仓架构其实更多的是偏向互联网大数据的场景,上图可以看出来它有三个大的环节。
    1.数据源的部分,在互联网的数据源部分,其实我们主要有两类的数据源。

    • 第1类就是通过在客户端埋点上报,收集用户的行为日志,包括一些后端日志的这一类数据源。对于埋点收集行为日志来说,一般会经过一个这样的流程,首先会去报到nginx然后经过分我们收集,然后存储到kafka这样的消息队列,然后再由实施或者是离线的一些拉取的任务,拉取到我们的离线数据仓库hdfs
    • 第2类的话是业务数据库,而对于业务数据库的话,一般会经过canal去收集它的binlog,然后也是收集到消息队列中,最终再由Maxwell拉取到我们HDFS。

这两部分数据源最终都会落地到hdfs中的ods层,也就是我们天源数据层。
2.中间这部分和原始的数据源是保持一致的。大家可以看到中间的蓝色的框,就是我们离线数据仓库的一个重要的环节,这里可以看到它是一个分层的结构模型设计,其实它是依赖于维度建模的思路来的:

  • 最底下是ods层,基本保持原始的日志数据不变。
  • 然后在ods层之上,一般我们会进行一些统一的数据的清洗、归一。经过这样一条条的处理,就得到了dwd明细数据层。这一层也会有一些统一的维度数据。
  • 然后基于dwd明细数据层,我们会按照一些分析的场景、分析的实体等等去组织我们的数据,组织成一些分类主题的汇总数据层dws,
  • 然后在dws之上,我们会面向应用场景去做一些更贴近应用的一些APP应用数据层,这些数据的话应该是高度汇总的,然后并且能够直接导入到我们的应用服务去使用的。

在中间的离线数据仓库的生产环节,一般都是采用一些离线生产的架构引擎,比如说mapreduce、hive、spark等等,数据一般是存在hdfs上。
3.经过前两部分环节之后,我们的一些应用层的数据就会存储到数据服务里,比如说 hbase 、redis、kylin这样的一些 kv的存储。并且会针对存在这些数据存储上的一些数据,封装一些对应的服务接口,对外提供服务。在最外层我们会去产出一些比如说面向业务的报表、面向分析的数据产品,以及会支持线上的一些业务产品等等。这一层的话,称之为更贴近业务端的数据应用部分,
以上3点是一个基本的离线数仓经典架构的介绍。

  1. 大家也都体会到现在随着移动设备的普及,我们逐渐的由制造业时代过渡到了互联网时代。在制造业的时代,传统的数仓,主要是为了去支持以前的一些传统行业的企业的业务决策者、一些高管,去做一些业务决策。那个时代的业务决策周期是比较长的,同时当时的数据的话可以存在OracleDB2这一类数据库上就已经足够了。<br /> 但是随着分布式计算技术的发展、智能化技术发展、以及我们算力的提升、互联网的发展等等几大因素。我们现在在互联网上收集数据,已经呈指数级的增长。并且我们不在只依赖人去做决策,另外我们做决策的对象,很大部分转变由计算机的算法去做一些决策。比如说一些智能推荐的技术等等,这个时候我们的决策的周期,可能就由原来的天级要求到秒级,决策时间是非常短的。在场景上的话,也会面对更多的需要实时数据处理一些场景,例如实时的个性化推荐、广告的场景、一些传统企业实时的监控加工的产品是否有质量问、金融行业需要重度依赖的反作弊等等。因此在这样的一个背景下,实时数仓必须被提出来了。<br />

1.2实时数仓架构

  1. 首先跟大家介绍一下实时数仓经典架构-Lambda架构:<br />![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_08.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590648812257-2069cf30-bf78-4476-aad1-25e743b921d3.png#align=left&display=inline&height=1440&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_08.png&originHeight=1440&originWidth=2560&size=285832&status=done&style=none&width=2560)<br />这个架构是storm的作者提出来的,其实Lambda架构的主要思路是在原来离线数仓架构的基础上叠加上实时入仓的部分,然后将离线的存量数据与我们t+0的实时的数据做一个merge,就可以产生我们能够看到及时的更新了这样的一个数据结果。
  • 和上述1.1离线数据仓库架构图可以明显的看到,实时数仓增加的部分就是黄色的这块区域。这块区域我们一般会把相关的一些数据放在Kafka这样的消息队列上,也会有对维度建模的一些分层,但是在汇总数据的部分,我们不会将APP层的一些数据放在是实时数仓,而是更多的会移到数据服务侧去做一些计算。
  • 然后在实时计算的部分,我们经常会使用flink、spark-streaming和storm这样计算引擎,然后时效性上,我们由原来的天级、小时级可以提升到秒级、分钟级。

大家也可以看到从这个架构上,很直观的就是我们在数据仓库这中间的一个环节其实有两套模块,一个是离线的数据仓库的模块,还有一个是实时的数据仓库模块。我们必须要运维两套(实时计算和离线计算)引擎,并且在代码层面,我们也需要去实现实时和离线的业务代码。然后在合并的时候,我们需要保证实施和离线的数据一致性,所以但凡我们的代码做变更,我们也需要去做大量的这种实时离线数据的对比和校验。其实这对于不管是资源还是运维成本来说都是比较高的。这个是一个lamda架构上比较明显和突出的一个问题。因此就产生了kappa结构。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_09.png
kappa架构的一个主要的思路就是在数仓部分移除了离线数仓,数仓的生产全部采用实时数仓。从上图可以看到刚才中间的部分,离线数仓模块已经没有了。
熟悉实时数仓生产的同学,应该就会有一个问题。因为我们经常会面临的是业务的变更,所以我们很多业务逻辑是需要去迭代的。我们之前产出的一些数据,如果口径变更了,我们就需要去重算,甚至重刷历史数据。因此对于实时数仓来说,它怎么去解决这样的一个问题?kappa架构在这一块的思路是说:你要准备好一个能够存储历史数据的消息队列,比如说Kafka,并且这个消息对列是可以支持你从某个历史的节点重新开始消费的。 接着如果你有新的这种重刷重算的需求的时候,新起一个任务,然后去从原来的比较早的一个时间节点去消费你Kafka上的数据,然后当你新的任务运行的进度已经能够和现在的正在跑的任务齐平的时候,你就可以把现在任务的下游切换到新的任务上面,现在的任务就可以停掉,然后并且现在产出的结果表也可以被删掉。

然而随着我们现在实时olap技术的一些来提升,有一个新的是olap变体被提了出来,这个思路其实说我们可以把大量的聚合、分析、计算,由实时olap引擎来去承担。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_10.png

在实时计算的部分,我们不需要做的特别重,尤其是聚合相关的一些逻辑,然后这样就可以保障我们在数据应用层可以去灵活的面对各种业务分析的做一些需求的变更,整个架构变得灵活起来。

最后我们来整体对比一下,实时入仓的这几个架构:
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_11.png

  • 从计算引擎角度:lamda架构它需要去维护批流两套计算引擎,kappa结构和实时olap变体只需要维护流计算引擎就好了。
  • 开发成本:对lamda架构来说,因为它需要维护实时离线两套代码,所以它的开发成本会高一些。 kappa结构和实时olap变体只用维护一套代码就可以了。
  • 分析灵活性:,实时olap变体这套方式是比较高的。
  • 在实时olap引擎依赖上:我们可以知道第3个方法它是强依赖实时olap变体引擎的能力的,
  • 计算资源:lamda的架构需要批了两套计算资源,然后kappa开发架构只需要流计算资源,然后实时olap变体需要额外的olap资源,
  • 逻辑变更重算:lamda架构是通过批处理来去重算的,然后kappa架构还需要按照刚才的方式去重新消费消息队列重算,然后实时olap变体这块的话也需要重新消费消息队列,并且这个数据还要重新导入到olap引擎里,去做计算。

这是整体三个关于实时数仓架构的一个对比

1.3传统数仓vs实时数仓

  1. 然后我们来看一下传统数仓和实时数仓整体的一个差。![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_12.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590649033142-c58ffc9c-a932-46f7-b5da-6f458c56d9bc.png#align=left&display=inline&height=387&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_12.png&originHeight=1440&originWidth=2560&size=243963&status=done&style=none&width=688)
  1. 首先从时效性来看:离线数仓是支持小时级和天级的,实时数仓到秒级分钟级,所以实时数仓时效性是非常高的。
  2. 在数据存储方式来看:离线数仓它需要存在hdfs和rds上面,实时数仓一般是存在消息队列,还有一些kv存储,像维度数据的话会更多的存在这种kv存储上。
  3. 在生产加工过程中,离线数仓是需要去依赖离线计算引擎以及离线的调度。 但对于实时数仓来说,主要是依赖实时计算。

    2.基于flink实现典型的ETL场景

    这里我们主要讲述两方面:维表join和双流join
    维表join

    • 预加载维表
    • 热存储关联
    • 广播维表
    • Temporal table function join

      双流join

    • 离线join vs. 实时join

    • Regular join
    • Interval join
    • Window join

      2.1维表join

      2.1.1 预加载维表

      方案1:

      就是将维表全量的区域加载到内存里去做关联,具体的实现方式就是我们定义一个类,去实现RichFlatMapFunction,然后在open函数中去读取维度数据库,然后将数据全量的加载到内存,然后在probe流上使用算子 ,运行时与内存维度数据做关联。
      这个方式的话,它的优点就是实现起来比较简单,然后缺点也比较明显,因为我们要把每个维度数据都加载到内存里面,所以它只支持少量的维度数据。然后同时如果我们要去更新维表的话,还需要重启作业,所以它在维度数据的更新方面代价是有点高的,而且会造成有一段的时间的延迟。对于预加载维表来说,它适用的场景就是小维表,然后变更频率诉求不是很高,然后对于变更的及时性的要求也比较低的这种场景。

      接下来我们看一个简单的代码的示例:
      【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_16.png
      就是在这段代码的话,截取的是关键的一个片段,然后我们定义了一个DimFlatMapFunction来去实现RichFlatMapFunction。然后其中有一个map类型的dim,其实就是为了之后在读取db的维度数据以后,可以用于存放我们的维度数据,然后在open函数里面我们需要去连接我们的db,进而获取db里的数据。然后在下面可以看到我们的场景是从一个商品表里面去取出商品的ID、商品的名字。然后我们在获取到 db里面的维度数据里面,数据以后会把它存放到dim里面。
      接下来在flatMap里面我们就会使用到dim,我们在获取了 probe流的数据以后,我们会去dim里面比较。是否含有同样的商品ID的数据,如果有的话就把相关的商品名称append到数据元组,然后做一个输出。这就是一个基本的流程。
      其实这是一个基本最出版的方案实现。但这个方案也有一个改进的方式,就是在open函数里面,可以新建一个线程,定时的去加载维表。这样的话我们就不需要人工的去重启job,来去让我们的维度数据做更新,可以实现一个周期性的维度数据的更新。

      方案2:

      通过 Distributed cash的机制去分发本地的维度文件到task manager后在加载到内存做关联。实现方式可以分为三步:

    • 第1步是通过env.registerCached注册文件。

    • 第2步是实现RichFunction,然后在open函数里面通过RuntimeContext来获取cache文件。
    • 第3步的话就是我们解析和使用这部分文件数据。

这种方式的一个优点就是你不需要去准备或者依赖外部数据库,缺点就是因为数据也是要加载到内存中的,所以支持的维表数据量也是比较小。而且如果这个维度数据去做更新的话,也需要去重启作业。 因此在正规的生产过程中不太建议使用,因为其实从数仓角度,希望所有的数据都能够通过schema化的做管理。把数据放在文件里面去做这样一个操作,不利于我们做整体数据的管理和规范化的。所以这个方式的话,大家在做一些小的demo的时候,或者一些测试的时候可以去使用。
那么它适用的场景就是维度数据是文件形式的、数据量比较小、并且更新的频率也比较低的一些场景,比如说我们读一个静态的码表、配置文件,等等。

2.1.2 热存储关联

【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_18.png
具体是说我们把维度数据导入到像redis、tair、hbase这样的一些热存储中,然后通过异步IO去查询,并且叠加使用cache机制,还可以做一些淘汰的机制,最后将维度数据缓存在内存里,来去减轻我们整个对热存储的访问压力。
如上图展示的这样的一个流程。在cache这块的话,比较推荐谷歌的guava Cache,它封装了一些关于cache的一些异步的交互,还有cache淘汰的一些机制,我们用起来是比较方便的。
刚才的实验方案里面有两个重要点,一个就是我们需要用的异步IO方式去访问它的存储,这里也跟大家一起再回顾一下:同步IO与异步IO的区别:

  • 对于同步IO来说,发出一个请求以后,必须等待请求返回以后才能继续去发新的request。所以它的整个吞吐是比较小的。由于实时数据处理对于延迟特别关心和在意的这样的场景,这种同步io的方式,对于我们说是不太能够接受的。
  • 异步IO就是我们并发可以发出多个请求,整个的吞吐是比较高的,延迟会相对低很多。如果使用异步 io的的话,它对于外部存储的吞吐量升以后,也会使得外部存储有比较大的压力,反而也会造成我们整个数据处理上可能作为一个延迟的瓶颈。所以引入cache机制也是希望通过cache来去减少我们对外部存储的访问量。

刚才提到的guava Cache,它的使用是非常简单的,下图是一个定义cache样例:
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_19.png
大家可以看到它的使用接口是非常简单,希望大家可以去尝试使用。对于热存储关联这样的一个方案来说,它的优点就是维度数据因为不用全量的加载在内存里,所以就不受限于内存大小,维度数据可以支持更多。然后再比如说在美团点评的流量场景里面,我们的维度数据可以支持到10亿量级,从去年来说也是比较明显的,我们需要依赖和存储的资源,而且维度的更新,反馈的结果是有一定的延迟的。 因为我们首先需要把数据导入到一个存储,然后同时在cache过期的时间上也会有损失。
总体来说这个方法适用的场景是说我们维度数据量比较大,又能够接受维度更新有一定延迟的这种场景。

2.1.3 广播维表

  1. 利用broadcast State将维度数据流广播到下游taskjoin。<br /> 实现方式:
  • 将维度数据发送到kakfa作为广播原始流S1
  • 定义状态描述符MapStateDescriptor。调用S1.broadcast(),获得broadCastStream S2
  • 调用非广播流S3.connect(S2),得到BroadcastConnectedStream S4
  • 在KeyedBroadcastProcessFunction/BroadcastProcessFunction实现关联处理逻辑,并作为参数调用S4.process()

    这个方案,它的优点是说维度的变更可以及时的更新到结果。然后缺点就是数据还是需要保存在内存中,因为它是存在state里的,所以支持维表数据量仍然不是很大。适用的场景就是我们需要时时的去感知维度的变更,然后唯度数据又可以转化为实时流,这样的场景我们就可以使用这样的方式。
    下面是一个小的demo:
    【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_21.png
    我们这里面用到的广播流pageStream,它其实是定义了一个页面ID和页面的名称。然后对于非广播流probeStream,它是一个json格式的string,然后里面包含了设备ID、页面的ID、还有时间戳,我们可以理解成用户在设备上做PV访问的这样的一个行为的记录。
    整个实现来看,就是上述的步骤:

  • 第1步骤的话就是要定义广播的状态描述符。

  • 第2步骤我们这里去生成broadCastStream。
  • 第3步骤的话我们就需要去把两个stream做connect。
  • 第4步最主要的一个环节就是需要实现BroadcastProcessFunction。第1个参数是我们的probeStream,第2个参数的话是广播流的数据,第3个参数就是我们的要输出的数据,可以看到主要的数据处理逻辑是在processElement里面。

在数据处理过程中,我们首先通过context来获取我们的broadcastStateDesc,然后解析 probe流的数据,最终获取到对应的一个配置ID。接着的话我们就在我们刚才拿到了 state里面去查询,是否有同样的配置ID,因为我能够找到对应的配置id话,我们会把对应的配置name添加到我们整个json stream去做输出。

2.1.4 Temporal table function join

  1. 介绍完了上面的方法以后,还有一种比较重要的方法是用Temporal table function join。首先说明一下什么是Temporal table?它其实是一个概念:就是能够返回持续变化表的某一时刻数据内容的视图,持续变化表也就是changing table,可以是一个实时的changelog的数据,也可以是放在外部存储上的一个雾化的尾表。 <br /> 它的实现是通过udtf去做 probe流和Temporal table 的一个join,称之Temporal table function join。然后这种join的方式,它适用的场景是维度数据为changelog流的形式,而且我们有需要按时间版本去关联的诉求。<br />

首先来看一个例子,而这个例子是用的官网关于汇率和货币交易的一个例子。对于我们的维度数据来说,也就是刚刚提到的changelog stream,它是RateHistory。它反映的是不同的货币相对于日元来说,不同时刻的一个汇率。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_23.png

第1个字段是时间,第2个字段是currency货币。第3个字段就是相对日元的汇率,然后在我们的Temporal table来看的话,它定义的是购买不同货币的订单的情况。然后比如说在10:15购买了两欧元,他记录是货币交易的一个情况。我们在这个例子里面,我们要求的就是购买货币的总的日元交易额,如何通Temporal table function join来去实现我们这个目标。

  • 第1步首先我们要在changelog流上面去定义TemporalTableFunction,这里面有两个关键的参数是必要的。然后第1个参数就是能够帮我们去识别版本信息的一个time attribute,第2个参数呢是需要去做关联的组件,这里的话我们选择的是 currency。
  • 接着的话我们在 tableEnv里面去注册TemporalTableFunction的名字。

然后我们来看一下我们注册TemporalTableFunction,它能够起到什么样的一个效果。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_24.png

比说如果我们去使用rates函数,去获取它11:50的一个状态。对应可以看到对于美元来说,它在11:50的状态其实落在11:49~11:56这个区间的,所以我们选取的是99。然后对于欧元来说,11:50的时刻是落在11:15和12:10之间的,所以我们会选取119这样的一条数据。它其实就是实现我们在一刚开始去定义TemporalTable的概念,能够获取到changelog的某一时刻的有效数据的这样的一个作用。那定义好TemporalTableFunction以后,我们就要需要使用这个Function,具体实现业务逻辑。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_25.png
大家注意这里需要去指定我们对具体需要用到的 join key。比如说因为两个流都是在一直持续更新的,对于我们的order table里面11:00的这一条记录来说,关联到的就是欧元在10:45这一条状态,然后它是116,所以最后的结果就是232。
刚才介绍就是Temporal table function join的用法。

2.1.5 维表join的对比

  1. 然后来整体回顾一下在维表join这块,各个维度join的一些差异,便于我们更好的去理解各个方法适用的场景。
  • 【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_26.png
  • 在实现复杂度上面的:除了热存储关联稍微复杂一些,其它的实现方式基本上复杂度是比较低的。
  • 在维表数据量上:对于热存储关联和Temporal table function join两种方式的话可以支持比较多的数据量。其它的方式因为都要把维表加到内存,所以就受限内存的大小会有一个限制。
  • 在维表更新频率上面:因为预加载DB数据到内存和Distributed Cache在重新更新维表数据的时候都需要重启,所以它们不适合维表需要经常变更的场景。而对于广播维表和Temporal table function join来说,可以实时的更新自己的维表数据反映到结果,所以它们可以支持维表频繁更新的场景。
  • 对维表更新实时性来说:在广播维表和Temporal table function join,它们可以达到比较快的实时更新的效果。热存储关联在大部分场景也是可以满足业务需求的。
  • 在维表形式上面:可以看到第1种方式主要是可以支持去访问DB存储少量数据的这种形式,Distributed Cache只是支持文件的形式,热存储关联需要访问hbase和tair等等这种热存储。广播维表和Temporal table function join都需要维度数据能转化成实时流的形式,
  • 在外部存储上面:第1种方式和热存储关联都是需要依赖外部存储的。

在维表join这一块,我们就介这几个基本的方法。也可能有的同学会有一些其他的方式,之后的话可以反馈交流,因为这里面其实主要提到了一些比较常用的方式,不限于这些方式。

2.2双流join

  1. 首先我们来回顾一下,在批处理是怎么去处理两个表join的?一般的话在批处理引擎实现的时候,会采用两个思路。<br /> 一个是基于排序的Sort-Merge join。另外一个的是转化hash table加载到内存里做Hashj oin。这两个思路对于我们双流join的场景是否还同样适用?在双流join场景里面要处理的对象不再是这种批的数据、有限的数据。而是是无穷数据集,对于无穷数据集来说,我们没有办法去排序以后再做处理,同样也没有办法把无穷数据集全部转成cache加载到内存去做处理。 所以这个代价是比较高的,这样的方式基本是我们不能够适用的。同时在我们的双流join场景里面,我们的join对象是两个流,数据也是不断的在进入的,所以我们join的结果也是需要持续性的去更新的。 <br /> 所以我们应该有什么样的一个方案去解决双流join的实现问题。这里面flink的一个基本的思路是需要将两个流的数据存到state中持续性的去存储,然后去使用。因为我们需要不断的去更新join的结果,以前的数据理论上如果没有任何附加条件的话是不能丢弃的。但是从实现上来说state又不能永久的保存所有的数据,所以需要通过一些方式将join的这种全局范围局部化,就是说把一个无线的数据流,能尽可能给它拆分切分成一段一段的有线数据集去做join <br /> 其实基本的思路就是基于这样的一个大的思路,接下来去看一下具体的实现方式。

2.2.1 离线join vs. 实时join

  1. 接下来我们以inner join为例去看一下,一个简单的实践的思路:<br />![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_28.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590734961971-374b2d5a-cea5-48b0-91a7-16b3fed51b65.png#align=left&display=inline&height=1440&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_28.png&originHeight=1440&originWidth=2560&size=212433&status=done&style=none&width=2560)<br />左流是黑色标出来的这一条,然后右流是蓝色标出来的,对于这条两流要去做inner join。首先左流和右流在元素进入以后,需要把相关的元素存储到对应的 state上面。然后除了存储到state上面以外,左流的数据元素到来以后需要去和右边的Right Stream去做一个比较看能不能mtach。同样右边的流元素到了以后,也需要和左边的Left Stream去做一个比较看是否能够match,能够match的话就会作为inner join的结果输出。这个图是比较比较粗的展示出来一个inner join的大概的细节。也是让大家大概的体会双流join大概的一些实现思路。

2.2.2 Regular join

  1. 我们首先来看一下第1类重要的方式是Regular join。这种方式的话需要去保留两个流的状态,持续性地保留并且不会去做清除。两边的数据对于对方的流都是所有可见的,所以这种数据的话就需要持续性的存在state里面,那么state又不能存的过大,因此这个场景的只适合有界数据流。然后它的语法可以看一下,是比较像我们的离线批处理的sql的:<br />![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_30.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590742079391-959ab134-9502-45f3-a0ff-4880338192e1.png#align=left&display=inline&height=1441&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_30.png&originHeight=1440&originWidth=2560&size=329708&status=done&style=none&width=2560)<br />在上图页面里面是现在flink支持Regular join的一些写法,可以看到和我们普通的sql基本是一致的。 <br />

2.2.3 Interval join

  1. 在双流join里面flink支持的第2 join 就是Interval join也叫区间join。它是什么意思呢?就是加入了一个时间窗口的限定在两个流做join的时候。其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的join key相同才能够完成join。加入了时间窗口的一个限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的state。<br /> Interval join是同时支持 processing timeeven time去定义时间的。如果使用的是processing time的话,flink的内部会使用系统时间去划分窗口,并且去做相关的state清理。如果使用even time就会利用Watermark的机制去划分窗口,并且做系列的清理。<br /> 下面我们来看一些示例:<br />

截屏2020-05-29 17.06.16.png

上图这个示例用的数据是是两张表:一个是订单表,另外一个是配送表。这里定义的时间限定是说配送的时间,必须在下单后的4个小时内。
Flink的作者之前有一个内容非常直观的分享,我们就直接复用了它这部分的一个示例:
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_32.png
我们可以看到对于Interval join来说:它定义一个时间的下限,就可以使得我们对于在时间下限之外的数据就可以做一个清理。比如说在刚才的sql里面,其实我们就限定了join条件是ordertime必须要大shiptime减去4个小时。 对于Shipments流来说,如果接收到12:00点的Watermark,就意味着对于Orders流的数据小于8:00点之前的数据时间戳就可以去做一个丢弃,不再保留在state里面了。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_33.png
同时对于shiptime来说,其实它也设定了一个时间的下限,就是它必须要大于ordertime。对于Orders流来说如果我接收到了一个10:15点的Watermark, 那么Shipments的state10:15这部分之前的数据我就可以抛弃掉。 所以Interval join它就使得我们可以对于一部分历史的state去做清理。

2.2.4 Window join

  1. 最后来说一下双流join的第3Window join:它的概念是将两个流中有相同key和处在相同window里的元素去做join。它的执行的逻辑比较像Inner join,必须同时满足join key相同,而且在同一个window里元素才能够在最终结果中输出。然后使用的方式是这样的:<br />![截屏2020-05-29 17.48.45.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590745740559-36be4c58-c043-4673-8528-865cb2d6d2f8.png#align=left&display=inline&height=644&margin=%5Bobject%20Object%5D&name=%E6%88%AA%E5%B1%8F2020-05-29%2017.48.45.png&originHeight=644&originWidth=2008&size=371814&status=done&style=none&width=2008)<br />目前Window join只支持Data stream的API,所以这里使用方式也是Data stream的一个形式。可以看到我们首先把两流去做join,然后在where和equalTo里面去定义我们我们的join key的条件,然后在window中需要去指定我们 window划分的方式WindowAssigner,最后的话我们要去定义JoinFunction或者是FlatJoinFunction,来去实现我们匹配元素的一个处理的具体的逻辑。<br /> 因为window其实划分为三类,所以我们的Window join这里也会分为三类:<br />第1类Tumbling Window join:它是按照时间区间去做划分的 window。<br />![【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_35.png](https://cdn.nlark.com/yuque/0/2020/png/1360420/1590755738865-8f652a6b-28b7-4812-ab6e-4830ff02c44a.png#align=left&display=inline&height=1440&margin=%5Bobject%20Object%5D&name=%E3%80%90%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E7%AF%87%E3%80%91%E5%9F%BA%E4%BA%8Eflink%E7%9A%84%E5%85%B8%E5%9E%8BETL%E5%9C%BA%E6%99%AF%E5%AE%9E%E7%8E%B0-%E4%B9%B0%E8%93%89%E7%9A%84%E5%89%AF%E6%9C%AC_35.png&originHeight=1440&originWidth=2560&size=315024&status=done&style=none&width=2560)<br />然后我们可以看到这个图里面是两个流(绿色的流年和黄色的流)。然后在这个例子里面我们定义的是一个两毫秒的窗口,可以看到每一个圈是我们每个流上一个单个元素,上面的时间戳代表元素对应的时间,所以我们可以看到它是按照两毫秒的间隔去做划分的,然后 window和window之间是不会重叠的。 对于第1个窗口我们可以看到绿色的流有两个元素符合,然后黄色流也有两个元素符合,它们会有pair组合,最后输入到JoinFunction或者是FlatJoinFunction里面去做具体的处理。

第2类是Sliding Window Join:这里用的其实是Sliding Window。
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_36.png

它定义是我们首先定义一个窗口大小,然后再定义一个滑动时间窗的一个大小。如果滑动时间窗的大小小于定义的窗口大小,窗口和窗口之间是会存在重叠的情况。就像这个图里显示出来的,红色的窗口和黄色窗口是有重叠的,其中绿色流的0元素同时处于红色的窗口和黄色窗口,说明一个元素是可以同时处于两个窗口的。然后在具体的 Sliding Window Join的时候,可以看到对于红色的窗口来说有两个元素,绿色0和黄色的0,它们两个元素是符合window join条件的,于是它们会组成一个0,0的pair。 然后对于黄色的窗口符合条件的是绿色的0与黄色0和1两位数,它们会去组合成0,1、0,0和1,0两个pair,最后会进入到我们定义的JoinFunction里面去做处理。

第3类是Session Window jon:这里面用到的 window是session window
【实时数仓篇】基于flink的典型ETL场景实现-买蓉的副本_37.png
它是定义一个时间间隔,如果一个流的元素在这个时间间隔内没有元素到达的话,那么它就会重新开一个新的窗口。在上图里面我们可以看到窗口和窗口之间是不会重叠的。我们这里定义的Gap是1,对于第1个窗口来说,可以看到有绿色的0元素和黄色的1、2元素都是在同一个窗口内,所以它会组成在1 ,0和2,0这样的一个pair。剩余类似这些符合条件的pair都会进入到最后JoinFunction里面去做处理。

整体我们可以回顾一下,这一节主要是介绍了维表join和双流join两大类的场景。在维表join上:主要介绍了预加载维表、热存储关联、广播为表、Temporal table function join这4种方式。然后在双流join上我们介绍了Regular join、Interval join和Window join。