第一章.SparkStreaming概述
1.SparkStreaming是什么
![$09[SparkStreaming流式处理] - 图1](/uploads/projects/liuye-6lcqc@gws1uf/059ef1e580fe69bb6249ed4f72d9fff8.png)
2.Spark Streaming架构原理
一.DStream介绍
![$09[SparkStreaming流式处理] - 图2](/uploads/projects/liuye-6lcqc@gws1uf/699b12767a2ea0cf6af023cab2b3abc5.png)
二.架构图
- 整体架构图
![$09[SparkStreaming流式处理] - 图3](/uploads/projects/liuye-6lcqc@gws1uf/b29a1424399c46428004ae4d3c77df9e.png)
- SparkStreaming架构图
![$09[SparkStreaming流式处理] - 图4](/uploads/projects/liuye-6lcqc@gws1uf/1100a40d2c87aeccdf11e18824e35940.png)
三.背压机制
- Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。- 为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。- 通过属性“spark.streaming.backpressure.enabled”来控制是否启用背压机制,默认值false,即不启用。
3.SparkStreaming特点
- 易用
![$09[SparkStreaming流式处理] - 图5](/uploads/projects/liuye-6lcqc@gws1uf/0ae3306617744e93b2b2031228ef1e3c.png)
- 容错
![$09[SparkStreaming流式处理] - 图6](/uploads/projects/liuye-6lcqc@gws1uf/ba3d267fe97a4151fa9dfccace58d07e.png)
- 易整合到Spark体系
![$09[SparkStreaming流式处理] - 图7](/uploads/projects/liuye-6lcqc@gws1uf/d63660871d5f6bc34177e7222c8f172d.png)
第二章.DStream入门
1.WordCount案例实操
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStream读取端口数据并统计不同单词出现的次数
- 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>
- 编写代码
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $01_WordCount {
def main(args: Array[String]): Unit = {
//1.创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(5))
//设置日志级别
ssc.sparkContext.setLogLevel("error")
//2.从网络读取数据
val ds = ssc.socketTextStream("hadoop102", 9999)
//3.数据处理
val ds2 = ds.flatMap(_.split(" "))
val ds3 = ds2.map(x => {
(x, 1)
})
val ds4 = ds3.reduceByKey(_+_)
//4.结果展示
ds4.print()
//5.启动程序
ssc.start()
//6.阻塞主线程,等待外部停止
ssc.awaitTermination()
}
}
- 启动程序并通过netcat发送数据
![$09[SparkStreaming流式处理] - 图8](/uploads/projects/liuye-6lcqc@gws1uf/495de80cceffe08809e3c0cb477934be.png)
- 在IDEA的控制台输出如下内容
![$09[SparkStreaming流式处理] - 图9](/uploads/projects/liuye-6lcqc@gws1uf/9ba0f23ad0df1f5d2715dfb2082e0ad3.png)
注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数
2.WordCount解析
- DStream是Spark Streaming 的基础抽象,代表持续性的数据流和经过各种spark算子操作后的结果数据流
- 在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成DStream,对这些RDD的转换是由Spark引擎来计算
- 说明:DStream中批次与批次之间计算相互独立,如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源,通常情况,批次设置时间要小于计算时间
![$09[SparkStreaming流式处理] - 图10](/uploads/projects/liuye-6lcqc@gws1uf/6e55df29c35a7617ee824fb626c26a33.png)
第三章.DStream创建
1.队列数据源
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object $02_DStreamQueue {
def main(args: Array[String]): Unit = {
//1.创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf,Seconds(5))
//设置日志级别
ssc.sparkContext.setLogLevel("error")
val queue = mutable.Queue[RDD[String]]()
//读取数据
/**
* oneAtATime=true 默认,一次读取队列里面的一个数据
* oneAtATime=false,按照设定的时间,读取队列里面的数据
*/
val ds = ssc.queueStream(queue,false)
//操作数据
val ds2 = ds.flatMap(_.split(" "))
val ds3 = ds2.map((_,1))
val ds4 = ds3.reduceByKey(_+_)
//结果展示
ds4.print()
//启动程序
ssc.start()
for (i<- 1 to 100){
val rdd = ssc.sparkContext.parallelize(List("hello java hello spark","hello hadoop flume spark"))
queue.+=(rdd)
Thread.sleep(2000)
}
//阻塞主线程
ssc.awaitTermination()
}
}
2.自定义数据源
- MyReceiver.scala
package com.atguigu.spark.day09
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK){
//receiver启动的时候调用
override def onStart(): Unit = {
new Thread(){
override def run(): Unit = {
receive()
}
}.start()
}
def receive():Unit={
//创建Socket客户端
val socket = new Socket("hadoop102",9999)
//获取输入流
val br = new BufferedReader(new InputStreamReader(socket.getInputStream))
var line = br.readLine()
while(line!=null && !isStopped()){
store(line)
line= br.readLine()
}
br.close()
socket.close()
}
//receiver停止的时候调用
override def onStop(): Unit = {
}
}
- UserDefinedSource.scala
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $03_UserDefinedSource {
def main(args: Array[String]): Unit = {
//创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf,Seconds(5))
ssc.sparkContext.setLogLevel("info")
val ds = ssc.receiverStream(new MyReceiver)
ds.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
ssc.start()
ssc.awaitTermination()
}
}
- 测试
![$09[SparkStreaming流式处理] - 图11](/uploads/projects/liuye-6lcqc@gws1uf/bb978343862012eb9799e7cc6636c7e0.png)
![$09[SparkStreaming流式处理] - 图12](/uploads/projects/liuye-6lcqc@gws1uf/88a9a1c7073a4c9944bf3cda2dd76f16.png)
3.kafka数据源
- 版本选型
![$09[SparkStreaming流式处理] - 图13](/uploads/projects/liuye-6lcqc@gws1uf/8fbd5fd4f3f793c50363b06b6a498cc5.png)
![$09[SparkStreaming流式处理] - 图14](/uploads/projects/liuye-6lcqc@gws1uf/d7c770d0ece0ec2240ddde8397031c06.png)
注意:目前spark3.0版本只有Direct模式
总结:不同版本的offset存储位置
- 0-8 ReceiverAPI offset默认存储在:Zookeeper中
- 0-8 DirectAPI offset默认存储在:CheckPoint
- 手动维护:MySQL等有事务的存储系统
- 0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题
- 手动维护:MySQL等有事务存储系统
- 代码编写
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印在控制台
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
kafkaSource.scala
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $04_KafkaSource {
def main(args: Array[String]): Unit = {
//创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf,Seconds(5))
ssc.sparkContext.setLogLevel("error")
//读取数据
//设置读取的topic
val arr = Array[String]("spark")
//设置消费者参数
val props = Map[String,Object](
//指定k,v的反序列化器
"key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
//指定kafka broker地址
"bootstrap.servers"->"hadoop102:9092,hadoop103:9092",
//指定消费者组id
"group.id"->"spark1",
//指定消费者组第一次消费topic数据的时候从哪个位置开始消费
"auto.offset.reset"->"earliest",
//是否自动提交offset
"enable.auto.commit"->"true"
)
/**
* 读取kafka数据创建DStream
*/
val ds = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](arr,props))
//处理数据
ds.foreachRDD(rdd=>{
val messageRdd = rdd.map(_.value())
//sparkStreaming读取Kafka数据的时候,rdd分区数=topic分区数
println(s"rdd分区数=${rdd.getNumPartitions}")
val result = messageRdd.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect()
println(result.toList)
})
//启动,阻塞
ssc.start()
ssc.awaitTermination()
}
}
- 测试
- 分别启动Zookeeperhe 和 Kafka集群
![$09[SparkStreaming流式处理] - 图15](/uploads/projects/liuye-6lcqc@gws1uf/14ac42d9ad190602b49da4885b20cf02.png)
- 创建topic
kafka-topics.sh --create --topic spark --bootstrap-server hadoop102:9092 --partitions 2
- 启动生产者
kafka-console-producer.sh --broker-list hadoop102:9092 --topic spark
- 启动消费者测试
kafka-console-consumer.sh --topic spark --bootstrap-server hadoop102:9092
- 运行程序
![$09[SparkStreaming流式处理] - 图16](/uploads/projects/liuye-6lcqc@gws1uf/7a8633692e256e1f1d31a5d5e576049a.png)
第四章.DStream转换
DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如updateStateByKey(),transform()以及各种window相关的原语
1.无状态转换操作
无状态转换操作,就是把RDD转换操作应用到DStream每个批次上,每个批次相互独立,自己算自己的
一.常规无状态转换操作
DStream的部分无状态转化操作列在了下表中,都是DStream自己的API
注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在scala中使用,比如reduceByKey()
| 函数名称 | 目的 | Scala示例 | 函数签名 |
|---|---|---|---|
| map() | 对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream | ds.map(x=>x + 1) | f: (T) -> U |
| flatMap() | 对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。 | ds.flatMap(x => x.split(“ “)) | f: T -> Iterable[U] |
| filter() | 返回由给定DStream中通过筛选的元素组成的DStream | ds.filter(x => x != 1) | f: T -> Boolean |
| repartition() | 改变DStream的分区数 | ds.repartition(10) | N / A |
| reduceByKey() | 将每个批次中键相同的记录规约。 | ds.reduceByKey( (x, y) => x + y) | f: T, T -> T |
| groupByKey() | 将每个批次中的记录根据键分组。 | ds.groupByKey() | N / A |
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的
二.Transform
需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.junit.Test
class $05_NoStatus {
//创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf,Seconds(5))
ssc.sparkContext.setLogLevel("error")
@Test
def transform():Unit={
val ds = ssc.socketTextStream("hadoop102",9999)
val ds2 = ds.transform(rdd=>{
rdd.flatMap(_.split(" "))
})
ds2.print()
ssc.start()
ssc.awaitTermination()
}
}
2.有状态转化操作
有状态转化操作,计算当前批次RDD时,需要用到历史RDD的数据
一.UpdateStateByKey
- updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。
- updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。
- 注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
- checkpoint小文件过多
- checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $06_UpdateStateByKey {
/**
* updateStateByKey:统计全局结果
*/
def main(args: Array[String]): Unit = {
//创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
//设置状态的保存路径
ssc.checkpoint("checkpoint")
val ds = ssc.socketTextStream("hadoop102",9999)
val ds2 = ds.flatMap(_.split(" ")).map((_,1))
println(ds2)
val func=(currentBathValues:Seq[Int],state:Option[Int])=>{
/**
* currentBathValues:当前批次key对应的所有value值
* state:之前批次key对应的结果
*/
//获取之前批次统计的value值
val total = state.getOrElse(0) + currentBathValues.sum
Some(total)
}
val ds3 = ds2.updateStateByKey(func)
ds3.print()
ssc.start()
ssc.awaitTermination()
}
}
启动程序并向9999端口发送数据
![$09[SparkStreaming流式处理] - 图17](/uploads/projects/liuye-6lcqc@gws1uf/ea4fe7638fb265abef16eee75c4246e5.png)
![$09[SparkStreaming流式处理] - 图18](/uploads/projects/liuye-6lcqc@gws1uf/86cf44b5aec922c275682853115e9feb.png)
二.Window
window operation可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态,所有基于窗口的操作都需要这两个参数,分别为窗口时长以及滑动步长
- 窗口时长:计算内容的时间范围
- 滑动步长:隔多久触发一次计算
package com.atguigu.spark.day09
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $07_Window {
def main(args: Array[String]): Unit = {
//创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf,Seconds(5))
ssc.sparkContext.setLogLevel("error")
ssc.checkpoint("checkpoint")
val ds = ssc.socketTextStream("hadoop102",9999)
val ds2 = ds.flatMap(_.split(" "))
.map(x=>("---->"+x,1))
ds2.print()
//val ds3 = ds.flatMap(_.split(" ")).map((_,1)).window(Seconds(20),Seconds(10))
//val ds4 = ds3.reduceByKey(_+_)
//ds4.print()
ds.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((agg,curr)=>{
//之前批次结果+滑入批次的结果数据
println(s"第一个函数: ${agg} ${curr}")
agg+curr
},(agg,curr)=>{
//此函数是之前批次的结果-滑窗批次的结果数据
println(s"第二个函数: ${agg} ${curr}")
agg-curr
},windowDuration = Seconds(20),Seconds(5)).print()
ssc.start()
ssc.awaitTermination()
}
}
第五章.DStream输出
DStream通常将数据输出到,外部数据库或屏幕上。
foreachRDD(func):这是最通用的输出操作,即将函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。
在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。
package com.atguigu.spark.day09
import com.mysql.jdbc.Connection
import java.sql.{DriverManager, PreparedStatement}
object $08_Output {
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
//1、创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val ds = ssc.socketTextStream("hadoop102",9999)
ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreachRDD(rdd=>{
//以下代码是在Driver执行的
rdd.foreachPartition(it=>{
//jdbc
var connection:Connection = null
var statement:PreparedStatement = null
try{
connection = DriverManager.getConnection("....")
statement = connection.prepareStatement("insert into .. values(?,?)")
var i = 0
it.foreach(x => {
statement.setString(1,x._1)
statement.setInt(2,x._2)
statement.addBatch()
if(i%1000==0){
//提交一个批次数据
statement.executeBatch()
statement.clearBatch()
}
i = i +1
})
//提交最后一个不满1000条的批次数据
statement.executeBatch()
}catch {
case e:Exception => e.printStackTrace()
}finally {
if(statement!=null)
statement.close()
if(connection!=null)
connection.close()
}
})
})
ssc.start()
ssc.awaitTermination()
}
第六章.优雅的关闭
流式任务需要7*24小时执行,所以有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了 关闭方式:使用外部文件系统来控制内部程序关闭
package com.atguigu.spark.day09
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
object $09_StreamingClose {
def main(args: Array[String]): Unit = {
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
//1、创建StreamingContext,设置一个批次的时间
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
ssc.socketTextStream("hadoop102",9999).flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
ssc.start()
//监控HDFS某个目录是否存在
val fs = FileSystem.get(new URI("hdsf://hadoop102://9820"),new Configuration())
//如果目录存在则继续执行,不存在则退出
while(fs.exists(new Path("hdfs://hadoop102:9820/xx"))){
Thread.sleep(2000)
}
//stopGracefully:如果设置为true,则等到所有接收的数据全部处理完成才会停止
ssc.stop(true,true)
ssc.awaitTermination()
}
}
