Streaming概述

在很多实时数据处理的场景中,都需要用到流式处理(Stream Process)框架,Spark也包含了两个完整的流式处理框架Spark StreamingStructured Streaming(Spark 2.0出现)。
流式计算,对无边界的数据进行连续不断的处理、聚合和分析image.png

应用场景

  • 电商实时大屏
  • 商品推荐
  • 工业大数据
  • 集群监控

image.png
【实时增量ETL】:实时接收数据,进行ETL处理,最后保存至存储引擎,比如HBase、Elasticsearch中。

计算模式

原生流计算模式

  • 一条一条数据处理分析,实时性很高
  • 比如Storm框架、Flink框架,以及StructedStreaming也支持

image.png

微批处理计算模式

  • 按照时间间隔划分流,将批次数据进行处理分析
  • 比如SparkStreaming和StructedStreaming

image.png

计算思想

SparkStreaming是一个基于SparkCore之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点
image.png
对于Spark Streaming来说,将流式数据按照时间间隔BatchInterval划分为很多部分,每一部分Batch(批次),针对每批次数据Batch当做RDD进行快速分析和处理。
它的核心是DStream,DStream类似于RDD,它实质上一系列的RDD的集合,DStream可以按照秒、分等时间间隔将数据流进行批量的划分。首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等。
DStream = Seq[RDD]

工作原理

  1. ssc.start()启动receiver接收器,实时接收数据源数据
  2. 按照时间间隔BlockInterval划分流式数据Block
  3. 存储Block至Executors内存中,设置副本数
  4. receiver想StreamingContext发送Block报告
  5. 达到BatchInterval时间间隔生成批次,将数据当做RDD交给SparkContext处理

整个Streaming运行过程中,涉及到两个时间间隔:

  • 批次时间间隔:BatchInterval
  • Block时间间隔:BlockInterval
    接收器划分流式数据的时间间隔,可以调整大小,官方建议最小值不能小于50ms
    默认值为200ms,属性:spark.streaming.blockInterval,调整设置

    BatchInterval: 1s = 1000ms = 5 * BlockInterval

    Lambda架构

    Lambda架构是由Storm的作者Nathan Marz提出的一个实时大数据处理框架
    Lambda架构的目标是设计出一个能满足实时大数据系统关键特性的架构,包括有:高容错、低延时和可扩展等。Lambda架构整合离线计算和实时计算,融合不可变性(Immunability),读写分离和复杂性隔离等一系列架构原则,可集成Hadoop,Kafka,Storm,Spark,Hbase等各类大数据组件
    image.png
    Lamdab架构分为三层:

  • Batch Layer:批处理层,离线分析

  • Speed Layer:速度层,实时分析
  • Serving Layer:服务层,存储批处理层和速度层分析结果数据

image.png

DStream

SparkStreaming模块将流式数据封装的数据结构:DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种Spark算子操作后的结果数据流。
DStream内部是由一系列连续的RDD组成的,每个RDD都包含了特定时间间隔内的一批数据,如下图所示:
image.png

DStream Operations

DStream类似RDD,里面包含很多函数,进行数据处理和输出操作,主要分为两大类:

在SparkStreaming企业实际开发中,建议:能对RDD操作的就不要对DStream操作,当调用DStream中某个函数在RDD中也存在,使用针对RDD操作

流式应用状态

无状态Stateless

  • 使用transform和foreacRDD函数
  • 实时增量数据ETL:实时从Kafka Topic中获取数据,经过初步转换操作,存储到ES或HBase表中

    有状态State

  • 函数:updateStateByKey、mapWithState(推荐,性能好)

  • 双11大屏幕所有实时统计数字(比如销售额和销售量等),比如销售额、网站PV、UV等

    窗口统计

    窗口函数【window】声明如下,包含两个参数:

  • 窗口大小(WindowInterval,每次统计数据范围)

  • 滑动大小(每隔多久统计一次),都必须时批处理时间间隔BatchInterval整数倍

image.png
参考文档:http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations

Kafka集成

在实际项目中,无论使用Storm还是SparkStreaming与Flink,主要从Kafka实时消费数据进行处理分析,流式数据实时处理技术架构大致如下:
image.png

技术栈

Flume/SDK/Kafka Producer API -> KafKa —> SparkStreaming/Flink/Storm(Hadoop YARN) -> Redis -> UI

https://github.com/alibaba/canal/wiki/QuickStart

  • Maxwell
    实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序
    http://maxwells-daemon.io/

https://github.com/zendesk/maxwell

kafka 0.8.2(3.x版本不支持)

https://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html

  • 简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据
  • 高级消费API(Consumer High Level API),Receiver接收器接收数据

    kafka 0.10.x

    https://spark.apache.org/docs/3.1.2/streaming-kafka-0-10-integration.html

  • 直接到Kafka Topic中依据偏移量范围获取数据,进行处理分析

  • 每批次中RDD的分区与Topic分区一对一关系
  • 获取Topic中数据的同时,还可以获取偏移量和元数据信息 ```xml org.apache.spark spark-streaming-kafka-0-10_2.12 3.1.2

``` 工具类KafkaUtils中createDirectStream函数API使用说明(函数声明):
image.png

Kafka偏移量管理

当应用关闭以后,再次启动(Restart)执行,并没有继续从上次消费偏移量读取数据和获取以前状态信息,而是从最新偏移量(Latest Offset)开始的消费,有三种方式:Checkpoint恢复、Kafka自身和手动管理偏移量。

Checkpoint 恢复

当流式应用再次启动时,从Checkpoint 检查点目录恢复,可以读取上次消费偏移量信息和状态相关数据,继续实时处理数据。
image.png
参考文档:http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#checkpointing

Kafka itself

  • Kafka 自身管理消费偏移量
  • 当每批次结果RDD输出完成以后,异步手动提交偏移量

    手动管理偏移量(推荐)

    用户编程管理每批次消费数据的偏移量,当再次启动应用时,读取上次消费偏移量信息,继续实时处理数据。针对偏移量数据:自己管理偏移量,将偏移量存储到MySQL表、Zookeeper、HBase或Redis。
    参考文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#storing-offsets
    以下MySQL可以是其他数据:
    image.png

    性能优化

    image.png