SparkSQL 与 Hbase 整合

一、环境

  1. 1) 创建 hive on hbase 数据表, 具体参见 hive-on-hbase 文档
  2. 1) hbase 所有的主机名配置到 /etc/hosts 中,保证每个 host 都能访问到 hbase 集群的服务器
  3. vim /etc/hosts
  4. 例如 :
  5. 10.10.33.175 uhadoop-ociicy-core1
  6. 10.10.7.68 uhadoop-ociicy-core2
  7. 10.10.43.97 uhadoop-ociicy-core3
  8. 10.10.240.22 uhadoop-ociicy-core4
  9. 10.10.236.241 uhadoop-ociicy-core5
  10. 10.10.222.21 uhadoop-ociicy-core6
  11. 10.10.229.183 uhadoop-ociicy-task3
  12. 10.10.234.131 uhadoop-ociicy-task4
  13. 2) 配置相关的依赖 jar 包(具体跟随集群环境走)
  14. $HBASE_HOME/lib/hbase-annotations-1.2.0-cdh5.9.2.jar
  15. $HBASE_HOME/lib/hbase-spark-1.2.0-cdh5.9.2.jar
  16. $HBASE_HOME/lib/hbase-common-1.2.0-cdh5.9.2.jar
  17. $HBASE_HOME/lib/hbase-client-1.2.0-cdh5.9.2.jar
  18. $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.9.2.jar
  19. $HBASE_HOME/lib/hbase-protocol-1.2.0-cdh5.9.2.jar
  20. $HBASE_HOME/lib/guava-12.0.1.jar
  21. $HBASE_HOME/lib/htrace-core-3.2.0-incubating.jar
  22. $HBASE_HOME/lib/zookeeper.jar
  23. $HBASE_HOME/lib/protobuf-java-2.5.0.jar
  24. $HBASE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.9.2.jar
  25. $HBASE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.9.2.jar
  26. $HBASE_HOME/lib/metrics-core-2.2.0.jar
  27. $HIVE_HOME/lib/hive-hbase-handler-1.1.0-cdh5.9.0.jar

二、命令

  1. 1. 注意事项
  2. 配置好 hbase-site.xml, 配置好 --jars
  3. spark-sql \
  4. --master yarn \
  5. --deploy-mode client \
  6. --name spark-hbase-demo \
  7. --driver-cores 1 \
  8. --driver-memory 1024M \
  9. --executor-cores 1 \
  10. --executor-memory 1024M \
  11. --num-executors 1 \
  12. --files $HBASE_HOME/conf/hbase-site.xml \
  13. --jars file://$HBASE_HOME/lib/hbase-annotations-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-spark-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-common-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-client-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-server-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-protocol-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/guava-12.0.1.jar,file://$HBASE_HOME/lib/htrace-core-3.2.0-incubating.jar,file://$HBASE_HOME/lib/zookeeper.jar,file://$HBASE_HOME/lib/protobuf-java-2.5.0.jar,file://$HBASE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/metrics-core-2.2.0.jar,file://$HIVE_HOME/lib/hive-hbase-handler-1.1.0-cdh5.9.0.jar

三、RDD 编程

使用 Spark On HBase 机制, 在不迁移数据的情况下,使用 Spark 嵌入 HBase 中分析查询,目前有如下几个方案:

  1. hortonworks/shc - spark-on-hbase 方案 实现了 Spark Datasource API , 直接使用 Spark Catalyst 引擎(spark 2.0 引入)进行查询优化,耦合低,由于使用 API 访问,后续 Spark 版本升级 Spark Catalyst 引擎被优化,性能也会跟着提升。

  2. Huawei-Spark/Spark-SQL-on-HBase - 华为 2015 入侵方案 在 Spark Catalyst 引擎内嵌入自己的查询优化计划, 将 RDD 发送到 HBase,入侵到 HBase 的 Coprocessors 协同处理器中执行任务,例如 Group By。由于此查询计划是自己实现功能的复杂性,不使用 Spark Catalyst 官方优化引擎,所以日后升级、补丁,不跟随 Spark 官方走,会导致日后维护难和不稳定

  3. nerdammer/spark-hbase-connector - nerdammer 对传统读写 Hbase TableInputFormat 和 TableOutputFormat 的封装

  4. cloudera-labs/SparkOnHBase - coluder 2015 方案 (使用 Spark 1.6 版本) cloudera 提供的方案 2015 年方案

  1. import org.apache.spark.sql.{SparkSession, DataFrame, SQLContext}
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.sql.execution.datasources.hbase._
  4. /**
  5. * SparkRDD 读取 HBase 数据
  6. * 下载 git@github.com:hortonworks-spark/shc.git
  7. * git clone git@github.com:hortonworks-spark/shc.git
  8. * 选择 spark 和 hbase 对应版本, 切换到 v1.1.0-2.0 分支, Hbase 1.1.0, spark 2.0, https://github.com/hortonworks-spark/shc/tree/v1.1.0-2.0
  9. * git fetch origin v1.1.0-2.0:v1.1.0-2.0
  10. * git checkout v1.1.0-2.0
  11. * 打包编译, 修改 pom.xml 文件, 修改 spark 版本号信息为 2.0.2
  12. * mvn clean package
  13. * 放到项目 lib 目录下即可
  14. */
  15. object TestSparkHBase {
  16. def main(args: Array[String]): Unit = {
  17. // 初始化上下文
  18. val sparkConf = new SparkConf()
  19. //.setMaster("local")
  20. .setAppName("HBaseExample")
  21. val sc = new SparkContext(sparkConf)
  22. val sqlContext = new SQLContext(sc)
  23. // 隐式转换
  24. import sqlContext.implicits._
  25. // 装在 HBase 数据到 DataFrame
  26. val df = withCatalog(sqlContext, catalog)
  27. // 读取 RDD
  28. df.take(10).foreach { x => println(x) }
  29. // 读取hive
  30. df.registerTempTable("user_profile")
  31. sqlContext.sql("SELECT col0,col1,col2,col3 from user_profile LIMIT 10").show
  32. }
  33. // 定义 HBase 信息
  34. def catalog = s"""{
  35. |"table":{"namespace":"default", "name":"user_profile"},
  36. |"rowkey":"key",
  37. |"columns":{
  38. |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  39. |"col1":{"cf":"browser", "col":"channel_id", "type":"string"},
  40. |"col2":{"cf":"common", "col":"update_date", "type":"string"},
  41. |"col3":{"cf":"pcsafe", "col":"gc_nav_click", "type":"string"}
  42. |}
  43. |}""".stripMargin
  44. /**
  45. * 装在 HBbase 数据
  46. */
  47. def withCatalog(sqlContext: SQLContext, cat: String): DataFrame = {
  48. sqlContext
  49. .read
  50. .options(Map(HBaseTableCatalog.tableCatalog->cat))
  51. .format("org.apache.spark.sql.execution.datasources.hbase")
  52. .load()
  53. }
  54. }