1, HBase test environment setup

https://www.yuque.com/tiankonghewo/note/mlemax

2, saveAsNewAPIHadoopDataset

1, mapping a Row to a Put

  import java.text.SimpleDateFormat
  import java.util.Date
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.sql.Row

  object RowMapper {
    // Map one DataFrame Row to an HBase Put; adjust the getters to your own schema.
    def rowMapper(row: Row): Put = {
      val column0_id = row.getLong(0)
      val column1_name = row.getDecimal(1)
      // rowkey = current date + "-" + id
      val end_time = new SimpleDateFormat("yyyyMMdd").format(new Date())
      val put = new Put(Bytes.toBytes(end_time + "-" + column0_id.toString))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(column0_id.toString))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(column1_name.toString))
      put
    }
  }
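A quick way to sanity-check the mapping locally is to build a Row by hand and inspect the resulting Put. A minimal sketch, assuming the id is a Long and the name column is a decimal as in the getters above; the sample values are made up:

  import org.apache.spark.sql.Row

  object RowMapperCheck {
    def main(args: Array[String]): Unit = {
      // Hypothetical sample values matching the getters used in rowMapper above.
      val put = RowMapper.rowMapper(Row(1L, new java.math.BigDecimal("42")))
      // Put.toString shows the rowkey; the cells carry cf:id and cf:name.
      println(put)
    }
  }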

2, writing the DataFrame to HBase

  import java.util.Properties
  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{Put, Result}
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.sql.{DataFrame, Row}

  object HbaseWriter {
    // Bulk write to HBase via saveAsNewAPIHadoopDataset; more efficient than per-row puts.
    def hbasebulk(df: DataFrame, prop: Properties, rowMapper: Row => Put): Unit = {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", prop.getProperty("hbase.zookeeper.quorum"))
      hbaseConf.set("hbase.zookeeper.property.clientPort", prop.getProperty("hbase.zookeeper.property.clientPort"))
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, prop.getProperty("hbase_table_name"))
      val job = Job.getInstance(hbaseConf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Result])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      val rdd = df.rdd.map(row => (new ImmutableBytesWritable, rowMapper(row)))
      rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
  }
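A minimal driver sketch showing how the two pieces fit together. The Zookeeper address, client port, and table name below are placeholders, not from the original note; the toy DataFrame just matches the column order that rowMapper expects:

  import java.util.Properties
  import org.apache.spark.sql.SparkSession

  object HbaseBulkDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local").appName("HbaseBulkDemo").getOrCreate()
      import spark.implicits._
      // A tiny DataFrame whose column order matches the getters in RowMapper.rowMapper.
      val df = Seq((1L, BigDecimal("42")), (2L, BigDecimal("43"))).toDF("id", "name")

      val prop = new Properties()
      prop.setProperty("hbase.zookeeper.quorum", "myhbase")           // placeholder host
      prop.setProperty("hbase.zookeeper.property.clientPort", "2181") // placeholder port
      prop.setProperty("hbase_table_name", "student")                 // placeholder table

      HbaseWriter.hbasebulk(df, prop, RowMapper.rowMapper)
      spark.stop()
    }
  }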

3, hortonworks hbase connector

1, dependency

  <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.18.Final</version>
  </dependency>
  <dependency>
      <groupId>com.hortonworks</groupId>
      <artifactId>shc-core</artifactId>
      <version>1.1.1-2.1-s_2.11</version>
  </dependency>

2, code

  package com.wsy.learn

  import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
  import org.apache.spark.sql.{SaveMode, SparkSession}

  object HbaseHortonworks {
    // This case class must be defined outside the method, otherwise the job fails at runtime.
    case class Student(rowkey: String, spark_height: String, spark_birthday: String)

    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      val catalog =
        s"""{
           |"table":{"namespace":"default", "name":"student", "tableCoder":"PrimitiveType"},
           |"rowkey":"rowkey",
           |"columns":{
           |"rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
           |"spark_height":{"cf":"cf", "col":"height", "type":"string"},
           |"spark_birthday":{"cf":"cf", "col":"birthday", "type":"string"}
           |}
           |}""".stripMargin
      val newData = Seq(Student("wty", "62", "20141001"), Student("son", "22", "20210110"))
      val rdd = spark.sparkContext.parallelize(newData)
      import spark.implicits._
      val df1 = rdd.toDF()
      df1.show()
      // Write: HBaseTableCatalog.newTable -> "5" creates the table with 5 regions if it does not exist.
      df1.write.mode(SaveMode.Append)
        .option("hbase.zookeeper.quorum", "myhbase:2181")
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save()
      // Read back with the same catalog.
      val df = spark
        .read
        .option("hbase.zookeeper.quorum", "myhbase:2181")
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
      df.show()
      spark.stop()
    }
  }

hortonworks-spark project site
https://github.com/hortonworks-spark/shc

  1. Given a DataFrame with specified schema, above will create an HBase table with 5 regions and save the DataFrame inside.
  2. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
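Following the second note, a sketch of the same write against a pre-created table simply drops HBaseTableCatalog.newTable (this reuses df1 and catalog from the code above):

  // Sketch: same write, but against a table that already exists in HBase,
  // so HBaseTableCatalog.newTable is omitted (reuses df1 and catalog from above).
  df1.write.mode(SaveMode.Append)
    .option("hbase.zookeeper.quorum", "myhbase:2181")
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()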

https://hbase.apache.org/book.html#_sparksqldataframes
https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-using-spark-query-hbase
bug caused by where the case class is defined
https://blog.csdn.net/zhao897426182/article/details/78284327
https://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd

4, alibaba hbase connector

1, dependency

  <dependency>
      <groupId>com.aliyun.apsaradb</groupId>
      <artifactId>alihbase-spark</artifactId>
      <version>1.1.3_2.3.2-1.0.3</version>
  </dependency>

2, official sample code

  import org.apache.spark.sql.SparkSession

  object Hello {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      val sparkTableName = "x"
      val hbaseTableName = "student"
      val zkAddress = "myhbase:2181"
      // Register a Spark SQL table backed by the HBase table via the hbase-spark datasource.
      val createCmd =
        s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
           |  OPTIONS ('catalog'=
           |  '{"table":{"namespace":"default", "name":"${hbaseTableName}"},"rowkey":"rowkey",
           |  "columns":{
           |  "rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
           |  "spark_height":{"cf":"cf", "col":"height", "type":"string"},
           |  "spark_birthday":{"cf":"cf", "col":"birthday", "type":"string"}}}',
           |  'hbase.zookeeper.quorum' = '${zkAddress}'
           |  )""".stripMargin
      spark.sql(createCmd)
      spark.sql("select * from " + sparkTableName + " limit 5").show
      spark.stop()
    }
  }
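Once the mapping table is registered, it can be queried like any ordinary Spark SQL table. A small sketch reusing sparkTableName from the code above; 'wty' is just the sample rowkey used elsewhere in this note:

  // Sketch: query the registered mapping table like a normal Spark SQL table.
  spark.sql(s"select spark_height, spark_birthday from ${sparkTableName} where rowkey = 'wty'").show()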

3, spark.read

  package com.wsy.learn

  import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
  import org.apache.spark.sql.{SaveMode, SparkSession}

  object HbaseAlibaba {
    case class Student(rowkey: String, spark_height: String, spark_birthday: String)

    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .master("local")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      val catalog =
        """
          |{
          |  "table": {
          |    "namespace": "default",
          |    "name": "student"
          |  },
          |  "rowkey": "key",
          |  "columns": {
          |    "rowkey": {
          |      "cf": "rowkey",
          |      "col": "key",
          |      "type": "string"
          |    },
          |    "spark_height": {
          |      "cf": "cf",
          |      "col": "height",
          |      "type": "string"
          |    },
          |    "spark_birthday": {
          |      "cf": "cf",
          |      "col": "birthday",
          |      "type": "string"
          |    }
          |  }
          |}
        """.stripMargin
      val newData = Seq(Student("wty", "61", "20141001"), Student("son", "21", "20210110"))
      val rdd = spark.sparkContext.parallelize(newData)
      import spark.implicits._
      val df1 = rdd.toDF()
      // TODO: reading works fine, but writes only succeed when creating a new table; looks like a bug, still to be investigated.
      df1.write.mode(SaveMode.Append)
        .option("hbase.zookeeper.quorum", "myhbase:2181")
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
        .format("org.apache.hadoop.hbase.spark")
        .save()
      val df = spark
        .read
        .option("hbase.zookeeper.quorum", "myhbase:2181")
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
        .format("org.apache.hadoop.hbase.spark")
        .load()
      df.show()
      spark.stop()
    }
  }

https://help.aliyun.com/document_detail/93908.html?spm=a2c6h.12873639.0.0.f45f9079aM2gH8
https://github.com/aliyun/aliyun-apsaradb-hbase-demo/blob/master/spark/spark-examples/src/main/scala/com/aliyun/spark/hbase/SparkOnHBaseSparkSession.scala