6.2 双11背后的大规模数据处理
1. 实时数据总线服务-TT
TimeTunnel(TT)在阿里巴巴集团内部是一个有着超过6年历史的实时数据总线服务,它是前台在线业务和后端异步数据处理之间的桥梁。从宏观方面来看,开源界非常著名的Kafka+Flume的组合在一定程度上能够提供和TT类似的基础功能;不同的是,在阿里巴巴的业务体量和诉求下,我们有比较多的配置管控、资源调度、轨迹校验和血缘识别等方面的工作。
TimeTunnel产品架构
1.1 Pub/Sub服务
通过上图我们清楚地看到,TT的核心部分是一个基于HBase做中间存储的Pub/Sub服务,它提供了一个能支撑高读写比、大吞吐量和数据不丢的队列服务。除此之外,基于日常运维考虑,我们还支持了按时间seek和弹性伸缩的能力。
数据需要在Pub/Sub“落地”的需求一方面来自于业务上对热点数据多份消费的考虑,另一方面一些在线算法方面的应用需要经常性地对数据进行回放训练,数据“落地”能够比较好地对前后台进行解耦。事实上,TT里最热门的数据(例如天猫交易相关)有超过100倍的读写比;而从整体来看,仅双11当天流出TT的数据也比流入的数据多了3倍以上。
选择HBase作为中间存储的原因是能够成本较低地复用基于HDFS的多副本存储能力,以及HBase自身在提供读写服务时对于热点数据的内存管理能力。图 8是写入TT的数据在HBase中的存储模型,我们在broker层面通过构造合理的rowkey来使得同一个分区下的数据可按rowkey顺序scan;同时,因为在生成rowkey的时候我们使用了broker上的时间戳作为高位变量,因此能很方便地提供按时间seek的能力。
数据在HBase中的存储模型
1.2数据采集
上图左侧黄色部分是TT的数据采集方案。我们通过以下途径来准实时地收集前台业务产生的增量数据:
- 依赖DRC实现对MySQL、OceanBase以及Oracle等前台业务数据库的增量变更进行捕捉解析;
- 自研的日志Agent部署在数十万台的应用服务器上,准实时地捕捉应用日志的变化;
- 和其他一些内部主流存储例如OTS进行打通;
- 用户采用TT提供的SDK主动写入。
随着集团内重要业务异地多活架构和全球化的发展,数据采集分散在跨越数千甚至上万公里的多个IDC中;而与此相反,以Galaxy、ODPS为代表的大数据计算服务则需要考虑充分地利用大集中的架构来提升吞吐能力。因此,不可避免地在数据采集过程中需要对数据进行缓冲和压缩以尽可能降低长途链路对于吞吐量的负面影响。
矛盾的是,缓冲意味着前端产生的数据需要在采集端“等待”,也就意味着消费方看到的数据是延迟的。这对于像阿里妈妈这样依赖TT做反作弊和实时计费的业务来讲是很难接受的,数据延迟意味着资损,意味着用户体验的显著下降。同样地,压缩也是要消耗采集端的服务器资源的,尤其在双11这样的场景下,前台业务对于采集端的功耗尤其敏感。
遗憾的是,世界上从来没有一个只带来好处而没有任何弊端的事物,软件和产品的设计中处处都是折衷和取舍。除了在技术层面将实现细节做到尽可能极致,TT为了服务这些不同的场景,也提供了一些可配置的参数例如buffersize、sendthreads、compressLevel等用来匹配用户对延时、性能以及功耗的不同需求。
1.3 轨迹校验
TT区别于其他类似产品的最大之处,是我们通过技术埋点实现了一套完整的数据轨迹校验的方案——我们称之为“门将”。轨迹校验的目的在于通过监控的手段来保证“数据不丢”,设计得好,甚至可以识别出数据的重复、乱序等情况。
几乎所有类似的产品都宣称自己能做到“数据不丢”,当然也包括配备了“门将”之前的TT。有意思的是,几乎所有类似的产品都会被“丢数据”这个问题困扰,同样包括TT。因为我相信我们一定有能力在软件设计以及编码实现方面做到“数据不丢”的承诺,但往往会在一些预期外的异常case、版本升级或者系统耦合的地方出现这样那样的纰漏,从而导致下游消费方看起来缺失了部分数据。
以日志采集为例,我们碰到过因为操作系统的限制(请参阅max_user_watches相关的说明),inotify没有通知到新文件的产生而发生整个文件漏采集;也碰到过因为软件的bug在递归创建子目录的情况下出现了时序问题导致文件漏采集;还碰到过保存在应用服务器上的checkpoint文件被意外损坏导致的“丢数据”。这样的案例实在太多,而且防不胜防。
所以,工业界真正的“数据不丢”我认为是有完备的机制能够快速地发现数据丢失,考验的是系统的监控能力。
上文提到过,TT支撑着阿里妈妈的实时反作弊和点击计费业务;同样地,蚂蚁金服大量涉及资金核对和商户对账的业务也将身家性命托付在TT上。这样的业务不允许有任何原因导致的数据正确性问题。
“门将”的核心思路是在采集端往TT写入数据的同时,构造恰当的meta,将数据“链表化”,从而能够在“门将”的校验服务里对数据轨迹进行还原,进而和源头进行校验(图 8)。
仍然以日志采集为例。在采集过程中,我们以ip+dev+inode+sign来唯一识别内网上的一个文件,在构造meta时记录下当前数据包在原始文件中的offset和当前数据包的大小size,那么对于同一个文件的多个数据包,通过offset和size就能快速地识别出文件内有没有被重复采集或者遗漏采集。如果在恰当的时间内与这台机器上ls命令得到的结果进行比对,就很容易发现有没有文件被漏采集。
1.4 小结
所有的技术实现都是业务需求的抽象,这些需求有可能来自于大多数用户需要用到的功能,更有可能来自对上下游业务架构和场景的理解。数据总线服务是一个和业务架构耦合非常密切的基础组件,阿里巴巴集团独特的技术架构、多样性的存储方案和横向平台化的研发模式赋予了TT探究更复杂问题的原动力。
在2016年双11这样一个万众瞩目的时间点,TT通过前期的软件性能和机房规划上的努力,高峰期单一集群承担了15GB/s的写入和50GB/s的读取流量,首次做到了对所有业务进行不降级服务。这对于我们、对于搭建在TT上的众多业务,都是极大的鼓舞。
2. Galaxy:大规模数据流处理技术
每年双11除了“折扣”,阿里人关注的另一个焦点,就是面向全世界媒体直播的“实时大屏”(如下图所示)。包括总成交量在内的各项指标,通过数字维度展现了双11狂欢节这一是买家,卖家及物流小二共同创造的奇迹!
图:双11媒体直播大屏
为实现这一大屏,背后需要实时处理海量的、庞大电商系统各个模块产生的交易日志。例如双11当天产生的日志量达到了PB级别,而每秒处理的峰值更是高达近1亿事件!
如此大规模、高吞吐和低延时计算,带来一系列世界级的技术挑战,包括:
- 实时编程:流式的数据处理给业务逻辑的表达和推理带来了很多的复杂性。特别面对不断变化的业务需求,如何帮助用户快速地编写和验证实时计算逻辑是至关重要的。
- 低延时:实时计算强调计算延时和结果的时效性。例如实时大屏对计算延时特别敏感,每年的双11都超越前一年更早地达到相同的成交量,系统需要在秒级甚至毫秒级反应出每一笔交易。即使在流量高峰时(双11晚0:00点)也需要保证延时!
- 集群利用率:为提高资源利用率,我们将不用业务的实时处理逻辑共享一个集群。这样的共享也带来性能隔离的问题,即如何让同一台物理机上的不同逻辑任务不互相干扰。这也是大部分开源框架忽略的重要问题。
- 严格容错及数据一致性:随着应对高吞吐而不断扩大的集群规模,各种软硬件故障都难以避免。如何保证实时计算在任何故障下都能产生准确、一致的计算结果,不遗漏、重复事件输出,也不引起内部状态的偏差,是另一个重大挑战。
- 多样化场景支持:随着实时决策对业务的价值越来越多,系统还需要支持越来越复杂和多样化的场景,如在线机器学习、结合图计算实现的动态关系网络分析等等。
下文介绍Galaxy的重要技术创新,简要描述它们如何帮助应对以上技术挑战。
2.1 SQL与增量计算——复用熟悉的离线思维,自动实现增量(流式)计算
为了简化用户编程,特别是利用原有的离线计算作业快速实现实时计算,Galaxy允许通过高层描述性语言,如用户熟悉的SQL来编写流计算作业。通过简单几行SQL代码就可以实现过滤、双流关联等业务逻辑。
在执行时,由于数据是以流式进入系统的,用户的SQL就像数据库视图一样,被自动增量更新,并以一定的频率输出结果,供下游计算和展示。
这一独特的编程设计,不仅帮助用户借助熟悉的离线处理思维表达实时计算逻辑,也因为同样的程序可以在离线系统运行,使得结果的对比变得易如反掌。
2.2 高性能优化引擎——实现低延时计算
用户的SQL脚本经过编译优化,生成数据流图,然后运行于Galaxy的分布式引擎之上。相比开源数据流引擎,Galaxy引擎在“阿里巴巴规模”下,面对真实复杂的业务场景做了很多优化。包括自适应的消息打包、自定义序列化、数据行+列压缩、先进的内存管理、和内部缓存队列和线程模型,以及基于下游向上游“反向”传递压力的流控策略等。
图:Galaxy优化执行流和运行时模块
经过以上一系列的优化,Galaxy相比去年提升了6倍左右的吞吐性能。下图显示了Galaxy相比开源系统的性能优势。在面对今年双11 3倍于去年的峰值情况下,表现非常稳健。
图:开源框架性能对比,通过“窗口WordCount(6组参数)”基准测试获取
2.3 灵活的资源调度
Galaxy面对阿里巴巴集团众多业务场景,将不同业务放置于大规模(几千台服务器组成的)共享集群中,以提高资源利用率。另一方面也随之带来了“多租户”环境下的作业资源隔离问题,它直接影响资源的有效利用和作业的计算性能。
经过多年的积累,Galaxy支持CPU、内存、网络和磁盘I/O等多维度资源的隔离。例如,对于CPU的隔离支持灵活的min-max策略,既保证了每个作业最基本的资源需求,也使的空闲的资源被最大限度利用。
图:作业维度的CPU资源min-max共享模型
在此基础上,Galaxy的资源调度还支持一定比例的“超卖”、作业优先级调度、动态负载均衡和微作业共享单一物理核等多种机制。对于资源消耗特别大的作业还支持动态按需分配(即资源的弹性分配)。在满足复杂的运维要求和实时计算连续性的同时,实现了高效的资源利用和性能隔离。
2.4 容错与状态管理
流计算需要连续处理可能无界的输入和连续产生输出。在长时间运行中,大规模计算集群的各种软件或硬件故障难以避免。由此对于计算和中间结果(如内存状态)的容错就至关重要。为了做到精确的容错和故障恢复,保证结果的准确性。Galaxy支持多种灵活的容错策略,以在不同计算特性下,权衡容错资源消耗和恢复性能。如基于输入的重新计算、状态检查点(checkpoint),甚至是多副本的状态和计算容错等。
特别是自动的分布式增量检查点功能,系统自动利用内存、本地磁盘和远程存储构成的多级存储,在不影响流计算延时的情况下异步实现了计算状态的持久化。当有故障发生时,保存的状态可以被快速加载。这一切对用户都是无感知的。
图:自动利用多级存储的流计算状态管理
2.5 开放可编程API(兼容Apache Beam)
除了SQL这样高层的描述语言和用户自定义逻辑(UDF),Galaxy还支持Apache Beam API,以提供更为灵活的实时逻辑编程。Beam是一个统一开放的大数据应用编程接口,可以同时描述离线和在线逻辑,最早由Google提出。Beam提供了功能丰富的编程接口,能有效的处理有界、无界、乱序的数据流输入。 下面显示了通过Beam实现的流式WordCount的例子:
1.指定Runner(底层计算引擎)创建一个Pipeline。
2.使用Source在Pipeline上生成一个PCollection,输入数据。
3.对PCollection应用Transforms操作,比如wordCount中的count操作。
4.对最后的PCollection应用Sink,输出结果到外部存储中。
5.Run Pipeline到底层的计算引擎中。
使用Beam实现WordCount代码样例
public static class CountWords extends PTransform
PCollection
@Override
public PCollection
// Convert lines of text into individual words.
PCollection
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection
words.apply(Count.
return wordCounts;
}
}
借助Beam,用户可以利用高性能的Galaxy引擎,定制面向特定领域的系统交互接口。同时,Galaxy今后也将兼容更多生态(如Spark Streaming和Flink Streaming API)。
2.6 可视化集成开发平台和自动化运维
Galaxy还提供了“一站式”的集成开发环境——贝叶斯(Bayes,https://data.aliyun.com/product/sc)和自动化运维平台——特斯拉(Tesla)。通过它们,用户可以方便地管理流计算应用的生命周期,包括编程、调试、监控运维,极大地降低了流计算系统的使用门槛。
图:贝叶斯集成开发环境
2.7 双11的宝贵工程经验!
为保障系统在双11平稳支撑业务,在以上功能基础上,我们还总结了完整的全链路保障方法:
- 主备双链路容灾:利用Galaxy对多副本执行的支持,面向双11重点媒体大屏等实时业务,实现了跨机房的多链路副本。哪怕是整个机房的故障,都能在秒级自动切换到另一副本上执行,保障了双11系统高可用。
- 实时全链路监控:我们从数据采集、读取、消费、入库各个环节都增加延时指标的埋点,可以清晰地看到整条链路各个阶段的延时,快速分析哪个组件性能瓶颈。另外,针对作业本身运行情况,比如输入吞吐、流量、CPU和内存消耗,都做了实时分析和展示的系统,能在秒级发现作业的异常。
- 运维诊断工具:为应对各种应急响应,我们做了一套完整的运维诊断工具用于发现集群热点机器、热点作业。在Tesla页面上能快速找到集群的热点机器,通过“机器分析”工具查看这台机器上实时跑的任务,并且能定位到相应的业务和用户。通过“作业分析”工具能自动诊断异常,结合作业的优先级,实现了一键负载均衡、启停、续跑等运维操作。
通过这些保障设施,双11当天,即使在发生交换机硬件故障的情况下,面向全球直播的媒体大屏业务并没有受到任何影响!
2.8 小结
拥有这些和其它诸多能力,Galaxy已经具备了相当完善的实时计算能力,也提供了“一站式”的解决方案。今年双11当天,Galaxy处理了PB级别数据,处理峰值达到了1亿事件每秒,平均处理延迟在毫秒级!除了双11媒体大屏,Galaxy还支撑着阿里巴巴集团内外众多实时业务,包括数据运营、广告营销、搜索个性化、智能客服、物流调度、支付宝、聚划算等。
3. MaxCompute
每年双11都是阿里巴巴从最“前端”到最“后台”所有系统整条链路的一次大考。电商在线系统的浏览和消费产生了大量数据,其数据量是平常的数倍到数十倍。这些数据最终要流到阿里巴巴的大数据计算服务—MaxCompute上来处理。
MaxCompute承载了阿里巴巴集团所有的离线计算任务,是集团内部核心大数据平台。截止到目前支撑着每日百万级规模的作业,整个系统拥有数万台机器,单集群规模上万,存储已经到达了EB级别,每天有数千位活跃的工程师在平台上做数据处理。
面对如此多的海量数据,首先需要能够低成本的将数据存储下来。MaxCompute依托背后的飞天分布式操作系统,将大量低成本PC服务管理起来。早在2013年,我们基于对业务增长速度的判断,发现系统的存储马上就要“撞墙”了,集群的规模将要应付不了与日俱增的数据量。直到后来成立了5k项目组,对技术难点进行了攻坚,将单集群规模扩大到了5000台,阿里巴巴也成为了中国首个独立研发拥有大规模通用计算平台技术的公司。
实际上单集群规模到达上万台本身技术挑战非常大,因为规模上来以后对系统设计要求非常高,整个架构不能有单点。但是整个业务规模决定了1万台机器是不够的,因此MaxCompute抽象出来一个控制层,将分布在各个不同数据中心的多个计算集群统一管理,根据业务特点将不同的业务放在不同的计算集群中,通过跨集群复制,自动将数据在多个集群中同步,使得用户可以把计算引擎当成一个平台。
3.1 跨集群复制和全局调度
运行在MaxCompute上的业务种类非常多,各个业务部门之间数据也有着错综复杂的依赖关系。如果恰好数据不在同一个地域/机房中,那么就要进行数据的异地读写。比如分析支付宝的数据需要淘宝的数据,支付宝的数据和淘宝的数据并不在同一个机房集群,那就需要跨集群的去读(直读),或者将数据拷贝到本地再读(跨集群复制)。此外由于数据是会被更新的,比如淘宝的数据更新了,这个时候要求支付宝的作业能够读到最新版本的数据。生产任务有各自的基线时间,对处理时间有要求,不能由于互访数据导致任务延时太长。机房之间虽然有几十到上百G的直连网络专线,但其他生产业务也对网络带宽有需求,互访数据不能把带宽都占满,需要有网络流量控制。多个任务可能会访问同一份异地数据,再考虑带宽占用的限制,所以访问异地数据不能全部都通过直读异地数据来解决,有的异地数据需要在本地复制一份以供多次任务使用。
为了解决这个问题,MaxCompute引入了跨集群复制和全局调度机制。MaxCompute上所有的数据表和分区的元数据引入了版本号,当数据被更新时,其对应的计算集群版本号也会更新。版本更新后,新版本所在的计算集群的数据需要被复制到其他计算集群。但这个复制操作该何时发生,需要考虑多种因素,比如任务完成时效要求,多集群之间的带宽大小等。对这些因素进行全局分析,才能利用动态预先调整,远程读,复制等多种手段做到全局调度。但这一全局分析需要系统运行数据才能进行。MaxCompute中的元数据、数据血缘关系的分析,以及整个系统运行过程中产生的数据都会收集到元数据仓库,这样可以利用平台本身的数据分析能力来分析这些数据。这些数据被用来辅助MaxCompute平台的工程师做数据化运营,甚至用来帮助系统自身进行优化。
3.2 基于历史运行信息的优化
通过对每天运行的作业进行分析,我们发现大部分作业都是重复执行的。这是数据仓库中的一个典型的使用场景: 每天产生的新数据被同一套数据处理任务批量重复执行。这样的场景带来了巨大的优化机会。首先每天运行的任务所占用的资源信息会被记录下来,比如运行时占用的CPU、内存和运行时间。工程师新开发的作业在第一次运行时,申请的CPU和内存一般都会和实际占用的CPU、内存有所差别。如果申请的大于实际占用的,会造成调度的时候为作业多留资源,造成资源浪费,即资源的利用率下降。如果申请的小于实际占用的,会造成一台机器上调度的作业超过了机器能够承载的负荷。这两种资源错配的后果都会降低系统使用效率。最理想的结果是作业申请的资源与实际使用的能够完全匹配。
HBO( History-ed Based Optimization) 基于历史运行信息的优化就是通过收集作业的历史运行记录,根据实际CPU、内存占用来指导作业合理设置的一种优化手段。它是对集群资源分配的一种优化,概括起来就是根据:任务执行历史+集群状态信息+优化规则,得到最优的作业资源配置。
HBO包含两部分工作:
- 在线部分(Online):查找是否存在相应的hbo优化计划,如果有,则按照计划进行资源分配并执行
- 离线部分(Offline):从元数据仓库和神农获取任务的历史执行记录,按照一定的策略生成hbo优化计划
下图为HBO的流程架构图:
正常情况下,这种基于历史的优化效果非常显著,因为作业总体数据量在天与天之间变化一般不会很大。但到了双11,由于当天产生的数据量通常是前几天的数倍甚至数十倍,对于一些极限情况需要做特殊处理。比如作业instance数会因为处理的数据量增大同步增长而超过单个作业instance数量上限。依托HBO的工作,可以识别重复的作业、并且能够精准的对单个作业进行设置。利用这个能力,我们可以在节日前先对所有作业做一次分析,比如找出输入表在去年双11当天数据量显著增涨的作业,或者找出instance数量已经快要接近极限的作业,将他们单个instance处理的数据量设大,顺利度过双11的考验。以同样的手法可以指导制作针对双11的预案,比如调整CPU、内存的设置、提前发现数据倾斜等等。