1. HBase test environment setup
https://www.yuque.com/tiankonghewo/note/mlemax
2. saveAsNewAPIHadoopDataset
1. Mapping a Row to a Put
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

object RowMapper {
  // Map one DataFrame Row to an HBase Put; the row key is "yyyyMMdd-<id>"
  def rowMapper(row: Row): Put = {
    val column0_id = row.getLong(0)
    val column1_name = row.getDecimal(1)
    val end_time = new SimpleDateFormat("yyyyMMdd").format(new Date())
    val put = new Put(Bytes.toBytes(end_time + "-" + column0_id.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(column0_id.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(column1_name.toString))
    put
  }
}
2. Writing a DataFrame to HBase
import java.util.Properties
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{DataFrame, Row}

object HbaseWriter {
  // Bulk-write a DataFrame to HBase via TableOutputFormat; more efficient than per-row puts
  def hbasebulk(df: DataFrame, prop: Properties, rowMapper: Row => Put): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", prop.getProperty("hbase.zookeeper.quorum"))
    hbaseConf.set("hbase.zookeeper.property.clientPort", prop.getProperty("hbase.zookeeper.property.clientPort"))
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, prop.getProperty("hbase_table_name"))
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put]) // the values written are Put mutations
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // Pair each Put with a (unused) ImmutableBytesWritable key and write the whole RDD in one job
    val rdd = df.rdd.map(row => (new ImmutableBytesWritable, rowMapper(row)))
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
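A minimal sketch of wiring the two pieces together. The SparkSession setup, the Properties values, and the source table src_db.src_table are assumptions for illustration only; the rowMapper above expects a Long in column 0 and a Decimal in column 1.
import java.util.Properties
import org.apache.spark.sql.SparkSession

object HbaseBulkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("HbaseBulkDemo").getOrCreate()

    // Assumed connection settings; adjust to your cluster
    val prop = new Properties()
    prop.setProperty("hbase.zookeeper.quorum", "myhbase")
    prop.setProperty("hbase.zookeeper.property.clientPort", "2181")
    prop.setProperty("hbase_table_name", "student")

    // Hypothetical source table; any DataFrame whose first two columns match rowMapper works
    val df = spark.sql("select id, name from src_db.src_table")

    HbaseWriter.hbasebulk(df, prop, RowMapper.rowMapper)
    spark.stop()
  }
}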
3. Hortonworks HBase connector
1. Dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.18.Final</version>
</dependency>
<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.1-2.1-s_2.11</version>
</dependency>
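Note: shc-core is not published to Maven Central; it is usually resolved from the Hortonworks public repository, so the POM may also need a matching repositories entry.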
2. Code
package com.wsy.learn

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{SaveMode, SparkSession}

object HbaseHortonworks {
  // This case class must NOT be defined inside main(), otherwise toDF fails at runtime
  case class Student(rowkey: String, spark_height: String, spark_birthday: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    // Catalog maps the DataFrame schema to the HBase table "student" (row key + column family "cf")
    val catalog =
      s"""{
         |"table":{"namespace":"default", "name":"student", "tableCoder":"PrimitiveType"},
         |"rowkey":"rowkey",
         |"columns":{
         |"rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
         |"spark_height":{"cf":"cf", "col":"height", "type":"string"},
         |"spark_birthday":{"cf":"cf", "col":"birthday", "type":"string"}
         |}
         |}""".stripMargin
    val newData = Seq(Student("wty", "62", "20141001"), Student("son", "22", "20210110"))
    val rdd = spark.sparkContext.parallelize(newData)
    import spark.implicits._
    val df1 = rdd.toDF()
    df1.show()
    // Write: newTable -> "5" lets SHC create the table with 5 regions if it does not exist
    df1.write.mode(SaveMode.Append)
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
    // Read back through the same catalog
    val df = spark
      .read
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    df.show()
    spark.stop()
  }
}
Hortonworks SHC project site:
https://github.com/hortonworks-spark/shc
Given a DataFrame with specified schema, above will create an HBase table with 5 regions and save the DataFrame inside.
Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
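When the target table has been pre-created (table "student" with column family "cf"), the newTable option can simply be dropped. A minimal sketch, reusing df1 and catalog from the code above:
// Assumes the HBase table "student" with column family "cf" already exists;
// without HBaseTableCatalog.newTable, SHC writes into the pre-created table.
df1.write.mode(SaveMode.Append)
  .option("hbase.zookeeper.quorum", "myhbase:2181")
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()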
https://hbase.apache.org/book.html#_sparksqldataframes
https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-using-spark-query-hbase
Bug caused by defining the case class in the wrong place:
https://blog.csdn.net/zhao897426182/article/details/78284327
https://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd
4. Alibaba HBase connector
1. Dependency
<dependency>
    <groupId>com.aliyun.apsaradb</groupId>
    <artifactId>alihbase-spark</artifactId>
    <version>1.1.3_2.3.2-1.0.3</version>
</dependency>
2. Official example code
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    val sparkTableName = "x"
    val hbaseTableName = "student"
    val zkAddress = "myhbase:2181"
    // Register a Spark SQL table backed by the HBase table via the hbase-spark data source
    val createCmd =
      s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
         | OPTIONS ('catalog'=
         | '{"table":{"namespace":"default", "name":"${hbaseTableName}"},"rowkey":"rowkey",
         | "columns":{
         | "rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
         | "spark_height":{"cf":"cf", "col":"height", "type":"string"},
         | "spark_birthday":{"cf":"cf", "col":"birthday", "type":"string"}}}',
         | 'hbase.zookeeper.quorum' = '${zkAddress}'
         | )""".stripMargin
    spark.sql(createCmd)
    spark.sql("select * from " + sparkTableName + " limit 5").show()
    spark.stop()
  }
}
3. spark.read
package com.wsy.learn

import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{SaveMode, SparkSession}

object HbaseAlibaba {
  case class Student(rowkey: String, spark_height: String, spark_birthday: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    val catalog =
      """
        |{
        |  "table": {
        |    "namespace": "default",
        |    "name": "student"
        |  },
        |  "rowkey": "key",
        |  "columns": {
        |    "rowkey": {
        |      "cf": "rowkey",
        |      "col": "key",
        |      "type": "string"
        |    },
        |    "spark_height": {
        |      "cf": "cf",
        |      "col": "height",
        |      "type": "string"
        |    },
        |    "spark_birthday": {
        |      "cf": "cf",
        |      "col": "birthday",
        |      "type": "string"
        |    }
        |  }
        |}
      """.stripMargin
    val newData = Seq(Student("wty", "61", "20141001"), Student("son", "21", "20210110"))
    val rdd = spark.sparkContext.parallelize(newData)
    import spark.implicits._
    val df1 = rdd.toDF()
    // TODO: reads work fine, but writes only succeed when creating a new table; this looks like a bug and is still unresolved
    df1.write.mode(SaveMode.Append)
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
    val df = spark
      .read
      .option("hbase.zookeeper.quorum", "myhbase:2181")
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.hadoop.hbase.spark")
      .load()
    df.show()
    spark.stop()
  }
}
https://help.aliyun.com/document_detail/93908.html?spm=a2c6h.12873639.0.0.f45f9079aM2gH8
https://github.com/aliyun/aliyun-apsaradb-hbase-demo/blob/master/spark/spark-examples/src/main/scala/com/aliyun/spark/hbase/SparkOnHBaseSparkSession.scala