Overview
This post reads from and writes to HBase with Spark using the native HBase API (TableInputFormat / TableOutputFormat), not Phoenix.
Preparing the HBase data
At this point the table t1 under namespace ns1 in HBase already holds some data:
hbase(main):005:0> scan 'ns1:t1'
ROW COLUMN+CELL
1001 column=cf1:age, timestamp=1605086867452, value=27
1001 column=cf1:name, timestamp=1605073387047, value=Nick
1001 column=cf1:sex, timestamp=1605064625041, value=male
1002 column=cf1:name, timestamp=1605073396422, value=Nick
1003 column=cf1:name, timestamp=1605073400548, value=Nick
1004 column=cf1:name, timestamp=1605073404158, value=Nick
1005 column=cf1:name, timestamp=1605073406729, value=Nick
1006 column=cf1:name, timestamp=1605073410678, value=Nick
1007 column=cf1:name, timestamp=1605073414299, value=Nick
1008 column=cf1:name, timestamp=1605073419160, value=Nick
1009 column=cf1:name, timestamp=1605073422869, value=Nick
b1 column=cf1:name, timestamp=1605160116107, value=jack
10 row(s) in 0.2440 seconds
hbase(main):006:0>
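If ns1:t1 does not exist yet, the namespace, table, and a sample row can be created with the plain HBase 1.3.x client API (the same hbase-client dependency listed in the pom below). This is only an illustrative sketch, not part of the original setup: the object name HbasePrepare is made up here, while the table, column family, and ZooKeeper host are taken from the scan above, and the create calls fail if the namespace or table already exists.

package com.hbase

import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}

object HbasePrepare {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zjj101")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin

    // create namespace ns1 and table ns1:t1 with column family cf1
    // (these calls throw if ns1 or ns1:t1 already exists)
    admin.createNamespace(NamespaceDescriptor.create("ns1").build())
    val desc = new HTableDescriptor(TableName.valueOf("ns1:t1"))
    desc.addFamily(new HColumnDescriptor("cf1"))
    admin.createTable(desc)

    // seed one sample row, matching the data shown in the scan above
    val table = conn.getTable(TableName.valueOf("ns1:t1"))
    val put = new Put(Bytes.toBytes("1001"))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes("Nick"))
    table.put(put)

    table.close()
    conn.close()
  }
}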
pom.xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Reading from HBase
package com.hbase

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.jackson.Serialization

import scala.collection.mutable

object HbaseRead {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    // HBase connection configuration
    val hbaseConf: Configuration = HBaseConfiguration.create()
    // ZooKeeper quorum
    // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
    hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
    // read table t1 in namespace ns1
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "ns1:t1")

    // read the data from HBase
    val rdd1 = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], // the row key is wrapped in this type
      classOf[Result]
    )

    // repackage what was read
    val resultRDD = rdd1.map {
      // iw holds only the row key, result holds one whole row
      case (iw, result) => {
        val map = mutable.Map[String, Any]()
        // put the row key into the map
        map += "rowKey" -> Bytes.toString(iw.get())
        // then put every column into the map as well
        val cells: util.List[Cell] = result.listCells()
        import scala.collection.JavaConversions._ // implicitly convert the Java List to a Scala collection
        for (cell <- cells) { // iterate over the cells
          // column qualifier -> column value
          val key = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          map += key -> value // store in the map
        }
        // serialize the map as JSON for readability, using json4s (JSON for Scala)
        implicit val df = org.json4s.DefaultFormats
        Serialization.write(map)
      }
    }

    resultRDD.collect.foreach(println)
    sc.stop()
  }
}
Run output
{"rowKew":"1001","name":"Nick","age":"27","sex":"male"}
{"rowKew":"1002","name":"Nick"}
{"rowKew":"1003","name":"Nick"}
{"rowKew":"1004","name":"Nick"}
{"rowKew":"1005","name":"Nick"}
{"rowKew":"1006","name":"Nick"}
{"rowKew":"1007","name":"Nick"}
{"rowKew":"1008","name":"Nick"}
{"rowKew":"1009","name":"Nick"}
{"rowKew":"b1","name":"jack"}
Writing to HBase
package com.hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HbaseWrite {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
    // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "ns1:t1")

    val job = Job.getInstance(hbaseConf) // obtain a Job
    // the output format used to write to HBase
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // key/value types expected by TableOutputFormat
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    // data to write: first field is the row key, second is name, third is age
    val initialRDD = sc.parallelize(
      List(("1010", "zjj1", "25"),
        ("1011", "zjj2", "26"),
        ("1012", "zjj3", "28")))

    // wrap the RDD elements into the (row key, Put) pairs that TableOutputFormat
    // expects before they can be written to HBase
    val hbaseRDD = initialRDD.map {
      case (rk, name, age) =>
        val rowkey = new ImmutableBytesWritable()
        rowkey.set(Bytes.toBytes(rk)) // the row key
        val put = new Put(Bytes.toBytes(rk))
        // cf1 is the column family
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (rowkey, put)
    }

    // save to the HBase table
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
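Before going back to the shell, the write can also be spot-checked from code with a plain client-side Get. This is only an illustrative sketch, not part of the original post; it reuses the hbaseConf from HbaseWrite and looks up row 1010 written above.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(hbaseConf) // hbaseConf as configured in HbaseWrite
val table = conn.getTable(TableName.valueOf("ns1:t1"))
val result = table.get(new Get(Bytes.toBytes("1010")))
val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
println(s"1010 -> name=$name, age=$age") // expected: name=zjj1, age=25
table.close()
conn.close()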
Checking the HBase result after the write
Looking closely at the scan, the rows with rowKey 1010, 1011, and 1012 have now been written into HBase.
hbase(main):006:0> scan 'ns1:t1'
ROW COLUMN+CELL
1001 column=cf1:age, timestamp=1605086867452, value=27
1001 column=cf1:name, timestamp=1605073387047, value=Nick
1001 column=cf1:sex, timestamp=1605064625041, value=male
1002 column=cf1:name, timestamp=1605073396422, value=Nick
1003 column=cf1:name, timestamp=1605073400548, value=Nick
1004 column=cf1:name, timestamp=1605073404158, value=Nick
1005 column=cf1:name, timestamp=1605073406729, value=Nick
1006 column=cf1:name, timestamp=1605073410678, value=Nick
1007 column=cf1:name, timestamp=1605073414299, value=Nick
1008 column=cf1:name, timestamp=1605073419160, value=Nick
1009 column=cf1:name, timestamp=1605073422869, value=Nick
1010 column=cf1:age, timestamp=1606127159307, value=25
1010 column=cf1:name, timestamp=1606127159307, value=zjj1
1011 column=cf1:age, timestamp=1606127159307, value=26
1011 column=cf1:name, timestamp=1606127159307, value=zjj2
1012 column=cf1:age, timestamp=1606127159307, value=28
1012 column=cf1:name, timestamp=1606127159307, value=zjj3
b1 column=cf1:name, timestamp=1605160116107, value=jack
13 row(s) in 0.4560 seconds