SparkSQL 与 Hbase 整合
一、环境
1) 创建 hive on hbase 数据表, 具体参见 hive-on-hbase 文档1) 把 hbase 所有的主机名配置到 /etc/hosts 中,保证每个 host 都能访问到 hbase 集群的服务器vim /etc/hosts例如 :10.10.33.175 uhadoop-ociicy-core110.10.7.68 uhadoop-ociicy-core210.10.43.97 uhadoop-ociicy-core310.10.240.22 uhadoop-ociicy-core410.10.236.241 uhadoop-ociicy-core510.10.222.21 uhadoop-ociicy-core610.10.229.183 uhadoop-ociicy-task310.10.234.131 uhadoop-ociicy-task42) 配置相关的依赖 jar 包(具体跟随集群环境走)$HBASE_HOME/lib/hbase-annotations-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-spark-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-common-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-client-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-server-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-protocol-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/guava-12.0.1.jar$HBASE_HOME/lib/htrace-core-3.2.0-incubating.jar$HBASE_HOME/lib/zookeeper.jar$HBASE_HOME/lib/protobuf-java-2.5.0.jar$HBASE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.9.2.jar$HBASE_HOME/lib/metrics-core-2.2.0.jar$HIVE_HOME/lib/hive-hbase-handler-1.1.0-cdh5.9.0.jar
二、命令
1. 注意事项配置好 hbase-site.xml, 配置好 --jarsspark-sql \--master yarn \--deploy-mode client \--name spark-hbase-demo \--driver-cores 1 \--driver-memory 1024M \--executor-cores 1 \--executor-memory 1024M \--num-executors 1 \--files $HBASE_HOME/conf/hbase-site.xml \--jars file://$HBASE_HOME/lib/hbase-annotations-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-spark-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-common-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-client-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-server-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-protocol-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/guava-12.0.1.jar,file://$HBASE_HOME/lib/htrace-core-3.2.0-incubating.jar,file://$HBASE_HOME/lib/zookeeper.jar,file://$HBASE_HOME/lib/protobuf-java-2.5.0.jar,file://$HBASE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.9.2.jar,file://$HBASE_HOME/lib/metrics-core-2.2.0.jar,file://$HIVE_HOME/lib/hive-hbase-handler-1.1.0-cdh5.9.0.jar
三、RDD 编程
使用 Spark On HBase 机制, 在不迁移数据的情况下,使用 Spark 嵌入 HBase 中分析查询,目前有如下几个方案:
hortonworks/shc - spark-on-hbase 方案 实现了 Spark Datasource API , 直接使用 Spark Catalyst 引擎(spark 2.0 引入)进行查询优化,耦合低,由于使用 API 访问,后续 Spark 版本升级 Spark Catalyst 引擎被优化,性能也会跟着提升。
Huawei-Spark/Spark-SQL-on-HBase - 华为 2015 入侵方案 在 Spark Catalyst 引擎内嵌入自己的查询优化计划, 将 RDD 发送到 HBase,入侵到 HBase 的 Coprocessors 协同处理器中执行任务,例如 Group By。由于此查询计划是自己实现功能的复杂性,不使用 Spark Catalyst 官方优化引擎,所以日后升级、补丁,不跟随 Spark 官方走,会导致日后维护难和不稳定
nerdammer/spark-hbase-connector - nerdammer 对传统读写 Hbase TableInputFormat 和 TableOutputFormat 的封装
cloudera-labs/SparkOnHBase - coluder 2015 方案 (使用 Spark 1.6 版本) cloudera 提供的方案 2015 年方案
import org.apache.spark.sql.{SparkSession, DataFrame, SQLContext}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.execution.datasources.hbase._/*** SparkRDD 读取 HBase 数据* 下载 git@github.com:hortonworks-spark/shc.git* git clone git@github.com:hortonworks-spark/shc.git* 选择 spark 和 hbase 对应版本, 切换到 v1.1.0-2.0 分支, Hbase 1.1.0, spark 2.0, https://github.com/hortonworks-spark/shc/tree/v1.1.0-2.0* git fetch origin v1.1.0-2.0:v1.1.0-2.0* git checkout v1.1.0-2.0* 打包编译, 修改 pom.xml 文件, 修改 spark 版本号信息为 2.0.2* mvn clean package* 放到项目 lib 目录下即可*/object TestSparkHBase {def main(args: Array[String]): Unit = {// 初始化上下文val sparkConf = new SparkConf()//.setMaster("local").setAppName("HBaseExample")val sc = new SparkContext(sparkConf)val sqlContext = new SQLContext(sc)// 隐式转换import sqlContext.implicits._// 装在 HBase 数据到 DataFrameval df = withCatalog(sqlContext, catalog)// 读取 RDDdf.take(10).foreach { x => println(x) }// 读取hivedf.registerTempTable("user_profile")sqlContext.sql("SELECT col0,col1,col2,col3 from user_profile LIMIT 10").show}// 定义 HBase 信息def catalog = s"""{|"table":{"namespace":"default", "name":"user_profile"},|"rowkey":"key",|"columns":{|"col0":{"cf":"rowkey", "col":"key", "type":"string"},|"col1":{"cf":"browser", "col":"channel_id", "type":"string"},|"col2":{"cf":"common", "col":"update_date", "type":"string"},|"col3":{"cf":"pcsafe", "col":"gc_nav_click", "type":"string"}|}|}""".stripMargin/*** 装在 HBbase 数据*/def withCatalog(sqlContext: SQLContext, cat: String): DataFrame = {sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog->cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()}}
