Spark Sql ON Hive

一、配置文件

1. spark-defaults.conf

  • 详见配置文件

2. hive 集成配置

  1. 1. hive-site.xml
  2. <configuration>
  3. <property>
  4. <name>hive.metastore.uris</name>
  5. <value>thrift://dw1:9083,thrift://dw2:9083</value>
  6. </property>
  7. <property>
  8. <name>hive.metastore.client.socket.timeout</name>
  9. <value>300</value>
  10. </property>
  11. <property>
  12. <name>hive.metastore.warehouse.dir</name>
  13. <value>/user/hive/warehouse</value>
  14. </property>
  15. </configuration>
  16. 2. spark-env.sh
  17. export HADOOP_HOME=/usr/lib/hadoop
  18. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
  19. export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native:$HADOOP_HOME/lib

二、SparkSQL 客户端模式部署

  1. -- 普通加载模式
  2. spark-sql \
  3. --master yarn \
  4. --deploy-mode client \
  5. --name spark-sql-service_1 \
  6. --driver-cores 2 \
  7. --driver-memory 4096M \
  8. --executor-cores 1 \
  9. --executor-memory 2048M \
  10. --num-executors 3 \
  11. --jars file:///etc/hive/auxlib/dw_hive_udf-1.0.jar,file:///etc/hive/auxlib/json-serde-1.3.7-jar-with-dependencies.jar
  12. -- 配置参数加载模式
  13. spark-sql \
  14. --master yarn \
  15. --deploy-mode client \
  16. --name spark-sql-service \
  17. --driver-cores 2 \
  18. --driver-memory 4096M \
  19. --executor-cores 1 \
  20. --executor-memory 2048M \
  21. --num-executors 3 \
  22. --conf spark.driver.extraJavaOptions="-DJAVA_LIBRARY_PATH=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native:$LD_LIBRARY_PATH" \
  23. --jars file:///etc/hive/auxlib/dw_hive_udf-1.0.jar,file:///etc/hive/auxlib/json-serde-1.3.7-jar-with-dependencies.jar

三、SparkSQL ThriftServer 模式部署

1. 启动 Thriftserver 进程

  • 更多配置见 spark-defaults.conf
  1. 1. Yarn 模式
  2. (1) yarn-client 模式 , Yarn 管理进程 , Driver 运行在客户端 ,Work 运行在 NodeManager
  3. $SPARK_HOME/sbin/start-thriftserver.sh \
  4. --master yarn \
  5. --deploy-mode client \
  6. --name spark-sql-service \
  7. --queue root.default \
  8. --driver-cores 2 \
  9. --driver-memory 4096M \
  10. --executor-cores 1 \
  11. --executor-memory 2048M \
  12. --num-executors 3 \
  13. --jars file://path/xxx.jar,file://path/xxx.jar \
  14. --hiveconf hive.server2.thrift.port=10010
  15. (2) yarn-cluster模式, 集群模式目前不支持
  16. ./sbin/start-thriftserver.sh \
  17. --master yarn \
  18. --deploy-mode client \
  19. --name spark-sql-service \
  20. --driver-cores 2 \
  21. --driver-memory 4096M \
  22. --executor-cores 1 \
  23. --executor-memory 2048M \
  24. --num-executors 3 \
  25. --hiveconf hive.server2.thrift.port=10010
  26. 2. standalone 模式
  27. $SPARK_HOME/sbin/start-thriftserver.sh \
  28. --master spark://uhadoop-ociicy-task3:7077 \
  29. --deploy-mode client \
  30. --name spark-sql \
  31. --driver-cores 2 \
  32. --driver-memory 500M \
  33. --hiveconf hive.server2.thrift.port=10010
  34. 3. JDBC 操作 hive
  35. $SPARK_HOME/bin/beeline !connect jdbc:hive2://hostname:10010

2. SparkSQL thrift-server HA 模式

2.1 配置参数

  1. <property>
  2. <name>hive.server2.support.dynamic.service.discovery</name>
  3. <value>true</value>
  4. </property>
  5. <property>
  6. <name>hive.server2.zookeeper.namespace</name>
  7. <value>sparkserver2_zk</value>
  8. </property>
  9. <property>
  10. <name>hive.zookeeper.quorum</name>
  11. <value>master1:2181,master2:2181,node1:2181</value>
  12. </property>
  13. <property>
  14. <name>hive.zookeeper.client.port</name>
  15. <value>2181</value>
  16. </property>
  17. <!-- HiveServer2 启动的节点地址(根据 HA 的部署节点的名称写, 根据部署节点配置) -->
  18. <property>
  19. <name>hive.server2.thrift.bind.host</name>
  20. <value>node9</value>
  21. </property>

2.2 部署服务

  1. 1. Spark 编译支持 HA 模式的 Jar
  2. spark-hive-thriftserver_2.11-2.0.2.jar 加入到 $SPARK_HOME/jars/spark-hive-thriftserver_2.11-2.0.2.jar
  3. 2. 部署配置, 这里需要在 2 个节点, 同时启动服务
  4. $SPARK_HOME/sbin
  5. ./start-thriftserver.sh \
  6. --master yarn \
  7. --name service_name \
  8. --conf spark.driver.memory=3G \
  9. --executor-memory 1G \
  10. --num-executors 10 \
  11. --hiveconf hive.server2.thrift.port=10003
  12. 3. 测试连接 HA 模式下的 SparkThriftService
  13. $SPARK_HOME/bin/beeline -u "jdbc:hive2://zkNode1:2181,zkNode2:2181,zkNode3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkserver2_zk" hadoop hadoop
  14. # 客户端方式启动
  15. $HIVE_HOME/bin/beeline
  16. > !connect jdbc:hive2://zkNode1:2181,zkNode2:2181,zkNode3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkserver2_zk hadoop hadoop

