一、SparkStreaming概述
1、数据处理的类型分析
静态数据
- 数据源时不变的,有限的,比如爬虫工程师爬下来的每天的数据。
- 这类数据适合离线,批量计算
流式数据
- 数据时变动的,无限的,连续的
- 多适合实时处理,能在秒级处理完成
SparkStreaming是什么
- 定义:微批处理的流式数据实时计算框架
- 原理:把输入数据按某一时间间隔批量的处理,将时间间隔缩短到秒级别,即可达到实施处理
重要概念说明
- StreamingContext:上下文环境
- 数据源:source指数据是从哪里来,文件,socket,kafka
- 离散流:Discretized Stream 缩写为DStream,微批处理当中的数据抽象单位
- 输入离散流:Input DStream。连接一个外部数据源来读取数据的统称
- 批数据:将源数据以时间间隔分隔开的数据
- 时间片:作为拆分流数据的依据
- 窗口长度:一个窗口覆盖的数据量,时间必须是时间片的整数倍
- 滑动窗口和滚动窗口:
- 滑动窗口时间间隔:上一个窗口到下一个窗口的时间间隔,必须是时间片倍数
二、scala快速构建SparkStreaming
1、构建步骤
2、代码实现
package com.tledu.sparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author
*/
object SparkStreaming01 {
def main(args: Array[String]): Unit = {
//初始化配置文件
val conf = new SparkConf().setMaster("local[*]").setAppName("mhk_sparkStreaming_test")
//初始化上下文并设置时间间隔
val streamingContext = new StreamingContext(conf,Seconds(3))
val lines = streamingContext.socketTextStream("192.168.2.134",9999)
val word = lines.flatMap(item => item.split(" "))
var wprdKV = word.map(item => (item,1))
var res = wprdKV.reduceByKey(_ + _)
res.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
3、SparkStreaming的注意事项
- 一旦启动了就不能添加新的流式计算
- 上下文停止后,无法重新启动
- 在JVM中只能激活一个StreamingContext
- StreamingContext中的Stop()也会停止SparkContext。想要仅停止StreamingContext将stop()可选参数stopSparkContext设置为false
- 只要在创建下一个StreamingContext之前停止StreamingContext,就可以重复使用SparkContext创建。
- SparkStreaming中使用的流数据专用数据抽象DStream,内部均为指定时间间隔生成的RDD形式
DStream内部即为RDD持续的离散序列,RDD内部均是以文本行为基本元素,对DStream的处理即转换为对内部RDD的处理,所以对RDD的操作大多适用于对DStream处理,并且DStream提供更强大的API使用
4、SparkStreaming常见问题说明
输入DStream和Receivers
- Dstream:从数据源流接收的输入数据流的DStream
- 基本流数据源:包括本地或者hdfs文件系统,socket、akkr、actor等
- 高级流数据源:包括kafka,flume等数据源,引入第三方工具使用该数据源
- Receivers:每个输入DStream(文件流除外)均与Receivers相关联,负责从源数据流接受数据并spark内存中方便处理
- Dstream:从数据源流接收的输入数据流的DStream
关于多路输入流的处理
- SparkStreaming对多路输入流进行自然和谐的支持
- 通过SparkStreamingContext=可以同时创建多路输入流,直接提供API进行各种操作。
DStream的输出操作
- print():在运行流应用程序的驱动程序节点上DStream每一批数据的前十个元素。
- saveAsTextFiles:将此DStream内容保存为文本文件
- saveAsObjectFiles:将此DStream内容保存为SequenctFiles序列化Java对象
- saveAsHadoopFiles:将此DStream内容保存为hadoop文件
- foreachRDD:最通用的输出运算符,在驱动程序Driver进程中执行
- foreach和foreachPartion运行在Worker节点。
关于SparkStream在本地运行时线程数量设置
- 在本地运行时不能使用loacl或者loacl【1】
- 因为要有线程来运行接收器,所以一般使用loacl[n],n要大于接收器数量
关于SparkStreaming在集群中运行时cpu逻辑核心数设置
- 一个逻辑CPU相当于一个开启线程的能力
- 在集群上运行时核心数必须大于接收器数。否则将只能接受而无法处理数据