Overview
This is the original version that goes through the HBase API directly, not the Phoenix-based one.
Prepare the HBase data
At this point, the table t1 in namespace ns1 already contains data:
hbase(main):005:0> scan 'ns1:t1'
ROW                 COLUMN+CELL
 1001               column=cf1:age, timestamp=1605086867452, value=27
 1001               column=cf1:name, timestamp=1605073387047, value=Nick
 1001               column=cf1:sex, timestamp=1605064625041, value=male
 1002               column=cf1:name, timestamp=1605073396422, value=Nick
 1003               column=cf1:name, timestamp=1605073400548, value=Nick
 1004               column=cf1:name, timestamp=1605073404158, value=Nick
 1005               column=cf1:name, timestamp=1605073406729, value=Nick
 1006               column=cf1:name, timestamp=1605073410678, value=Nick
 1007               column=cf1:name, timestamp=1605073414299, value=Nick
 1008               column=cf1:name, timestamp=1605073419160, value=Nick
 1009               column=cf1:name, timestamp=1605073422869, value=Nick
 b1                 column=cf1:name, timestamp=1605160116107, value=jack
10 row(s) in 0.2440 seconds
hbase(main):006:0>
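The scan above assumes ns1:t1 already exists and holds data. If you want to create and seed a comparable table from code rather than the HBase shell, a minimal sketch using the plain HBase 1.3.1 client API could look like the following; the quorum host zjj101, namespace ns1, table t1, and column family cf1 come from this setup, while the object name PrepareData and the seeded row are only illustrative.

package com.hbase

import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}

// Illustrative helper, not part of the read/write jobs below
object PrepareData {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zjj101")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin

    // Create the namespace and the table once; both calls fail if they already exist
    admin.createNamespace(NamespaceDescriptor.create("ns1").build())
    val desc = new HTableDescriptor(TableName.valueOf("ns1:t1"))
    desc.addFamily(new HColumnDescriptor("cf1"))
    admin.createTable(desc)

    // Seed one row similar to the scan output above
    val table = connection.getTable(TableName.valueOf("ns1:t1"))
    val put = new Put(Bytes.toBytes("1001"))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes("Nick"))
    table.put(put)

    table.close()
    admin.close()
    connection.close()
  }
}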
pom.xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Read operation
package com.hbase

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.jackson.Serialization

import scala.collection.mutable

object HbaseRead {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    // HBase connection configuration
    val hbaseConf: Configuration = HBaseConfiguration.create()
    // ZooKeeper quorum
    // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
    hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
    // Read the table t1 in namespace ns1
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "ns1:t1")

    // Read the data from HBase
    val rdd1 = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], // the rowkey is wrapped in this type
      classOf[Result])

    // Repackage what was read
    val resultRDD = rdd1.map {
      // iw wraps only the rowkey, result wraps one row of data
      case (iw, result) => {
        val map = mutable.Map[String, Any]()
        // Put the rowkey into the map
        map += "rowKey" -> Bytes.toString(iw.get())
        // Then put every column into the map as well
        val cells: util.List[Cell] = result.listCells()
        import scala.collection.JavaConversions._ // convert the Java list into a Scala collection
        for (cell <- cells) { // iterate over the cells
          // column name -> column value
          val key = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          map += key -> value // store in the map
        }
        // For readability, convert the map to JSON with json4s
        implicit val df = org.json4s.DefaultFormats
        Serialization.write(map)
      }
    }
    resultRDD.collect.foreach(println)
    sc.stop()
  }
}
Run result
{"rowKew":"1001","name":"Nick","age":"27","sex":"male"}{"rowKew":"1002","name":"Nick"}{"rowKew":"1003","name":"Nick"}{"rowKew":"1004","name":"Nick"}{"rowKew":"1005","name":"Nick"}{"rowKew":"1006","name":"Nick"}{"rowKew":"1007","name":"Nick"}{"rowKew":"1008","name":"Nick"}{"rowKew":"1009","name":"Nick"}{"rowKew":"b1","name":"jack"}
Write data
package com.hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HbaseWrite {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
    // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "ns1:t1")

    val job = Job.getInstance(hbaseConf) // get a Job
    // Set the output format class
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    // Data to write: the first value is the rowkey, the second is name, the third is age
    val initialRDD = sc.parallelize(List(
      ("1010", "zjj1", "25"),
      ("1011", "zjj2", "26"),
      ("1012", "zjj3", "28")))

    // First wrap the RDD into the (rowkey, Put) format that TableOutputFormat expects before writing to HBase
    val hbaseRDD = initialRDD.map {
      case (rk, name, age) =>
        val rowkey = new ImmutableBytesWritable()
        rowkey.set(Bytes.toBytes(rk)) // rowkey
        val put = new Put(Bytes.toBytes(rk))
        // cf1 is the column family
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (rowkey, put)
    }

    // Save to the HBase table
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
Check the HBase result after writing
Looking closely, you can see that the rows with rowkeys 1010, 1011, and 1012 have been written into HBase.
hbase(main):006:0> scan 'ns1:t1'
ROW                 COLUMN+CELL
 1001               column=cf1:age, timestamp=1605086867452, value=27
 1001               column=cf1:name, timestamp=1605073387047, value=Nick
 1001               column=cf1:sex, timestamp=1605064625041, value=male
 1002               column=cf1:name, timestamp=1605073396422, value=Nick
 1003               column=cf1:name, timestamp=1605073400548, value=Nick
 1004               column=cf1:name, timestamp=1605073404158, value=Nick
 1005               column=cf1:name, timestamp=1605073406729, value=Nick
 1006               column=cf1:name, timestamp=1605073410678, value=Nick
 1007               column=cf1:name, timestamp=1605073414299, value=Nick
 1008               column=cf1:name, timestamp=1605073419160, value=Nick
 1009               column=cf1:name, timestamp=1605073422869, value=Nick
 1010               column=cf1:age, timestamp=1606127159307, value=25
 1010               column=cf1:name, timestamp=1606127159307, value=zjj1
 1011               column=cf1:age, timestamp=1606127159307, value=26
 1011               column=cf1:name, timestamp=1606127159307, value=zjj2
 1012               column=cf1:age, timestamp=1606127159307, value=28
 1012               column=cf1:name, timestamp=1606127159307, value=zjj3
 b1                 column=cf1:name, timestamp=1605160116107, value=jack
13 row(s) in 0.4560 seconds
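Besides scanning from the HBase shell, the written rows can also be checked programmatically. The following is a minimal sketch using the plain HBase client Get API against the same table; the object name HbaseGetCheck is only an example.

package com.hbase

import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}

import scala.collection.JavaConversions._

// Illustrative check of one of the rows written by HbaseWrite
object HbaseGetCheck {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zjj101")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("ns1:t1"))

    // Fetch rowkey 1010 and print its columns
    val result = table.get(new Get(Bytes.toBytes("1010")))
    for (cell <- result.listCells()) {
      val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
      val value = Bytes.toString(CellUtil.cloneValue(cell))
      println(s"$qualifier = $value")
    }

    table.close()
    connection.close()
  }
}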