3. Spark 动态资源部署

  • 具体参数说明见: spark-defaults.conf
  1. 1. 配置 yarn-site.xml 支持动态资源部署模式( NodeManager 所有节点上, 重启服务)
  2. ##### NodeManager 附属程序支持 Start #####
  3. <!-- NodeManager 上运行的附属服务, 用于提升 Shuffle 计算性能 -->
  4. <property>
  5. <name>yarn.nodemanager.aux-services</name>
  6. <value>mapreduce_shuffle,spark_shuffle</value>
  7. </property>
  8. <!-- NodeManager 中辅助服务对应的类 -->
  9. <property>
  10. <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  11. <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  12. </property>
  13. <!-- Shuffle 服务监听数据获取请求的端口。可选配置, 默认值为 7337 -->
  14. <property>
  15. <name>spark.shuffle.service.port</name>
  16. <value>7337</value>
  17. </property>
  18. ##### NodeManager 附属程序支持 End #####
  19. 2. 配置 spark-defaults.conf 配置动态分配
  20. ##### 动态分配 Start #####
  21. ## 注意事项:根据任务动态向 yarn 申请资源, 会导致申请资源浪费大量时间。
  22. ## 是否使用动态资源分配,它根据工作负载调整为此应用程序注册的执行程序数量。
  23. spark.dynamicAllocation.enabled true
  24. ## 使用外部 shuffle, 保存了由 executor 写出的 shuffle 文件所以 executor 可以被安全移除, spark.dynamicAllocation.enabled 为 true, 这个选项才可以为 true
  25. spark.shuffle.service.enabled true
  26. ## 每个 Application 最小分配的 executor 数
  27. spark.dynamicAllocation.minExecutors 3
  28. ## 每个 Application 最大并发分配的 executor 数。 ThriftServer 模式是整个 ThriftServer 同时并发的最大资源数,如果多个用户同时连接,则会被多个用户共享竞争
  29. spark.dynamicAllocation.maxExecutors 6
  30. ## 如果启用动态分配,并且有超过此持续时间的挂起任务积压,则将请求新的执行者。
  31. spark.dynamicAllocation.schedulerBacklogTimeout 6s
  32. ## 与 spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于后续执行者请求
  33. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 6s
  34. ## 如果启用动态分配,并且执行程序已空闲超过此持续时间,则将删除执行程序。
  35. spark.dynamicAllocation.executorIdleTimeout 60s
  36. ##### 动态分配 End #####
  37. 2. 部署(spark 默认会读取 spark-defaults.conf 配置文件, 但是命令行启动会覆盖 spark-defaults.conf)
  38. $SPARK_HOME/sbin/start-thriftserver.sh \
  39. --master yarn \
  40. --deploy-mode client \
  41. --queue root.default \
  42. --name test \
  43. --driver-cores 4 \
  44. --driver-memory 8192M \
  45. --executor-cores 6 \
  46. --executor-memory 12288M \
  47. --conf spark.dynamicAllocation.enabled=true \
  48. --conf spark.dynamicAllocation.minExecutors=3 \
  49. --conf spark.dynamicAllocation.maxExecutors=6 \
  50. --hiveconf hive.server2.thrift.port=10010 \
  51. --jars file:///etc/hive/auxlib/dw_hive_udf-1.0.jar,file:///etc/hive/auxlib/json-serde-1.3.7-jar-with-dependencies.jar

三、Spark Sql Udf

  • Hive UDF 与 Spark UDF 通用
  • UDF

四、Spark Sql 编程

  1. package com.dw2345.machine_learn.combination.sql;
  2. // $example on:spark_hive$
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SparkSession;
  5. // $example off:spark_hive$
  6. object TestSparkSql {
  7. def main(args: Array[String]) {
  8. // warehouseLocation points to the default location for managed databases and tables
  9. var warehouseLocation = "hdfs://nameservice1/user/hive/warehouse"
  10. // 需要 hive-site.xml 文件和 hdfs-site.xml 文件
  11. val spark = SparkSession
  12. .builder()
  13. .master("local")
  14. .appName("Spark Hive Example")
  15. .config("spark.sql.warehouse.dir", warehouseLocation)
  16. .enableHiveSupport()
  17. .getOrCreate()
  18. import spark.implicits._
  19. import spark.sql
  20. // Queries are expressed in HiveQL
  21. sql("SELECT * FROM web_logs_text LIMIT 10").show()
  22. // sql("SELECT * FROM dm_db.dm_channel_inst_compete WHERE p_dt='2017-06-30' AND p_hours='11' LIMIT 10").show()
  23. // sql("SELECT * FROM ods.ods_pic_use WHERE p_type='2' AND p_dt='2017-08-28' AND p_hours='23' LIMIT 10").show()
  24. }
  25. }