回顾 :

shell-api

DDL

DML list_regions locate_region split

java-api

Connection

Admin

Table

* 预分region表

tools split flush

一、java-api

1 添加数据

1.1 put数据

  • 插入一个单元格
  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.Connection;
  5. import org.apache.hadoop.hbase.client.Put;
  6. import org.apache.hadoop.hbase.client.Table;
  7. /**
  8. * Author: Hang.Z
  9. * Date: 21/07/03
  10. * Description:
  11. * put 'tb_user' , 'rk' , 'cf:name' , 'zss'
  12. * Put对象 行键 列族 属性 值
  13. * Put对象中可以添加多个Cell , 一次插入一行数据
  14. * 1 插入一个单元格
  15. * 2 插入多个单元格
  16. * 3 插入多个列族下的多个单元格
  17. */
  18. public class Hbase07_AddData01 {
  19. public static void main(String[] args) throws Exception {
  20. Connection conn = HbaseUtils.getConnection();
  21. // 获取操作的表对象
  22. Table tb_user = conn.getTable(TableName.valueOf("tb_user"));
  23. // 创建Put对象
  24. Put put = new Put("rk001".getBytes());
  25. put.addColumn("cf".getBytes() , "name".getBytes() , "zss".getBytes()) ;
  26. put.addColumn("cf".getBytes() , "gender".getBytes() , "M".getBytes()) ;
  27. // 添加数据
  28. tb_user.put(put);
  29. //释放资源
  30. tb_user.close();
  31. conn.close();
  32. }
  33. }
  • 插入多个单元格
  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.Connection;
  5. import org.apache.hadoop.hbase.client.Put;
  6. import org.apache.hadoop.hbase.client.Table;
  7. /**
  8. * Author: Hang.Z
  9. * Date: 21/07/03
  10. * Description:
  11. * put 'tb_user' , 'rk' , 'cf:name' , 'zss'
  12. * Put对象 行键 列族 属性 值
  13. * Put对象中可以添加多个Cell , 一次插入一行数据
  14. * 1 插入一个单元格
  15. * 2 插入多个单元格
  16. * 3 插入多个列族下的多个单元格
  17. */
  18. public class Hbase07_AddData02 {
  19. public static void main(String[] args) throws Exception {
  20. Connection conn = HbaseUtils.getConnection();
  21. // 获取操作的表对象
  22. Table tb_a1 = conn.getTable(TableName.valueOf("tb_a1"));
  23. // 创建Put对象
  24. Put put = new Put("rk001".getBytes());
  25. put.addColumn("cf1".getBytes() , "name".getBytes() , "zss".getBytes()) ;
  26. put.addColumn("cf1".getBytes() , "gender".getBytes() , "M".getBytes()) ;
  27. put.addColumn("cf2".getBytes() , "job".getBytes() , "coder".getBytes()) ;
  28. // 添加数据
  29. tb_a1.put(put);
  30. //释放资源
  31. tb_a1.close();
  32. conn.close();
  33. }
  34. }
  • 插入多行数据
  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.Connection;
  5. import org.apache.hadoop.hbase.client.Put;
  6. import org.apache.hadoop.hbase.client.Table;
  7. import org.apache.hadoop.hbase.util.Bytes;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. /**
  11. * Author: Hang.Z
  12. * Date: 21/07/03
  13. * Description:
  14. * put 'tb_user' , 'rk' , 'cf:name' , 'zss'
  15. * Put对象 行键 列族 属性 值
  16. * 同时参入多行数据
  17. */
  18. public class Hbase07_AddData03 {
  19. public static void main(String[] args) throws Exception {
  20. Connection conn = HbaseUtils.getConnection();
  21. // 获取操作的表对象
  22. Table tb_a1 = conn.getTable(TableName.valueOf("tb_a1"));
  23. // 创建Put对象
  24. // rk002
  25. Put put2 = new Put("rk002".getBytes());
  26. put2.addColumn("cf1".getBytes() , "name".getBytes() , "lss".getBytes()) ;
  27. put2.addColumn("cf1".getBytes() , "gender".getBytes() , "F".getBytes()) ;
  28. put2.addColumn("cf2".getBytes() , "job".getBytes() , "coder".getBytes()) ;
  29. // rk003
  30. Put put3 = new Put("rk003".getBytes());
  31. put3.addColumn("cf1".getBytes() , "name".getBytes() ,Bytes.toBytes( "王五")) ;
  32. put3.addColumn("cf2".getBytes() , "sal".getBytes() , Bytes.toBytes(22222.22)) ;
  33. List<Put> ls = new ArrayList<>() ;
  34. ls.add(put2) ;
  35. ls.add(put3);
  36. // 添加数据
  37. tb_a1.put(ls);
  38. //释放资源
  39. tb_a1.close();
  40. conn.close();
  41. }
  42. }

