Kafka依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.10.1</version> </dependency>
Kafka数据源
package com.streamingimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}object Spark_Kafka { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", ConsumerConfig.GROUP_ID_CONFIG -> "hao", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("hao"), kafkaPara) ) kafkaDataDS.map(_.value()).print() ssc.start() ssc.awaitTermination() }}