Docker-HBase

1、安装

  ```bash
  docker pull harisekhon/hbase
  docker run -d -h hbase \
    -p 2181:2181 \
    -p 9090:9090 \
    -p 9095:9095 \
    -p 16000:16000 \
    -p 16010:16010 \
    -p 16020:16020 \
    -p 16201:16201 \
    -p 16301:16301 \
    --name hbase \
    --restart=always \
    --network main \
    harisekhon/hbase
  ```

  启动后访问 HBase Web UI:http://192.168.56.120:16010

2、Springboot

  • yaml

    ```yaml
    hbase:
      zookeeper:
        quorum: 192.168.56.120
        property:
          clientPort: 2181
    ```
  • Maven

    ```xml
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.4.4</version>
    </dependency>
    ```
  • HbaseUtils

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**

  • @author fengyuhao
  • @Description */ @Component public class HbaseTemplate {

    private final Connection con; private Admin admin = null;

    private HbaseTemplate(@Value(“${hbase.zookeeper.quorum}”) String zookeeperQuorum,

    1. @Value("${hbase.zookeeper.property.clientPort}") String clientPort) throws IOException {
    2. Configuration conf = HBaseConfiguration.create();
    3. conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
    4. conf.set("hbase.zookeeper.property.clientPort", clientPort);
    5. con = ConnectionFactory.createConnection(conf);

    }

    /**

    • 创建表 *
    • @param tableName 表名称
    • @param cf 列族 */ public void createTable(String tableName, String[] cf) { TableName tb = TableName.valueOf(tableName); try {

      1. admin = con.getAdmin();
      2. if (admin.tableExists(tb)) {
      3. System.out.println("talbe is exists!");
      4. } else {
      5. TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tb);
      6. List<ColumnFamilyDescriptor> list = new ArrayList<>();
      7. // 添加列族
      8. for (String entry : cf) {
      9. list.add(ColumnFamilyDescriptorBuilder.newBuilder(entry.getBytes(StandardCharsets.UTF_8)).build());
      10. }
      11. tableDescriptorBuilder.setColumnFamilies(list);
      12. admin.createTable(tableDescriptorBuilder.build());
      13. }

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      /**

    • 查看已有表 */ public TableName[] listTables() { TableName[] tableNames = new TableName[0]; try {

      1. admin = con.getAdmin();
      2. tableNames = admin.listTableNames();

      } catch (IOException e) {

      1. e.printStackTrace();

      } return tableNames; }

      /**

    • 删除表 */ public void deleteTable(String tableName) { try {

      1. TableName tn = TableName.valueOf(tableName);
      2. admin = con.getAdmin();
      3. if (admin.tableExists(tn)) {
      4. admin.disableTable(tn);
      5. admin.deleteTable(tn);
      6. }
      7. admin.close();

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      /**

    • 添加(更新)一条记录 *
    • @param tableName 表名
    • @param rowkey rowkey
    • @param cf columnfamily
    • @param column column
    • @param value value */ public void put(String tableName, String rowkey, String cf, String column, String value) { try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Put put = new Put(Bytes.toBytes(rowkey));
      3. put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
      4. table.put(put);
      5. table.close();

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      /**

    • 批量添加(更新)数据 *
    • @param tableName 表名
    • @param data 插入的数据,
    • Map 结构 —>Map(rowkey,Map(cf:column,value)) */ public void batchPut(String tableName, Map> data) { try { ObjectMapper objectMapper = new ObjectMapper(); Table table = con.getTable(TableName.valueOf(tableName)); List list = new ArrayList<>(); for (Map.Entry> entry : data.entrySet()) { System.out.println(entry.getKey()); // rowkey for (Map.Entry map : entry.getValue().entrySet()) { Put put = new Put(Bytes.toBytes(entry.getKey())); String[] keys = map.getKey().split(“:”); if (keys.length > 1) { put.addColumn(Bytes.toBytes(keys[0]), Bytes.toBytes(keys[1]), Bytes.toBytes(objectMapper.writeValueAsString(map.getValue()))); } else { put.addColumn(Bytes.toBytes(keys[0]), null, Bytes.toBytes(objectMapper.writeValueAsString(map.getValue()))); } list.add(put); } } table.put(list); table.close(); } catch (IOException e) { e.printStackTrace(); } }

      /**

    • 全表扫描 *
    • @param tableName 表名 */ public List> scanTable(String tableName) { TableName tb = TableName.valueOf(tableName); ResultScanner scanner = null; try {

      1. Table table = con.getTable(tb);
      2. Scan scan = new Scan();
      3. scanner = table.getScanner(scan);
      4. table.close();

      } catch (IOException e) {

      1. e.printStackTrace();

      } List> list = new ArrayList<>(); if (scanner != null) {

      1. for (Result result : scanner) {
      2. Map<String, Object> map = new HashMap<>(8);
      3. //展示数据
      4. for (Cell cell : result.rawCells()) {
      5. map.put("rowKey", Bytes.toString(CellUtil.cloneRow(cell)));
      6. map.put("family", Bytes.toString(CellUtil.cloneFamily(cell)));
      7. map.put("column", Bytes.toString(CellUtil.cloneQualifier(cell)));
      8. map.put("value", Bytes.toString(CellUtil.cloneValue(cell)));
      9. }
      10. list.add(map);
      11. }

      } return list; }

      /**

    • 根据 rowkey 查询一条数据 *
    • @param tableName 表
    • @param rowKey rewkey */ public Result getByRowkey(String tableName, String rowKey) { try {
      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
      3. Result result = table.get(get);
      4. table.close();
      5. if (!result.isEmpty()) {
      6. return result;
      7. }
      } catch (IOException e) {
      1. e.printStackTrace();
      } return null; } /**
    • 根据 rowkey 查询一条数据 *
    • @param tableName 表
    • @param rowKey rewkey
    • @param cf columnfamily */ public Result getByRowkey(String tableName, String rowKey,String cf) { try {
      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
      3. get.addFamily(cf.getBytes(StandardCharsets.UTF_8));
      4. Result result = table.get(get);
      5. table.close();
      6. if (!result.isEmpty()) {
      7. return result;
      8. }
      } catch (IOException e) {
      1. e.printStackTrace();
      } return null; } /**
    • 根据 rowkey 查询一条数据 *
    • @param tableName 表
    • @param rowKey rewkey
    • @param cf columnfamily
    • @param column column */ public Result getByRowkey(String tableName, String rowKey,String cf,String column) { try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Get get = new Get(rowKey.getBytes(StandardCharsets.UTF_8));
      3. get.addColumn(cf.getBytes(StandardCharsets.UTF_8),column.getBytes(StandardCharsets.UTF_8));
      4. Result result = table.get(get);
      5. table.close();
      6. if (!result.isEmpty()) {
      7. return result;
      8. }

      } catch (IOException e) {

      1. e.printStackTrace();

      } return null; }

      /**

    • rowkey 模糊查询
    • @param tableName 表
    • @param pattern 正则表达式 */ public Iterator getByRowkeyPattern(String tableName,String pattern){ try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Scan scan = new Scan();
      3. RowFilter mykey = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(pattern));
      4. scan.setFilter(mykey);
      5. Iterator<Result> iterator = table.getScanner(scan).iterator();
      6. table.close();
      7. return iterator;

      } catch (IOException e) {

      1. e.printStackTrace();

      } return null; }

      /**

    • rowkey 范围查询 ( start < result < end)
    • @param tableName 表
    • @param start 开始 key
    • @param end 结束 key */ public Iterator getByRowkeyRange(String tableName,String start,String end){ try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. List<Filter> list = new ArrayList<>();
      3. Scan scan = new Scan();
      4. RowFilter mykey1 = new RowFilter(CompareOperator.GREATER, new BinaryComparator(start.getBytes(StandardCharsets.UTF_8)));
      5. list.add(mykey1);
      6. RowFilter mykey2 = new RowFilter(CompareOperator.LESS, new BinaryComparator(end.getBytes(StandardCharsets.UTF_8)));
      7. list.add(mykey2);
      8. scan.setFilter(new FilterList(list));
      9. Iterator<Result> iterator = table.getScanner(scan).iterator();
      10. table.close();
      11. return iterator;

      } catch (IOException e) {

      1. e.printStackTrace();

      } return null; }

      /**

    • 时间戳范围查询 ( start <= result < end)
    • @param tableName 表
    • @param start 开始 key
    • @param end 结束 key */ public Iterator getByTimestamp(String tableName,Long start,Long end){ try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Scan scan = new Scan();
      3. scan.setTimeRange(start,end);
      4. Iterator<Result> iterator = table.getScanner(scan).iterator();
      5. table.close();
      6. return iterator;

      } catch (IOException e) {

      1. e.printStackTrace();

      } return null; }

      /**

    • 根据 rowkey删除一条记录
    • @param tableName 表
    • @param rowKey rewkey */ public void deleteByRowkey(String tableName, String rowKey){ try {

      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. Delete delete = new Delete(rowKey.getBytes(StandardCharsets.UTF_8));
      3. table.delete(delete);
      4. table.close();

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      /**

    • 根据 rowkey 删除多条记录
    • @param tableName 表
    • @param rowKey rewkey */ public void batchDeleteByRowkey(String tableName, String[] rowKey){ try {
      1. Table table = con.getTable(TableName.valueOf(tableName));
      2. List<Delete> list = new ArrayList<>();
      3. for(String s : rowKey){
      4. Delete delete = new Delete(s.getBytes(StandardCharsets.UTF_8));
      5. list.add(delete);
      6. }
      7. table.delete(list);
      8. table.close();
      } catch (IOException e) {
      1. e.printStackTrace();
      } } }
```

  • Test

  ```java
  import com.cug.demo1.utils.HbaseUtil;
  import org.apache.hadoop.hbase.CellUtil;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.junit.jupiter.api.Test;
  import org.springframework.boot.test.context.SpringBootTest;

  import java.util.*;
  11. @SpringBootTest
  12. class Demo1ApplicationTests {
  13. /**
  14. * 创建表
  15. */
  16. @Test
  17. public void createTable(){
  18. String[] cf = new String[2];
  19. cf[0] = "info";
  20. cf[1] = "address";
  21. HbaseUtil.createTable("test",cf);
  22. }
  23. /**
  24. * 查看表信息
  25. */
  26. @Test
  27. public void tableList(){
  28. TableName[] tableNames = HbaseUtil.listTables();
  29. for(TableName tableName: tableNames){
  30. System.out.println(tableName.getNameAsString());
  31. }
  32. }
  33. /**
  34. * 删除表
  35. */
  36. @Test
  37. public void deleteTable(){
  38. HbaseUtil.deleteTable("test");
  39. }
  40. /**
  41. * 添加一条数据
  42. */
  43. @Test
  44. public void put(){
  45. HbaseUtil.put("tbl_user","keyss","info","name","zhangsan");
  46. }
  47. /**
  48. * 批量添加数据
  49. */
  50. @Test
  51. public void batchPut(){
  52. Map<String,Map<String,Object>> data = new HashMap<>(8);
  53. Map<String,Object> list = new HashMap<>(4);
  54. list.put("address","zhangsan");
  55. Map<String,Object> list1 = new HashMap<>(4);
  56. list1.put("address","lisi");
  57. data.put("mykeys3",list);
  58. data.put("mykeys4",list1);
  59. HbaseUtil.batchPut("tbl_user",data);
  60. }
  61. /**
  62. * 查询数据
  63. */
  64. @Test
  65. public void getByRowkey(){
  66. // 根据 rowkey 查询一条数据
  67. System.out.println(HbaseUtil.getByRowkey("tbl_user", "mykeys1"));
  68. // rowkey 模糊查询
  69. Iterator<Result> tbl_user = HbaseUtil.getByRowkeyPattern("tbl_user", "mykey*");
  70. if(tbl_user != null){
  71. while (tbl_user.hasNext()){
  72. Result next = tbl_user.next();
  73. System.out.println(Bytes.toString(CellUtil.cloneRow(next.rawCells()[0])));
  74. System.out.println(Bytes.toString(CellUtil.cloneFamily(next.rawCells()[0])));
  75. System.out.println(Bytes.toString(CellUtil.cloneQualifier(next.rawCells()[0])));
  76. System.out.println(Bytes.toString(CellUtil.cloneValue(next.rawCells()[0])));
  77. }
  78. }
  79. // rowkey 范围
  80. System.out.println(HbaseUtil.getByRowkeyRange("tbl_user", "mykeys", "mykeys2"));
  81. // 时间戳范围查询
  82. System.out.println(HbaseUtil.getByTimestamp("tbl_user", 1626570049074L, 1626570273856L));
  83. }
  84. }

  ```

3、Snapshot

  ```shell
  ## 创建快照,数据保存在 --> hdfs:xxx/hbase/.hbase-snapshot/;docker 容器中默认为 /hbase-data/.hbase-snapshot/
  hbase> snapshot 'src_table', 'snapshot_src_table'
  ## 查看快照
  hbase> list_snapshots
  ## 删除快照
  hbase> delete_snapshot 'snapshot_src_table'
  ## 数据迁移
  hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot snapshot_src_table -copy-to hdfs://192.168.1.132:8020/hbase -mappers 16 -bandwidth 20
  ## 数据恢复
  hbase> clone_snapshot 'snapshot_src_table', 'new_table_name'
  ```