一、Program Development in a Non-Kerberos Environment

1、Test Environment

1.1、Component Versions

  • cdh5.16.2
  • cm5.16.2
  • spark2.1.0
  • kafka3.0.0
  • hbase1.2.0

1.2、Prerequisites

  1. IntelliJ IDEA as the development tool
  2. Kerberos is not enabled

2、Environment Preparation

2.1、Scala Environment in IDEA

  1. Download the Scala SDK
  2. Create a Maven project and import the Scala SDK
  3. Select the Scala SDK you just downloaded and click Apply

3、Spark Application Development

3.1、SparkWordCount

  1. Add the pom dependencies:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>
```

  2. Create Spark2_WordCountScala

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609057513626-7d00fc7c-b63b-4bfb-a1e4-ac32276392d1.png#align=left&display=inline&height=145&margin=%5Bobject%20Object%5D&name=image.png&originHeight=212&originWidth=1089&size=23507&status=done&style=none&width=746)

  3. Write the SparkWordCount program

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Spark_WordCount {
  def main(args: Array[String]): Unit = {
    //System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("WordCount") //.setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // Read the input file passed as the first argument, e.g. "datas/test.txt"
    val rdd: RDD[String] = sc.textFile(args(0))
    val flatRDD = rdd.flatMap(_.split(" "))
    val mapRDD = flatRDD.map((_, 1))
    val groupRDD = mapRDD.groupByKey()
    val datasRDD = groupRDD.mapValues(_.size)
    datasRDD.collect().foreach(println)
    sc.stop()
  }
}
```
  4. Package the project with Maven's built-in plugins (an optional fat-jar sketch follows)
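If the jar also needs to carry its dependencies (see the missing-class problem in 3.2.4), a fat jar can be built instead of copying jars to every node. A minimal sketch using maven-shade-plugin; the plugin version here is an assumption, and any recent 3.x release behaves the same for this basic usage:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- Bundle the project classes with all runtime dependencies at package time -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

With this in place, `mvn package` produces a single jar that can be handed to spark2-submit directly; the Spark artifacts themselves are already on the cluster, so they can be marked as provided to keep the shaded jar small.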

  5. Copy the jar to the Linux host, prepare the file to be counted, and submit the job:

```shell
spark2-submit --class Spark_WordCount --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar file:///package/testText/test.txt
```

  6. The job runs and prints the word counts.
  7. If the job fails complaining about the class file version, the classes were compiled with JDK 1.8 while the CDH cluster runs JDK 1.7; either upgrade the cluster JDK or compile the WordCount project for 1.7 in IDEA.

  • Spark can read a local data file, but the file must exist on every node and the path must be given as file:// plus the local path (a sketch of the HDFS alternative follows).
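A common alternative is to put the file on HDFS so that every executor can read it without copying it to each node; a sketch under assumed paths:

```shell
# Upload the local test file to HDFS (paths are examples)
hdfs dfs -mkdir -p /tmp/testText
hdfs dfs -put /package/testText/test.txt /tmp/testText/
# Submit the same WordCount job against the HDFS copy
spark2-submit --class Spark_WordCount --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar hdfs:///tmp/testText/test.txt
```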

3.2、Spark2 Streaming Pulling Kafka2 Data into HBase in a Non-Kerberos Environment

3.2.1、Preparation

  1. Prepare mock data and write Java code that simulates a Kafka producer sending data:

```java
package test2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

public class MyProducer {

    public static void main(String[] args) {
        // TODO 1. Kafka producer configuration
        Properties properties = new Properties();
        // TODO 2. Kafka cluster to connect to (broker-list)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.222:9092");
        // TODO 3. ACK level
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // TODO 4. Retry count
        properties.put("retries", 1);
        // TODO 5. Batch size (16 KB)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // TODO 6. Linger time
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // TODO 7. RecordAccumulator buffer size (32 MB)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // TODO 8. Key/value serializer classes
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // TODO 9. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // TODO 10. Send the data read from the local file
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("datas/userInfo.txt"));
            // Read one line at a time; readLine() returns null at end of file
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                // Send each line ("id name age") to the kafka2_spark2 topic
                producer.send(new ProducerRecord<String, String>("kafka2_spark2", line));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the reader
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // TODO 11. Close the producer
        producer.close();
    }
}
```

  2. Create the HBase test table user_info:

```shell
hbase shell
---------------------
create 'user_info','info'
```

  3. Confirm in the CDH web UI that the table was created (a quick hbase shell check is sketched below).

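The same check can also be done from the command line; a quick sketch using the table and column family created above:

```shell
hbase shell
describe 'user_info'
# after the streaming job has written data:
scan 'user_info'
```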

3.2.2、Program Development

  1. Create a Scala Spark project with Maven; the pom.xml dependencies are as follows:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>
```

  2. Under conf, create the hbase_conf.properties configuration file with the following content:

```properties
kafka.brokers=cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
kafka.topics=kafka2_spark2
zookeeper.list=192.168.0.200
zookeeper.port=2181
kafka.group.id=testgroup
```

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609425497082-a27bc6dc-4f46-43c4-a0c9-ad4373c05c35.png#align=left&display=inline&height=185&margin=%5Bobject%20Object%5D&name=image.png&originHeight=320&originWidth=1289&size=51029&status=done&style=none&width=746)

  3. Create SparkStreamingFromKafkaToHBase_NotKerberos.scala with the following content:

```scala
import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import scala.util.Try

object SparkStreamingFromKafkaToHBase_NotKerberos {

  // Non-Kerberos environment: build an HBase connection from the ZooKeeper quorum
  def getHBaseConn(zkList: String, port: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList) // all ZooKeeper hosts
    conf.set("hbase.zookeeper.property.clientPort", port)
    val connection = ConnectionFactory.createConnection(conf)
    connection
  }

  def main(args: Array[String]): Unit = {
    // Load the configuration file
    val properties = new Properties()
    var confPath: String = System.getProperty("user.dir") + File.separator + "conf" + File.separator
    val hbaseConfPath = new FileInputStream(new File(confPath + "hbase_conf.properties"))
    if (hbaseConfPath == null) {
      println("hbase_conf.properties not found")
    } else {
      properties.load(hbaseConfPath)
      println("Configuration file loaded")
    }
    // Read the configuration
    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val zkHost = properties.getProperty("zookeeper.list")
    val zkport = properties.getProperty("zookeeper.port")
    val groupId = properties.getProperty("kafka.group.id")
    // Streaming environment
    val sparkConf = new SparkConf().setAppName("Kafka2Spark2HBase") //.setMaster("local")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) // process a 5-second window per batch
    // Kafka parameters
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    // Create the direct stream
    val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topics), kafkaPara)
    )
    dStream.foreachRDD(rdd => {
      // Process partition by partition
      rdd.foreachPartition(ps => {
        // Get an HBase connection per partition
        val connection = getHBaseConn(zkHost, zkport)
        ps.foreach(line => {
          // Parse the record: "id name age"
          val datas = line.value().split(" ")
          val id = datas(0)
          val name = datas(1)
          val age = datas(2)
          // Get the table
          val tableName = TableName.valueOf("user_info")
          val table = connection.getTable(tableName)
          // Build the Put
          val put = new Put(Bytes.toBytes(id))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
          // Write to HBase; close the table if the write fails
          Try(table.put(put)).getOrElse(table.close())
          // Close the table
          table.close()
        })
        // Close the connection
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```

3.2.3、Program Testing

  1. Build the jar with Maven and upload it to the Hadoop environment

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609127217808-13ea223b-7e10-4071-bcd2-bd9a35c87518.png#align=left&display=inline&height=458&margin=%5Bobject%20Object%5D&name=image.png&originHeight=915&originWidth=1872&size=248641&status=done&style=none&width=936)

  2. Submit the job:

```shell
spark2-submit --class SparkStreamingFromKafkaToHBase_NotKerberos --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar
```

  3. The result is as follows:

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131914896-80e25aa7-9e85-4da2-94a6-5bea30ac23bf.png#align=left&display=inline&height=379&margin=%5Bobject%20Object%5D&name=image.png&originHeight=732&originWidth=1439&size=224342&status=done&style=none&width=746)

  4. Prepare the data to send and run the Kafka producer to simulate sending messages to the kafka2_spark2 topic

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609126278907-f9df909a-2825-49eb-a197-fa8dfb1c824a.png#align=left&display=inline&height=447&margin=%5Bobject%20Object%5D&name=image.png&originHeight=895&originWidth=1765&size=238772&status=done&style=none&width=882.5)

  5. After the messages are sent, run hbase shell on Linux and execute scan 'user_info' to confirm the messages reached HBase

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609126392987-df4ea48b-25c8-462d-83cd-379666756cba.png#align=left&display=inline&height=236&margin=%5Bobject%20Object%5D&name=image.png&originHeight=393&originWidth=1243&size=67921&status=done&style=none&width=746)

3.2.4、Problems

  1. Some classes cannot be found.
     Cause: the dependencies were not bundled into the jar.
     Workaround: copy the jars the project depends on into /opt/cloudera/parcels/SPARK2/lib/spark2/jars
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131461013-70ee3fa5-9bd1-4dac-8a73-2e1d4314969a.png#align=left&display=inline&height=195&margin=%5Bobject%20Object%5D&name=image.png&originHeight=209&originWidth=798&size=22251&status=done&style=none&width=746)
     **Note: copy them to every node.**
  2. The Kafka client version is reported as too low:
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131101072-8b75b0f4-fcfc-4939-bd77-21985d0f6d7b.png#align=left&display=inline&height=366&margin=%5Bobject%20Object%5D&name=image.png&originHeight=732&originWidth=1436&size=137029&status=done&style=none&width=718)
     Solution: log in to CM, open the SPARK2 service configuration, set spark_kafka_version to 0.10, and restart Spark.
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131213991-89a2e104-3be2-43b6-9db0-250769acf313.png#align=left&display=inline&height=419&margin=%5Bobject%20Object%5D&name=image.png&originHeight=697&originWidth=1241&size=73795&status=done&style=none&width=746)
  3. NoClassDefFoundError: org/apache/htrace/Trace. When a Hadoop project is built with Maven, adding the Hadoop dependencies may fail to pull in htrace-core, so it has to be copied by hand.
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131313618-066558d7-787b-47fe-9bc2-42187be8fc3a.png#align=left&display=inline&height=366&margin=%5Bobject%20Object%5D&name=image.png&originHeight=732&originWidth=1436&size=185352&status=done&style=none&width=718)
     Solution: copy the htrace-core jar shown below to every node.
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131976880-fb2498fa-6959-4066-94c3-c864e0f8545b.png#align=left&display=inline&height=223&margin=%5Bobject%20Object%5D&name=image.png&originHeight=223&originWidth=685&size=24770&status=done&style=none&width=685)

3.3、Spark2 Streaming Pulling Kafka2 Data into Hive in a Non-Kerberos Environment

3.3.1、Preparation

  1. Prepare mock data with the same MyProducer Kafka producer used in 3.2.1.
  2. Log in to CM, open the SPARK2 service configuration, set spark_kafka_version to 0.10, and restart Spark.

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609131213991-89a2e104-3be2-43b6-9db0-250769acf313.png#align=left&display=inline&height=419&margin=%5Bobject%20Object%5D&name=image.png&originHeight=697&originWidth=1241&size=73795&status=done&style=none&width=746)

  3. Create the test table:

```sql
CREATE table kafka2_spark2_hive2(id String, name String, age String);
```

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609135801518-12ccce0d-502b-4617-8de9-1a0a7a235c65.png#align=left&display=inline&height=124&margin=%5Bobject%20Object%5D&name=image.png&originHeight=179&originWidth=1073&size=19694&status=done&style=none&width=746)

3.3.2、Program Development

  1. Create a Scala Spark project with Maven; the pom.xml dependencies are as follows:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>
```

  2. Under resources, create the conf.properties configuration file with the following content (two more keys are noted right after this block):

```properties
kafka.brokers=cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
kafka.topics=kafka2_spark2
zookeeper.list=cdh1.macro.com:2181
zookeeper.port=2181
```
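     The program in the next step actually loads hive_conf.properties from the conf directory and also reads hive.table.name and kafka.group.id, so those keys presumably need to be present as well; the values below are assumptions that match the table created in 3.3.1 and the consumer group used elsewhere in this document:

```properties
# assumed entries, not shown in the original configuration
hive.table.name=kafka2_spark2_hive2
kafka.group.id=testgroup
```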
  3. Write the Spark program:

```scala
import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFromKafkaToHive_NotKerberos {

  def main(args: Array[String]): Unit = {
    // Create the StreamingContext
    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // SparkSQL session with Hive support
    val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
    // Load the configuration file
    val properties = new Properties()
    var confPath: String = System.getProperty("user.dir") + File.separator + "conf" + File.separator
    val hiveConfPath = new FileInputStream(new File(confPath + "hive_conf.properties"))
    if (hiveConfPath == null) {
      println("hive_conf.properties not found")
    } else {
      properties.load(hiveConfPath)
      println("Configuration file loaded")
    }
    // Read the configuration
    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val tableName = properties.getProperty("hive.table.name")
    val groupId = properties.getProperty("kafka.group.id")
    // Kafka configuration
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // how Kafka partitions are matched to executors
      ConsumerStrategies.Subscribe[String, String](Set(topics), kafkaPara)
    )
    // Implicit conversions for DataFrames
    import spark.implicits._
    kafkaDataDS.foreachRDD(
      rdd => {
        val unit: RDD[UserInfo] = rdd.map(
          line => {
            val a = line.value()
            val data = a.split(" ")
            val id = data(0)
            val name = data(1)
            val age = data(2)
            new UserInfo(id, name, age)
          }
        )
        val userinfoDF = spark.sqlContext.createDataFrame(unit)
        userinfoDF.write.mode(SaveMode.Append).insertInto(tableName)
      }
    )
    // Start the streaming job
    ssc.start()
    // Wait for the streaming job to terminate and release resources
    ssc.awaitTermination()
  }

  case class UserInfo(id: String, name: String, age: String)
}
```

3.3.3、Program Testing

  1. Build the jar with Maven and upload it to the Hadoop environment

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609137760481-b2e2eb81-8f11-4c79-a2b6-338cf793d931.png#align=left&display=inline&height=114&margin=%5Bobject%20Object%5D&name=image.png&originHeight=114&originWidth=709&size=18034&status=done&style=none&width=709)

  2. Submit the job with spark2-submit:

```shell
spark2-submit --class SparkStreamingFromKafka2ToHive2_NotKerberos --master local --jars sparkStreamingFromKafkaToHive-1.0.0.jar
```

Note: the required dependency jars must be present on every node.

  3. Run the Java program that simulates the Kafka producer to send messages
  4. Query the kafka2_spark2_hive2 table; the messages arrived successfully

3.3.4、Problems Encountered

  1. A view or table cannot be found (missing metadata).
     Solution: copy hive-site.xml from the Hadoop environment into the project's resources folder (if you don't know where it is, search with: find / -name hive-site.xml).
  2. An instantiation error is reported.
     Cause: a Hadoop environment must be installed on Windows.
     Solution:
     1. Download the Hadoop release matching the Linux cluster (link)
     2. Download the corresponding Windows winutils bin directory for that Hadoop version (link)
     3. Unpack the Hadoop release and replace its bin directory with the winutils bin directory
     4. Reference that directory in the code:

```scala
System.setProperty("hadoop.home.dir", "E:\\Application\\hadoop-2.6.0")
```

  3. java.lang.IllegalArgumentException: Missing application resource.
     Cause: the application jar was passed incorrectly; removing --jars and resubmitting succeeded (the application jar is passed as the final argument, not via --jars).

4、Simulating a Kafka Producer Sending Messages to a Topic in a Kerberos Environment

4.1、Program Development

  • Use a keytab for authentication
  1. Prepare the keytab for accessing Kafka; export it with the xst command:

```shell
kadmin.local
xst -norandkey -k userA.keytab userA@MACRO.COM
```

  2. Use klist to check that the exported keytab is correct:

```shell
klist -ek userA.keytab
```

  3. Copy the keytab file into the Java project's resources directory

  4. Prepare the producer_jaas.conf file with the following content:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="E:\\JavaWorkSpace\\kafka\\conf\\userA.keytab"
  storeKey=true
  useTicketCache=false
  principal="userA@MACRO.COM"
  serviceName=kafka;
};
```

  5. Copy the krb5.conf file from the /etc directory on Linux into the Java project's resources directory
  6. Prepare the configuration file and place it in the conf directory under the project root:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
bootstrap.servers=cdh1.macro.com:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
topic=kafka2_spark2
```
  7. Write the producer Java code used to send messages:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyProducer {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.INFO);
        // Resolve paths relative to the working directory
        String localPath = System.getProperty("user.dir") + File.separator;
        String krb5Path = localPath + "conf/krb5.conf";
        String jaasPath = localPath + "conf/jaas.conf";
        if (!new File(krb5Path).exists()) {
            System.out.println("krb5.conf not found, exiting");
            System.exit(0);
        }
        if (!new File(jaasPath).exists()) {
            System.out.println("jaas.conf not found, exiting");
            System.exit(0);
        }
        // Kerberos requires the krb5 and JAAS locations;
        // on Windows they can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Path);
        System.setProperty("java.security.auth.login.config", jaasPath);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        // Load the Kafka producer configuration
        Properties properties = new Properties();
        FileInputStream is = null;
        //InputStream ips = ClassLoader.getSystemClassLoader().getResourceAsStream("conf.properties");
        try {
            is = new FileInputStream(new File(localPath + "conf/conf.properties"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        try {
            properties.load(is);
        } catch (IOException e) {
            System.out.println("Failed to load the configuration file!");
        } finally {
            try {
                if (is != null) {
                    is.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Send data read from the local file
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("datas/userInfo.txt")); //args[0] //"datas/userInfo.txt"
            // Read one line at a time; readLine() returns null at end of file
            String line = null;
            while ((line = br.readLine()) != null) {
                producer.send(new ProducerRecord<String, String>(properties.getProperty("topic"), line));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the reader
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Close the producer
        producer.close();
    }
}
```

  8. Prepare the datas/userInfo.txt file

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609308956427-4cf0ea2c-7ef0-4022-94e6-76e8faf0f53a.png#align=left&display=inline&height=248&margin=%5Bobject%20Object%5D&name=image.png&originHeight=325&originWidth=978&size=28988&status=done&style=none&width=746)

  9. The test data is sent successfully

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609308848373-f66c9b8a-7b5a-4271-9132-b8da107c9ceb.png#align=left&display=inline&height=570&margin=%5Bobject%20Object%5D&name=image.png&originHeight=984&originWidth=1288&size=208914&status=done&style=none&width=746)

  10. The consumer receives the data successfully

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609308901259-0c1aa6bc-da66-4666-9cb1-a1a13e68f055.png#align=left&display=inline&height=396&margin=%5Bobject%20Object%5D&name=image.png&originHeight=626&originWidth=1178&size=47777&status=done&style=none&width=746)

4.3、Packaging the jar and Writing a Script for Send Tests

  1. Before packaging, modify the code to point at the data file explicitly

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609309133147-56b39fef-cda0-4129-a348-2bb5340d27cc.png#align=left&display=inline&height=210&margin=%5Bobject%20Object%5D&name=image.png&originHeight=218&originWidth=773&size=24222&status=done&style=none&width=746)

  2. Package with Maven's built-in tooling

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2680099/1609309233941-ec3d4401-117b-46a8-9471-b8afd526a527.png#align=left&display=inline&height=282&margin=%5Bobject%20Object%5D&name=image.png&originHeight=563&originWidth=1127&size=80282&status=done&style=none&width=563.5)

  3. Upload the jar to the Linux environment and write the following shell script:

```shell
#!/bin/bash
JAVA_HOME=/usr/java/jdk1.8.0_144/
read_file=$1
for file in `ls lib/*jar`
do
  CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m MyProducer $read_file
```

  4. Create a kafka_kerberos_producer directory, copy the conf directory from the IDEA project into it, create a lib directory, and move the jar into lib, so the directory contains conf/, lib/, and the script.

  • If the run fails with some classes missing, copy the corresponding jars into the lib directory.
  5. Run the script, passing the file to send (here, the file named test).
  6. The consumer receives the data successfully.

5、Kerberos: Spark Streaming Pulling Kafka Data into Hive

5.1、Environment Preparation

  • Use a keytab for authentication
  1. Prepare the keytab for accessing Kafka; export it with the xst command:

```shell
kadmin.local
xst -norandkey -k hive.keytab hive@MACRO.COM
```

  2. Use klist to check that the exported keytab is correct:

```shell
klist -ek hive.keytab
```

  3. Prepare the jaas.conf file with the following content:

    1. KafkaClient {
    2. com.sun.security.auth.module.Krb5LoginModule required
    3. useKeyTab=true
    4. keyTab="/root/spark/conf/hive.keytab"
    5. principal="hive@MACRO.COM";
    6. };
    7. Client {
    8. com.sun.security.auth.module.Krb5LoginModule required
    9. useKeyTab=true
    10. storeKey=true
    11. keyTab="/root/spark/conf/hive.keytab"
    12. principal="hive@MACRO.COM";
    13. };
  4. 将hive.keytab和jaas.conf文件拷贝至集群的所有节点统一的/root/spark/conf目录下


  1. 准备刚刚编写的Kerberos环境发送数据的脚本
  2. 登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

    5.2、程序开发

  3. 使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

```xml
<dependencies>
    <!-- Spark dependencies -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>
```
  2. Under resources, create the conf.properties configuration file with the following content (one more key is noted right after this block):

```properties
kafka.brokers=cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
kafka.topics=kafka2_spark2
group.id=testgroup
```
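     The code in the next step also reads hive.tableName for the target Hive table; that key is not shown above, so it presumably needs to be added. The key name is taken from the code and the value below is only a placeholder:

```properties
# assumed entry; set this to the Hive table the job should write to
hive.tableName=your_hive_table
```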
  3. Write the Spark code as follows:

```scala
import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFromKafkaToHive_Kerberos {

  var confPath: String = System.getProperty("user.dir") + File.separator + "conf" + File.separator

  def main(args: Array[String]): Unit = {
    // Create the StreamingContext
    val sparkConf = new SparkConf().setAppName("SparkStreaming") //.setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // SparkSQL session with Hive support
    val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    // Load the configuration file
    val properties = new Properties()
    val in = new FileInputStream(new File(confPath + "conf.properties"))
    if (in == null) {
      println("conf.properties not found")
    } else {
      properties.load(in)
      //println("Configuration file loaded")
    }
    // Read the configuration
    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val groupId = properties.getProperty("group.id")
    val tableName = properties.getProperty("hive.tableName")
    // Kafka configuration
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      "group.id" -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka"
    )
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // how Kafka partitions are matched to executors
      ConsumerStrategies.Subscribe[String, String](Set(topics), kafkaPara)
    )
    // Implicit conversions for DataFrames
    import spark.implicits._
    kafkaDataDS.foreachRDD(
      rdd => {
        val unit: RDD[UserInfo] = rdd.map(
          line => {
            val a = line.value()
            val data = a.split(" ")
            val name = data(0)
            val age = data(1)
            new UserInfo(name, age)
          }
        )
        val userinfoDF = spark.sqlContext.createDataFrame(unit)
        userinfoDF.write.mode(SaveMode.Append).insertInto(tableName)
      }
    )
    // Start the streaming job
    ssc.start()
    // Wait for the streaming job to terminate and release resources
    ssc.awaitTermination()
  }

  case class UserInfo(name: String, age: String)
}
```

5.3、Testing

  1. Build the jar with Maven, upload it to the root directory of the Linux environment, and run it:

```shell
spark2-submit --class SparkStreamingFromKafkaToHive_Kerberos --master local --deploy-mode client --executor-memory 1g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal hive@MACRO.COM --keytab /spark/conf/hive.keytab --files "/spark/conf/jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/spark/conf/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/spark/conf/jaas.conf" sparkstreaming-1.0-SNAPSHOT.jar
```

  • Note: if you run on YARN, /spark/conf must contain jaas.conf, hive.keytab, and the related configuration files on every node (a YARN sketch follows).
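For a YARN submission, only the master needs to change; a sketch that keeps everything else identical and assumes jaas.conf and hive.keytab exist under /spark/conf on every node, as the note above requires:

```shell
spark2-submit --class SparkStreamingFromKafkaToHive_Kerberos --master yarn --deploy-mode client --executor-memory 1g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal hive@MACRO.COM --keytab /spark/conf/hive.keytab --files "/spark/conf/jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/spark/conf/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/spark/conf/jaas.conf" sparkstreaming-1.0-SNAPSHOT.jar
```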
  2. The run result is as follows
  3. Run the Kafka producer script to add data to the kafka2_spark2 topic
  4. The data is added successfully

6、Kerberos: Spark Streaming Pulling Kafka Data into HBase

6.1、Test Environment

6.2、Environment Preparation

6.3、Program Development
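Compared with 3.2.2, the main change is that the HBase client must authenticate with Kerberos before a connection is opened. A minimal sketch of such a connection helper, assuming the hive.keytab and principal prepared in 5.1 and the ZooKeeper settings from 3.2.2; the object name and property choices here are assumptions, not taken from a working build:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation

object KerberosHBaseConn {
  // Hypothetical helper: log in from a keytab, then open an HBase connection
  def getHBaseConn(zkList: String, port: String,
                   principal: String, keytabPath: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList)            // e.g. cdh1.macro.com,cdh2.macro.com,cdh3.macro.com
    conf.set("hbase.zookeeper.property.clientPort", port) // e.g. 2181
    conf.set("hadoop.security.authentication", "kerberos")
    conf.set("hbase.security.authentication", "kerberos")
    UserGroupInformation.setConfiguration(conf)
    // e.g. principal = "hive@MACRO.COM", keytabPath = "/root/spark/conf/hive.keytab"
    UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
    ConnectionFactory.createConnection(conf)
  }
}
```

The streaming logic itself (direct stream, foreachPartition, Put) can stay as in 3.2.2, and the job would be submitted with the same --principal/--keytab/--files jaas.conf options as in 5.3.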

6.4、Testing

6.5、Problems

Integrated Run

  • For convenience, all of the functionality is packaged into a single jar
  • Move the package directory to the root directory
  1. Run SparkWordCount:

```shell
spark2-submit --class Spark_WordCount --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar file:///package/testText/test.txt
```

  2. Non-Kerberos: Spark pulls Kafka data and writes it to Hive:

```shell
spark2-submit --class SparkStreamingFromKafkaToHive_NotKerberos --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar
```

  3. Non-Kerberos: simulate a Kafka producer sending data:

```shell
sh run_producer.sh MyProducerNotKerberos testText/HiveAndHBaseTestText.txt
```

  4. Non-Kerberos: Spark pulls Kafka data and writes it to HBase:

```shell
spark2-submit --class SparkStreamingFromKafkaToHBase_NotKerberos --master local runPackage/sparkstreaming-1.0-SNAPSHOT.jar
```

  5. Non-Kerberos: simulate a Kafka producer sending data:

```shell
sh run_producer.sh MyProducerNotKerberos testText/HiveAndHBaseTestText.txt
```

  6. Kerberos: Spark pulls Kafka data and writes it to Hive (a sketch of the submission follows)
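Presumably this is the same spark2-submit invocation as in 5.3, pointed at the integrated jar; a sketch:

```shell
spark2-submit --class SparkStreamingFromKafkaToHive_Kerberos --master local --principal hive@MACRO.COM --keytab /spark/conf/hive.keytab --files "/spark/conf/jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/spark/conf/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/spark/conf/jaas.conf" runPackage/sparkstreaming-1.0-SNAPSHOT.jar
```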