1.2 缓存,批次处理数据

  1. BufferedMutator bm = conn.getBufferedMutator(TableName.valueOf("tb_user"));
  2. // 周期性刷写数据到hbase中 , 并不是一行写一次
  3. bm.setWriteBufferPeriodicFlush(2000);
  4. bm.mutate(ls);
  5. bm.flush ;

案例:

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.BufferedMutator;
  5. import org.apache.hadoop.hbase.client.Connection;
  6. import org.apache.hadoop.hbase.client.Put;
  7. import org.apache.hadoop.hbase.client.Table;
  8. import org.apache.hadoop.hbase.util.Bytes;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. /**
  12. * Author: Hang.Z
  13. * Date: 21/07/03
  14. * Description:
  15. * put 'tb_user' , 'rk' , 'cf:name' , 'zss'
  16. * BufferedMutator 批次的写入数据
  17. * 1 时间间隔
  18. * 2 缓存大小
  19. * 任何一个条件满足都会执行一次RPC请求批次写
  20. */
  21. public class Hbase07_AddData04 {
  22. public static void main(String[] args) throws Exception {
  23. Connection conn = HbaseUtils.getConnection();
  24. // 获取一个操作数据的对象 , 包含表信息
  25. BufferedMutator bm = conn.getBufferedMutator(TableName.valueOf("tb_user"));
  26. Put put1 = new Put(Bytes.toBytes("rk004"));
  27. put1.addColumn(Bytes.toBytes("cf") ,Bytes.toBytes("name"),Bytes.toBytes("ww3")) ;
  28. Put put2 = new Put(Bytes.toBytes("rk005"));
  29. put2.addColumn(Bytes.toBytes("cf") ,Bytes.toBytes("name"),Bytes.toBytes("ww4")) ;
  30. List<Put> ls = new ArrayList<>() ;
  31. ls.add(put1);
  32. ls.add(put2);
  33. // 周期性刷写数据到hbase中 , 并不是一行写一次
  34. bm.setWriteBufferPeriodicFlush(2000);
  35. // 大小达到阈值 请求写数据
  36. long size = bm.getWriteBufferSize();
  37. System.out.println(size); // 2M 2048K
  38. // 执行插入
  39. bm.mutate(ls);
  40. // 手动强制刷写
  41. // bm.flush();
  42. bm.close();
  43. conn.close();
  44. }
  45. }

1.3 bulkload方式(MR)

将静态数据转换成hfile , 将hfile导入到指定的表中

  • shell脚本

  • MR编程

1 数据

  1. 1,zss,23,BJ
  2. 2,lss,13,NJ
  3. 3,wbb,33,SH
  4. 4,lyy,12,BJ

2 创建hbase的表 create ‘tb_teacher’ , ‘cf’

3 操作的数据在HDFS上

注意参数不要写错:列族名、hdfs的文件路径等。

  1. hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf:name,cf:age,cf:city' -Dimporttsv.bulk.output=/teacher/output tb_teacher /csv/teacher.csv

4 将生成的hfile文件导入到hbase 的指定表中

  1. hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /teacher/output tb_teacher

2 查看表信息

2.1 获取数据(scan一个表)

  1. package com._51doit.hbase.day02;
  2. import org.apache.hadoop.hbase.Cell;
  3. import org.apache.hadoop.hbase.CellUtil;
  4. import org.apache.hadoop.hbase.TableName;
  5. import org.apache.hadoop.hbase.client.*;
  6. import org.apache.hadoop.hbase.util.Bytes;
  7. import java.util.Iterator;
  8. /**
  9. * Author: Hang.Z
  10. * Date: 21/07/03
  11. * Description:
  12. */
  13. public class Hbase08_ReadData {
  14. public static void main(String[] args) throws Exception {
  15. Connection connection = HbaseUtils.getConnection();
  16. // 获取表对象
  17. Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
  18. //所有的数据
  19. ResultScanner results = tb_a1.getScanner(new Scan());
  20. // 获取每行
  21. for (Result result : results) {
  22. // 遍历单元格
  23. while (result.advance()){
  24. // 获取单元格
  25. Cell cell = result.current();
  26. // 获取行键 列族 属性 值
  27. // 工具类 解析CEll单元格
  28. //   rk001cf1gender zi�M
  29. byte[] cloneRow = CellUtil.cloneRow(cell);
  30. byte[] cloneFamily = CellUtil.cloneFamily(cell);
  31. byte[] cloneQualifier = CellUtil.cloneQualifier(cell);
  32. byte[] cloneValue = CellUtil.cloneValue(cell);
  33. System.out.println(
  34. new String(cloneRow)+"-"+
  35. new String(cloneFamily)+"-"+
  36. new String(cloneQualifier)+"-"+
  37. (new String(cloneQualifier).equals("sal") ? Bytes.toDouble(cloneValue):new String(cloneValue))
  38. );
  39. }
  40. }
  41. connection.close();
  42. }
  43. }

