创建
主要是通过kafka创建
direct模式
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}object Spark01_DirectAPI010 {def main(args: Array[String]): Unit = {//1.创建SparkConfval conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")//2.创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//3.构建Kafka参数val kafkaParmas: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//4.消费Kafka数据创建流val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))//5.计算WordCount并打印kafkaDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//6.启动任务ssc.start()ssc.awaitTermination()}}
转换
无状态
无状态转化操作就是把简单的RDD转化操作应用到单个批次上,例如reduceByKey只会汇总单个批次的数据。 又分为DStream的API 、非DStream的API
DStream的API:
DStream的API :val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//转换为RDD操作val wordAndCountDStream: DStream[(String, Int)] = lineDStream.map。。。。flatMapfilterrepartitioinreduceByKey:聚合小批次内 仅仅groupByKey
非DStream的API:Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来.
- 使用transform函数只有一定要返回一个新的DStream。
```scala
object Spark06_Nostate_Transform {
def main(args: Array[String]): Unit = {
//创建SparkConf
val conf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“WordCount”)
//创建StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
//创建DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream(“hadoop202”, 9999)
//转换为RDD操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {
//打印 wordAndCountDStream.print //启动 ssc.start() ssc.awaitTermination() }val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})
<a name="WkGym"></a>#### 有状态- updateStateByKey需要对检查点目录进行配置```scalaimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object WorldCount {def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("hdfs://hadoop102:9000/streamCheck")val lines = ssc.socketTextStream("hadoop102", 9999)val words = lines.flatMap(_.split(" "))val pairs = words.map(word => (word, 1))// 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val stateDstream = pairs.updateStateByKey[Int](updateFunc)stateDstream.print()//或者直接写//val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey(// (seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))})ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate}}
windows操作: 窗口时长 和 滑动步长为采集周期的整倍数 ```scala object Spark08State_window { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“WordCount”) val ssc = new StreamingContext(sparkConf, Seconds(3)) //设置检查点路径 用于保存状态 ssc.checkpoint(“D:\dev\workspace\my-bak\spark-bak\cp”) //创建DStream val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream(“hadoop202”, 9999) val flatMapDS: DStream[String] = lineDStream.flatMap(.split(“ “))
//设置窗口大小,滑动的步长 val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3)) //结构转换 val mapDS: DStream[(String, Int)] = windowDS.map((,1)) //聚合 val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(+_) reduceDS.print() //启动 ssc.start() ssc.awaitTermination() } }
```scala
(1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream
(2)countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素。
(3)reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。
(6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。
DF&SQL
- 要想用DF操作流,必须创建SQLContext(SparkSession里包含SQLContext)
加入隐式转换
object Spark01_WordCount { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Spark01_W").setMaster("local[*]") val ssc = new StreamingContext(conf,Seconds(3)) val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202",9999) val flatMapDS: DStream[String] = socketDS.flatMap(_.split(" ")) val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1)) // val spark = SparkSession.builder.config(conf).getOrCreate() import spark.implicits._ mapDS.foreachRDD(rdd =>{ val df: DataFrame = rdd.toDF("word", "count") df.createOrReplaceTempView("words") spark.sql("select * from words").show }) //val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_) //reduceDS.print() //启动采集器 ssc.start() //等待采集结束,终止上下文环境对象 ssc.awaitTermination() } }行动算子 action
print()
在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。
- saveAsTextFiles(prefix, [suffix])
以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
- saveAsObjectFiles(prefix, [suffix])
以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”. Python中目前不可用。
- saveAsHadoopFiles(prefix, [suffix]
将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
- foreachRDD(func) 这里的RDD是指每一个批次,因为一个批次一个RDD
这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中,但是在使用的时候需要注意以下几点
连接不能写在driver层面(序列化); 让task创建链接
里面写foreachPartition,在分区创建(获取)。
// 下面的代码就是 foreachRDD 里套foreachPartition
filterDStream.foreachRDD {
rdd => { //rdd里面 foreachpartition外面的代码,创建的对象是在driver端
rdd.foreachPartition {
jsonObjIter => {
val dauList: List[(String, DauInfo)] = jsonObjIter.map {
jsonObj => {
val commonJsonObj: JSONObject = jsonObj.getJSONObject("common")
val dauInfo: DauInfo = DauInfo(
commonJsonObj.getString("mid"),
commonJsonObj.getString("uid"),
commonJsonObj.getString("ar"),
commonJsonObj.getString("ch"),
commonJsonObj.getString("vc"),
commonJsonObj.getString("dt"),
commonJsonObj.getString("hr"),
"00",
commonJsonObj.getLong("ts")
)
(dauInfo.mid, dauInfo)
}
}.toList
//调用ES工具类,将dauList中的数据批量保存到ES中
val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date)
//向ES中批量插入数据
MyESUtil.bulkInsert(dauList, "gmall0421_dau_info_" + dt)
}
}
//保存offset
OffsetManageUtil.saveOffset(topic, groupId, offsetRanges)
}
}
