一、kafka集成SparkStreaming的方法

1、基于Receiver的方式

  • 最早产生的
  • 实现逻辑:把数据在kafka中读取出来,然后缓存在内存,再定时处理。
  • 流程图
  • Kafka之Kafka与SparkStreaming集成开发 - 图1
  • 优点:
    • 因为使用的是kafka的API,不需要关心offset等附加信息,完全由zookeeper来管理
    • 比较简单,当对数据处理要求不严格时可以使用。
  • 缺点:

    • 如果集群退出,而偏移量又没有处理好的话数据就丢失了。后来通过引入日志的方式来避免。相当于存储了两次数据,解决问题的同时也降低了效率,加重了Recever的负担。
    • 上述方式解决了数据丢失,也增加了重复消费的风险。
    • recevier也是executor的一部分,会占用一部分资源,降低了可用于计算的资源,造成浪费。
    • receiver增加了数据消费链路的一个executor中转环节,该环节中的executor会和计算executor相一致才能保证系统稳定,而这两个环节之间是异步的,存在如网络异常、计算压力大的情况下,中转积压和消费缓慢的情况,导致系统崩溃。

      2、基于Direct直接读取的方式

  • 为解决Receiver存储浪费效率低等问题,spark1.3推出了Direct。

  • 使用Kafka consumer直接消费,不再需要缓存。kafka的一个分区会直接对应rdd的分区。
  • 具体流程

    • 实例化kafka的Cluster,根据用户配置的参数连接到集群。
    • 通过kafka API读取Topic中最后一个的Offset。
    • 成功接收,转化为RDD,供后续计算。
    • 流程图
    • Kafka之Kafka与SparkStreaming集成开发 - 图2
    • 优点:
      • 存储效率更高:不需要Receiver中的日志策略再存储一份了。
      • 简化并行设计:Kafka中的分区和spark中的分区一一对应。
      • 降低内存使用量:因为之前的receiver也占内存。
      • 计算效率提高:不需要receiver后,降低了内存消费,更多的用于计算。
      • 对数据处理、性能要求较高时采用这种策略。
    • 问题:
      • offset在receiver时直接由zookeeper维护,而Direct需要checkpoint或者额外的第三方组件实现。提高了开发成本
      • 监控可视化:offset由zookeeper维护时,均可以通过监控zk信息来监控消费情况。而direct是自行维护,其消费监控也需要自行开发。

        二、基于Direct的代码实现

        1、环境搭建

  • idea构建maven项目

  • 加入spark-core、spark-sql、kafka等依赖
  • 加入spark-streaming-kafka依赖
  • 注意添加作用域provided
  • 编码实现

    2、代码实现(wordcount)

    ```scala package com.tl.job015.streamingwithkafka

import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.Seconds

/**

  • streaming集成kafka / object SparkStreamingReadKafka4Direct { def main(args: Array[String]): Unit = { /*

    • 1、构建ssc
    • 2、设置checkpoint
    • 3、构造direct stream对象
    • 4、针对DStream的算子操作
    • 5、环境变量操作 */

      // 1、构造ssc对象 val parasArray = ArrayString val Array(brokers, topics, groupId, maxPoll) = parasArray

  1. val sparkConf = new SparkConf().setAppName("KafkaDirect4Job011")
  2. //可以代码设置运行模式,也可以在spark-submit当中设置
  3. //sparkConf.setMaster(master)
  4. val sc = new SparkContext(sparkConf)
  5. sc.setLogLevel("WARN")
  6. val ssc = new StreamingContext(sc, Seconds(5))
  7. //2、设置offset的存储目录,此目录一般为hdfs目录
  8. ssc.checkpoint("./kafka_direct")
  9. //3、构造direct stream对象
  10. val topicsSet = topics.split(",").toSet
  11. val kafkaParams = Map(
  12. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  13. ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  14. ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
  15. ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  16. ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
  17. val messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
  18. ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
  19. //4、针对DStream的算子操作
  20. val result: DStream[(String, Int)] = messages.map(_.value).flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
  21. // result.print
  22. result.foreachRDD(x => {
  23. x.foreachPartition(part => {
  24. part.foreach(print)
  25. })
  26. })
  27. //5、环境变量操作
  28. ssc.start()
  29. ssc.awaitTermination()

} } ```

3、线上测试

  • 借助之前的kafkaProducerUtil工具类,实时的向指定topic发送数据
  • 启动SparkStreamingReadKafka4Direct监听kafka队列,并输出计算结果。
  • 也可以在spark WebUI中执行
  • 使用submit脚本执行