2.2 获取某行数据(get)

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.Cell;
  4. import org.apache.hadoop.hbase.CellUtil;
  5. import org.apache.hadoop.hbase.TableName;
  6. import org.apache.hadoop.hbase.client.*;
  7. import org.apache.hadoop.hbase.util.Bytes;
  8. /**
  9. * Author: Hang.Z
  10. * Date: 21/07/03
  11. * Description:获取第一行的所有数据
  12. */
  13. public class Hbase08_ReadData2 {
  14. public static void main(String[] args) throws Exception {
  15. Connection connection = HbaseUtils.getConnection();
  16. // 获取表对象
  17. Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
  18. Get get = new Get(Bytes.toBytes("rk001"));
  19. // 行数据
  20. Result result = tb_a1.get(get);
  21. HbaseUtils.showData(result);
  22. connection.close();
  23. }
  24. }

2.3 获取某行的某个列族的数据

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.*;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. /**
  7. * Author: Hang.Z
  8. * Date: 21/07/03
  9. * Description:
  10. * 扫描表的一个列族的数据
  11. * 总结
  12. * 1 get一行数据
  13. * 2 List<Get></> 多行
  14. * 3 扫描整个表的所有数据
  15. * 4 扫描整个表的某个列族的所有数据
  16. */
  17. public class Hbase08_ReadData3 {
  18. public static void main(String[] args) throws Exception {
  19. Connection connection = HbaseUtils.getConnection();
  20. // 获取表对象
  21. Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
  22. ResultScanner scanner = tb_a1.getScanner("cf2".getBytes());
  23. for (Result result : scanner) {
  24. HbaseUtils.showData(result);
  25. }
  26. connection.close();
  27. }
  28. }

2.4 过滤查看数据

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.*;
  5. /**
  6. * Author: Hang.Z
  7. * Date: 21/07/03
  8. * Description:
  9. * 扫描表的一个列族的数据
  10. * 总结:
  11. * 1 get一行数据
  12. * 2 List<Get></> 多行
  13. * 3 扫描整个表的所有数据
  14. * 4 扫描整个表的某个列族的所有数据
  15. */
  16. public class Hbase08_ReadData4 {
  17. public static void main(String[] args) throws Exception {
  18. Connection connection = HbaseUtils.getConnection();
  19. // 获取表对象
  20. Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
  21. Scan scan = new Scan();
  22. // 设置过滤器 , 过滤符合条件的结果
  23. // scan.setFilter()
  24. // 设置扫描范围
  25. scan.withStartRow("rk002".getBytes()) ;
  26. // scan.withStopRow()
  27. /* Get get = new Get("".getBytes());
  28. get.setFilter()*/
  29. //描整个表的所有数据
  30. ResultScanner scanner = tb_a1.getScanner(scan);
  31. for (Result result : scanner) {
  32. HbaseUtils.showData(result);
  33. }
  34. connection.close();
  35. }
  36. }

3 删除数据

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.TableName;
  4. import org.apache.hadoop.hbase.client.Connection;
  5. import org.apache.hadoop.hbase.client.Delete;
  6. import org.apache.hadoop.hbase.client.Table;
  7. /**
  8. * Author: Hang.Z
  9. * Date: 21/07/03
  10. * Put put数据
  11. * new Put("rk01101") ;
  12. * put.addColumn("cf" , "name","zss")
  13. * put.addColumn("cf" , "age","zss")
  14. * put.addColumn("cf" , "gender","zss")
  15. * put.addColumn("cf" , "addr","zss")
  16. * put.addColumn("cf2" , "job","zss")
  17. * Description:
  18. * 删除数据
  19. * 1 删除1行
  20. * 2 删除一个属性
  21. * 3 删除多个属性
  22. */
  23. public class Hbase09_DeleteData {
  24. public static void main(String[] args) throws Exception {
  25. Connection conn = HbaseUtils.getConnection();
  26. Table tb_teacher = conn.getTable(TableName.valueOf("tb_teacher"));
  27. Delete delete = new Delete("2".getBytes());
  28. // 设置 列族下的属性
  29. delete.addColumn("cf".getBytes(),"name".getBytes()) ;
  30. delete.addColumn("cf".getBytes(),"city".getBytes()) ;
  31. tb_teacher.delete(delete);
  32. conn.close();
  33. }
  34. }

