一、kafka集成SparkStreaming的方法
1、基于Receiver的方式
- 最早产生的
- 实现逻辑:把数据在kafka中读取出来,然后缓存在内存,再定时处理。
- 流程图
- 优点:
- 因为使用的是kafka的API,不需要关心offset等附加信息,完全由zookeeper来管理
- 比较简单,当对数据处理要求不严格时可以使用。
缺点:
为解决Receiver存储浪费效率低等问题,spark1.3推出了Direct。
- 使用Kafka consumer直接消费,不再需要缓存。kafka的一个分区会直接对应rdd的分区。
具体流程
- 实例化kafka的Cluster,根据用户配置的参数连接到集群。
- 通过kafka API读取Topic中最后一个的Offset。
- 成功接收,转化为RDD,供后续计算。
- 流程图
- 优点:
- 存储效率更高:不需要Receiver中的日志策略再存储一份了。
- 简化并行设计:Kafka中的分区和spark中的分区一一对应。
- 降低内存使用量:因为之前的receiver也占内存。
- 计算效率提高:不需要receiver后,降低了内存消费,更多的用于计算。
- 对数据处理、性能要求较高时采用这种策略。
- 问题:
idea构建maven项目
- 加入spark-core、spark-sql、kafka等依赖
- 加入spark-streaming-kafka依赖
- 注意添加作用域provided
- 编码实现
2、代码实现(wordcount)
```scala package com.tl.job015.streamingwithkafka
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.Seconds
/**
streaming集成kafka / object SparkStreamingReadKafka4Direct { def main(args: Array[String]): Unit = { /*
- 1、构建ssc
- 2、设置checkpoint
- 3、构造direct stream对象
- 4、针对DStream的算子操作
5、环境变量操作 */
// 1、构造ssc对象 val parasArray = ArrayString val Array(brokers, topics, groupId, maxPoll) = parasArray
val sparkConf = new SparkConf().setAppName("KafkaDirect4Job011")
//可以代码设置运行模式,也可以在spark-submit当中设置
//sparkConf.setMaster(master)
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
//2、设置offset的存储目录,此目录一般为hdfs目录
ssc.checkpoint("./kafka_direct")
//3、构造direct stream对象
val topicsSet = topics.split(",").toSet
val kafkaParams = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
//4、针对DStream的算子操作
val result: DStream[(String, Int)] = messages.map(_.value).flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
// result.print
result.foreachRDD(x => {
x.foreachPartition(part => {
part.foreach(print)
})
})
//5、环境变量操作
ssc.start()
ssc.awaitTermination()
3、线上测试
- 借助之前的kafkaProducerUtil工具类,实时的向指定topic发送数据
- 启动SparkStreamingReadKafka4Direct监听kafka队列,并输出计算结果。
- 也可以在spark WebUI中执行
- 使用submit脚本执行