Overview

This is the plain HBase API version, not the one that goes through Phoenix.

Prepare the HBase data

At this point, table t1 under namespace ns1 in HBase already contains data:

    hbase(main):005:0> scan 'ns1:t1'
    ROW       COLUMN+CELL
     1001     column=cf1:age, timestamp=1605086867452, value=27
     1001     column=cf1:name, timestamp=1605073387047, value=Nick
     1001     column=cf1:sex, timestamp=1605064625041, value=male
     1002     column=cf1:name, timestamp=1605073396422, value=Nick
     1003     column=cf1:name, timestamp=1605073400548, value=Nick
     1004     column=cf1:name, timestamp=1605073404158, value=Nick
     1005     column=cf1:name, timestamp=1605073406729, value=Nick
     1006     column=cf1:name, timestamp=1605073410678, value=Nick
     1007     column=cf1:name, timestamp=1605073414299, value=Nick
     1008     column=cf1:name, timestamp=1605073419160, value=Nick
     1009     column=cf1:name, timestamp=1605073422869, value=Nick
     b1       column=cf1:name, timestamp=1605160116107, value=jack
    10 row(s) in 0.2440 seconds
    hbase(main):006:0>
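
If the table does not exist yet, it can be created and seeded from the HBase shell along these lines (a minimal sketch; the namespace, table, and column family names match the scan above):

    create_namespace 'ns1'
    create 'ns1:t1', 'cf1'
    put 'ns1:t1', '1001', 'cf1:name', 'Nick'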

pom.xml

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
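
Note that the _2.11 suffix on the Spark artifacts means they are built for Scala 2.11, so the project's Scala version has to match; spark-sql_2.11 also pulls in spark-core_2.11 transitively, so listing both is not strictly required.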

Read operation

    package com.hbase

    import java.util

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.json4s.jackson.Serialization

    import scala.collection.mutable

    object HbaseRead {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        // HBase connection configuration
        val hbaseConf: Configuration = HBaseConfiguration.create()
        // ZooKeeper quorum
        // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
        hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
        // read table t1 under namespace ns1
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "ns1:t1")
        // read the data from HBase
        val rdd1 = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], // the rowKey is wrapped in this type
          classOf[Result]
        )
        // wrap the rows that were read
        val resultRDD = rdd1.map {
          // iw wraps only the rowKey; result wraps one whole row
          case (iw, result) => {
            val map = mutable.Map[String, Any]()
            // put the rowKey into the map
            map += "rowKey" -> Bytes.toString(iw.get())
            // then put every column into the map as well
            val cells: util.List[Cell] = result.listCells()
            import scala.collection.JavaConversions._ // implicit conversions from Java collections to Scala
            for (cell <- cells) { // iterate over the cell list
              // column qualifier -> column value
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
              map += key -> value // save into the map
            }
            // for readability, convert the map to JSON with json4s (JSON for Scala)
            implicit val df = org.json4s.DefaultFormats
            Serialization.write(map)
          }
        }
        resultRDD.collect.foreach(println)
        sc.stop()
      }
    }
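
By default TableInputFormat scans the entire table. If only part of it is needed, the scan can be narrowed through the same Configuration before newAPIHadoopRDD is called; a minimal sketch reusing the hbaseConf above (the row range and column are just examples):

    // restrict the rowKey range and the columns that the underlying Scan will read
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, "1001")   // start rowKey, inclusive
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "1005")    // stop rowKey, exclusive
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "cf1:name") // only read cf1:name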

Run results

    {"rowKey":"1001","name":"Nick","age":"27","sex":"male"}
    {"rowKey":"1002","name":"Nick"}
    {"rowKey":"1003","name":"Nick"}
    {"rowKey":"1004","name":"Nick"}
    {"rowKey":"1005","name":"Nick"}
    {"rowKey":"1006","name":"Nick"}
    {"rowKey":"1007","name":"Nick"}
    {"rowKey":"1008","name":"Nick"}
    {"rowKey":"1009","name":"Nick"}
    {"rowKey":"b1","name":"jack"}
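
Since the pom already includes spark-sql_2.11, the JSON strings produced by HbaseRead could also be loaded into a DataFrame and queried with SQL; a minimal sketch that assumes the resultRDD from the read code above (the view and column names are only for illustration):

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    val df = spark.read.json(resultRDD)   // RDD[String] of JSON, as produced above
    df.createOrReplaceTempView("t1")
    spark.sql("select rowKey, name, age from t1 where age is not null").show()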

Write operation

    package com.hbase

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}

    object HbaseWrite {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val hbaseConf: Configuration = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "zjj101")
        // hbaseConf.set("hbase.zookeeper.quorum", "zjj101,zjj102,zjj103")
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "ns1:t1")
        val job = Job.getInstance(hbaseConf) // create a Job to carry the output configuration
        // the output format class used to write to HBase
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        // output key/value types expected by TableOutputFormat
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
        // data to write: the first value is the rowkey, the second is name, the third is age
        val initialRDD = sc.parallelize(
          List(("1010", "zjj1", "25"),
            ("1011", "zjj2", "26"),
            ("1012", "zjj3", "28")))
        // first wrap the RDD into the (rowkey, Put) pairs that TableOutputFormat needs before writing to HBase
        val hbaseRDD = initialRDD.map {
          case (rk, name, age) =>
            val rowkey = new ImmutableBytesWritable()
            rowkey.set(Bytes.toBytes(rk)) // rowkey
            val put = new Put(Bytes.toBytes(rk))
            // cf1 is the column family
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
            (rowkey, put)
        }
        // save into HBase
        hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }
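
Besides re-scanning from the HBase shell (shown below), a single written row could also be verified in code with the plain hbase-client API; a minimal sketch reusing the hbaseConf from HbaseWrite (the Get check is an illustration, not part of the original program):

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    val connection = ConnectionFactory.createConnection(hbaseConf)
    val table = connection.getTable(TableName.valueOf("ns1:t1"))
    val result = table.get(new Get(Bytes.toBytes("1010")))
    println(Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))) // expected: zjj1
    table.close()
    connection.close()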

Check the HBase results after writing

Scanning the table again shows that the rows with rowKeys 1010, 1011, and 1012 have been written into HBase.

    hbase(main):006:0> scan 'ns1:t1'
    ROW       COLUMN+CELL
     1001     column=cf1:age, timestamp=1605086867452, value=27
     1001     column=cf1:name, timestamp=1605073387047, value=Nick
     1001     column=cf1:sex, timestamp=1605064625041, value=male
     1002     column=cf1:name, timestamp=1605073396422, value=Nick
     1003     column=cf1:name, timestamp=1605073400548, value=Nick
     1004     column=cf1:name, timestamp=1605073404158, value=Nick
     1005     column=cf1:name, timestamp=1605073406729, value=Nick
     1006     column=cf1:name, timestamp=1605073410678, value=Nick
     1007     column=cf1:name, timestamp=1605073414299, value=Nick
     1008     column=cf1:name, timestamp=1605073419160, value=Nick
     1009     column=cf1:name, timestamp=1605073422869, value=Nick
     1010     column=cf1:age, timestamp=1606127159307, value=25
     1010     column=cf1:name, timestamp=1606127159307, value=zjj1
     1011     column=cf1:age, timestamp=1606127159307, value=26
     1011     column=cf1:name, timestamp=1606127159307, value=zjj2
     1012     column=cf1:age, timestamp=1606127159307, value=28
     1012     column=cf1:name, timestamp=1606127159307, value=zjj3
     b1       column=cf1:name, timestamp=1605160116107, value=jack
    13 row(s) in 0.4560 seconds