4 创建名称空间

  1. package doit.day03;
  2. import doit.day02.HbaseUtils;
  3. import org.apache.hadoop.hbase.NamespaceDescriptor;
  4. import org.apache.hadoop.hbase.client.Admin;
  5. import org.apache.hadoop.hbase.client.Connection;
  6. import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
  7. /**
  8. * Author: Hang.Z
  9. * Date: 21/07/03
  10. * Description:
  11. */
  12. public class Hbase11_CreateNS {
  13. public static void main(String[] args) throws Exception {
  14. Connection conn = HbaseUtils.getConnection();
  15. Admin admin = conn.getAdmin();
  16. NamespaceDescriptor.Builder builder = NamespaceDescriptor.create("ns24");
  17. NamespaceDescriptor namespaceDescriptor = builder.build();
  18. admin.createNamespace(namespaceDescriptor);
  19. admin.close();
  20. conn.close();
  21. }
  22. }

二、快照

  1. Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot
  1. snapshot 'tb_teacher' , 'teacher_20210703' -- 创建快照
  2. restore_snapshot 'teacher_20210703' -- 恢复快照
  3. clone_snapshot 'teacher_20210703' , 'ns24:new_tb_teacher' -- 备份数据
  4. list_snapshots -- 查看当前所有的快照

三、写数据流程

java-api - 图1

  1. 客户端请求写数据
  2. zookeeper收到请求,根据要写入的行号,返回该数据要请求的regionserver
  3. regionserver收到请求返回(META)元数据给客户端,到客户端的缓存中
  4. 客户端解析定位到将数据插入到哪个region,请求向它写数据
  5. 每个列族对应一个cf-store类来写入,cfstore在memorystore中不断生成StoreFile对象,同时将数据进行了排序,同时将写入的操作写入日志中。
  6. 当添加数据到一定程度,storefile对象将会上传(刷新)到hdfs中生成hfile文件,每个对象生成一个hfile。

flush的条件(常问)

  1. 内存数据达到128M
  2. 手动强制刷新
  3. 操作次数/操作时间达到阈值
  4. 当前节点整体的内存达到阈值

什么时候才算更新了一次数据?(面试题)

  1. 假如对同一个数据put,delete先后两个操作,会在hdfs中写入两个hfile文件,当更新或者删除数据的时候,会将这俩个文件合并成一个hfile文件,此时才是真正的跟新了一次数据。

四、读数据的流程

java-api - 图2

java-api - 图3

  1. 客户端请求get rowkey的值,向zookeeper请求
  2. zookeeper返回给客户端数据所在的机器
  3. 客户端向该机器请求下载META数据
  4. regionserver返回给客户端META数据,客户端写到缓存中
  5. 客户端解析META数据,定位到具体的region
  6. 开始读数据,查看cfstore的memorystore中查看是否有该数据,如果没有同时从本机的缓存和hdfs中查找该数据。
  7. hdfs查看中查找数据利用了布隆过滤器,可以快速筛选出可能有该数据的Hfile文件。布隆过滤器的基本原理是,在生成hfile时,使用很小的空间,存入一个字节数组,根据行号取哈希地址,此地址对应的值设置为1,其他没有值得设置为0。在用户查看当前hfile中是否有想要的rowkey的时候,根据行号计算出哈希值,可以直接判断出该文件中是否有该行,如果没有此行号,那么这个文件一定没有这个值,如果有进一步查看hfile的内容
  8. 进一步查看hfile中rowkey对应的值,hfile文件生成的时候,在对数据块排序后,对他们加上了索引,存储了一个rowkey-index的哈希表,过滤器得到rowkey后,直接根据这个哈希表得到值得索引,进而返回rowkey对应的数据。
  9. 由于可能对同一行的数据做多个操作,就会生成多个hfile文件,同一个数据的不同操作也就对应有多个数据块,此时,读取数据的时候,会将region缓存中的数据块和hdfs中的数据块合并,然后在新的hfile中返回数据

hfile文件的生成过程

  1. memorystore刷新操作,生成hfile文件,发送到hdfs上。
  2. 生成过程:
  3. 首先对数据排序:rowkey、列族、属性、值
  4. 封装成数据块
  5. 生成rowkey-index的索引表
  6. 生成布隆过滤器:rowkey-标记数据

五、数据的合并

由于每次对数据操作一次,都会生成一个hfile文件,hdfs端会在空闲时间,把多个hfile文件进行合并。

最后,如果合并之后,多个region中的数据分配的不合理,会自动的合并region。

合并过程

  1. 1. HDFS底层数据以regionname为文件夹
  2. 2. 原来的region文件夹不存在,生成新的文件夹
  3. 3. 内部hfile的移动

shell

  1. #合并region
  2. merge_region '28eea66d36680849e4abdbfb36d25f7a','04980472336896c55bd96bae4271a909'
  3. # 移动region
  4. move '04980472336896c55bd96bae4271a909' , 'linux02,16020,1625467987938'
  5. # 自动负载均衡
  6. balancer