1, HBase test environment setup
https://www.yuque.com/tiankonghewo/note/mlemax
2. saveAsNewAPIHadoopDataset
1, map a Row to a Put
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

object RowMapper {
  // Maps one DataFrame Row to an HBase Put whose rowkey is "yyyyMMdd-<id>"
  def rowMapper(row: Row): Put = {
    val column0_id = row.getLong(0)
    val column1_name = row.getDecimal(1)
    val end_time = new SimpleDateFormat("yyyyMMdd").format(new Date())
    val put = new Put(Bytes.toBytes(end_time + "-" + column0_id.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(column0_id.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(column1_name.toString))
    put
  }
}
2, write a DataFrame to HBase
import java.util.Properties

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{DataFrame, Row}

object HbaseWriter {
  // Bulk-write a DataFrame to HBase via saveAsNewAPIHadoopDataset; more efficient than per-row puts
  def hbasebulk(df: DataFrame, prop: Properties, rowMapper: Row => Put): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", prop.getProperty("hbase.zookeeper.quorum"))
    hbaseConf.set("hbase.zookeeper.property.clientPort", prop.getProperty("hbase.zookeeper.property.clientPort"))
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, prop.getProperty("hbase_table_name"))

    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Pair each row's Put with an (empty) ImmutableBytesWritable key, as TableOutputFormat expects
    val rdd = df.rdd.map(row => (new ImmutableBytesWritable, rowMapper(row)))
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
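A minimal usage sketch tying RowMapper and HbaseWriter together. The object name HbaseBulkDemo and the sample data are hypothetical; the property keys and the ZooKeeper address/table name mirror the examples in this note and assume the target table (here "student", with column family "cf") already exists.

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HbaseBulkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()
    import spark.implicits._

    // Column order must match RowMapper: column 0 is a Long id, column 1 is a Decimal
    val df = Seq((1L, BigDecimal(100)), (2L, BigDecimal(200))).toDF("id", "name")

    // Property keys follow HbaseWriter.hbasebulk above; values are placeholders for your environment
    val prop = new Properties()
    prop.setProperty("hbase.zookeeper.quorum", "myhbase")
    prop.setProperty("hbase.zookeeper.property.clientPort", "2181")
    prop.setProperty("hbase_table_name", "student")

    HbaseWriter.hbasebulk(df, prop, RowMapper.rowMapper)
    spark.stop()
  }
}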
3, hortonworks hbase connector
1, dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.18.Final</version>
</dependency>
<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.1-2.1-s_2.11</version>
</dependency>
2, code
package com.wsy.learn

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{SaveMode, SparkSession}

object HbaseHortonworks {
  // This case class must not be defined inside the method, otherwise it fails at runtime
  case class Student(rowkey: String, spark_height: String, spark_birthday: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()

    val catalog =
      s"""{
         |"table":{"namespace":"default", "name":"student", "tableCoder":"PrimitiveType"},
         |"rowkey":"rowkey",
         |"columns":{
         |"rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
         |"spark_height":{"cf":"cf", "col":"height", "type":"string"},
         |"spark_birthday":{"cf":"cf", "col":"birthday", "type":"string"}
         |}
         |}""".stripMargin

    val newData = Seq(Student("wty", "62", "20141001"), Student("son", "22", "20210110"))
    val rdd = spark.sparkContext.parallelize(newData)
    import spark.implicits._
    val df1 = rdd.toDF()
    df1.show()

    // Write: HBaseTableCatalog.newTable -> "5" creates the table with 5 regions if it does not exist
    df1.write
      .mode(SaveMode.Append)
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

    // Read back using the same catalog
    val df = spark.read
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    df.show()

    spark.stop()
  }
}
hortonworks-spark official site
https://github.com/hortonworks-spark/shc
Given a DataFrame with a specified schema, the above will create an HBase table with 5 regions and save the DataFrame into it. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
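If the table has to be pre-created, below is a minimal sketch using the HBase Admin API. It assumes an HBase 2.x client (on 1.x the equivalent is HTableDescriptor/HColumnDescriptor); the object name CreateStudentTable is hypothetical, and the quorum, port, table name, and column family mirror the examples in this note.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateStudentTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "myhbase")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      val tableName = TableName.valueOf("student")
      if (!admin.tableExists(tableName)) {
        // Single column family "cf", matching the catalogs used in this note
        val desc = TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
          .build()
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}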
https://hbase.apache.org/book.html#_sparksqldataframes
https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-using-spark-query-hbase
Bug caused by where the case class is defined
https://blog.csdn.net/zhao897426182/article/details/78284327
https://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd
4, alibaba hbase connector
1, dependency
<dependency>
    <groupId>com.aliyun.apsaradb</groupId>
    <artifactId>alihbase-spark</artifactId>
    <version>1.1.3_2.3.2-1.0.3</version>
</dependency>
2, official example code
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()

    val sparkTableName = "x"
    val hbaseTableName = "student"
    val zkAddress = "myhbase:2181"

    // Register a Spark SQL table backed by the HBase table via the catalog JSON
    val createCmd =
      s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
         |    OPTIONS ('catalog'=
         |    '{"table":{"namespace":"default", "name":"${hbaseTableName}"},"rowkey":"rowkey",
         |    "columns":{
         |    "rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
         |    "spark_height":{"cf":"cf", "col":"height", "type":"string"},
         |    "spark_birthday":{"cf":"cf", "col":"birthday", "type":"String"}}}',
         |    'hbase.zookeeper.quorum' = '${zkAddress}'
         |    )""".stripMargin

    spark.sql(createCmd)
    spark.sql("select * from " + sparkTableName + " limit 5").show
    spark.stop()
  }
}
3, spark.read
package com.wsy.learn

import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{SaveMode, SparkSession}

object HbaseAlibaba {
  case class Student(rowkey: String, spark_height: String, spark_birthday: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()

    val catalog =
      """
        |{
        |  "table": {
        |    "namespace": "default",
        |    "name": "student"
        |  },
        |  "rowkey": "key",
        |  "columns": {
        |    "rowkey": {
        |      "cf": "rowkey",
        |      "col": "key",
        |      "type": "string"
        |    },
        |    "spark_height": {
        |      "cf": "cf",
        |      "col": "height",
        |      "type": "string"
        |    },
        |    "spark_birthday": {
        |      "cf": "cf",
        |      "col": "birthday",
        |      "type": "string"
        |    }
        |  }
        |}""".stripMargin

    val newData = Seq(Student("wty", "61", "20141001"), Student("son", "21", "20210110"))
    val rdd = spark.sparkContext.parallelize(newData)
    import spark.implicits._
    val df1 = rdd.toDF()

    // TODO: reading works fine, but writing only succeeds when creating a new table; this is a bug that remains to be resolved
    df1.write
      .mode(SaveMode.Append)
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    val df = spark.read
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.hadoop.hbase.spark")
      .load()
    df.show()

    spark.stop()
  }
}
https://help.aliyun.com/document_detail/93908.html?spm=a2c6h.12873639.0.0.f45f9079aM2gH8
https://github.com/aliyun/aliyun-apsaradb-hbase-demo/blob/master/spark/spark-examples/src/main/scala/com/aliyun/spark/hbase/SparkOnHBaseSparkSession.scala
