HBase-API-ForJava-简介

资料:https://hbase.apache.org/apidocs/index.html 、 http://hbase.apache.org/book.html#external_apis

1、使用Java创建,修改和删除一个表-示例

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.HColumnDescriptor;
  6. import org.apache.hadoop.hbase.HConstants;
  7. import org.apache.hadoop.hbase.HTableDescriptor;
  8. import org.apache.hadoop.hbase.TableName;
  9. import org.apache.hadoop.hbase.client.Admin;
  10. import org.apache.hadoop.hbase.client.Connection;
  11. import org.apache.hadoop.hbase.client.ConnectionFactory;
  12. import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
  13. /**
  14.  * Walks through the life cycle of an HBase table using the Admin API:
  15.  * create (or overwrite) a table, change its schema, then disable and delete it.
  16.  */
  17. public class Example {
  18. private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
  19. private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";
  20. /**
  21.  * Creates the table described by {@code table}. If a table with the same name
  22.  * already exists it is disabled and deleted first.
  23.  *
  24.  * @param admin admin handle used for every DDL call
  25.  * @param table descriptor of the table to create
  26.  * @throws IOException if one of the admin calls fails
  27.  */
  28. public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
  29. if (admin.tableExists(table.getTableName())) {
  30. // A table has to be disabled before it can be deleted
  31. admin.disableTable(table.getTableName());
  32. admin.deleteTable(table.getTableName());
  33. }
  34. admin.createTable(table);
  35. }
  36. /**
  37.  * Creates {@code TABLE_NAME} with the single, uncompressed column family {@code CF_DEFAULT}.
  38.  *
  39.  * @param config HBase configuration used to open the connection
  40.  * @throws IOException if the connection or the table creation fails
  41.  */
  42. public static void createSchemaTables(Configuration config) throws IOException {
  43. // Connection and Admin are closed automatically when the try block ends
  44. try (Connection connection = ConnectionFactory.createConnection(config);
  45. Admin admin = connection.getAdmin()) {
  46. HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
  47. table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));
  48. System.out.print("Creating table. ");
  49. createOrOverwrite(admin, table);
  50. System.out.println(" Done.");
  51. }
  52. }
  53. /**
  54.  * Adds the column family {@code NEWCF}, updates {@code CF_DEFAULT}, then disables the table,
  55.  * deletes {@code CF_DEFAULT} and finally deletes the table itself.
  56.  * Prints a message and exits the JVM with status -1 when the table does not exist.
  57.  *
  58.  * @param config HBase configuration used to open the connection
  59.  * @throws IOException if the connection or one of the admin calls fails
  60.  */
  61. public static void modifySchema (Configuration config) throws IOException {
  62. try (Connection connection = ConnectionFactory.createConnection(config);
  63. Admin admin = connection.getAdmin()) {
  64. TableName tableName = TableName.valueOf(TABLE_NAME);
  65. if (!admin.tableExists(tableName)) {
  66. System.out.println("Table does not exist.");
  67. // NOTE(review): System.exit leaves the JVM without running the try-with-resources close
  68. System.exit(-1);
  69. }
  70. // Add a new column family to the existing table
  71. HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
  72. newColumn.setCompactionCompressionType(Algorithm.GZ);
  73. newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
  74. admin.addColumn(tableName, newColumn);
  75. // Read the descriptor only after NEWCF has been added: modifyTable below submits the whole
  76. // descriptor, so a copy fetched earlier would not list NEWCF.
  77. // NOTE(review): on client versions where addColumn is asynchronous, confirm the change is visible here.
  78. HTableDescriptor table = admin.getTableDescriptor(tableName);
  79. // Update existing column family
  80. HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
  81. existingColumn.setCompactionCompressionType(Algorithm.GZ);
  82. existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
  83. table.modifyFamily(existingColumn);
  84. admin.modifyTable(tableName, table);
  85. // Disable an existing table
  86. admin.disableTable(tableName);
  87. // Delete an existing column family
  88. admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));
  89. // Delete a table (Need to be disabled first)
  90. admin.deleteTable(tableName);
  91. }
  92. }
  93. /**
  94.  * Entry point: builds the configuration, creates the table, then runs the schema changes.
  95.  * Reads the environment variables HBASE_CONF_DIR and HADOOP_CONF_DIR to locate the site files.
  96.  *
  97.  * @param args unused
  98.  * @throws IOException if one of the HBase operations fails
  99.  */
  100. public static void main(String... args) throws IOException {
  101. Configuration config = HBaseConfiguration.create();
  102. //Add any necessary configuration files (hbase-site.xml, core-site.xml)
  103. config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
  104. config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
  105. createSchemaTables(config);
  106. modifySchema(config);
  107. }
  108. }

2、调用API,对hbase表进行保存|更新|删除操作

  1. 1、新增保存操作
  2. HTable hTable = new HTable(configInit(), Bytes.toBytes(tableName)); // open the table that receives the new cell
  3. //HTablePool hTablePool = new HTablePool(configInit(), 300);
  4. try {
  5. Put put = new Put(Bytes.toBytes(rowKey)); // a Put addresses exactly one row
  6. put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(saveValue));
  7. //hTablePool.getTable(tableName).put(put);
  8. hTable.put(put);
  9. return true;
  10. } catch (Exception e) {
  11. log.error("save error:{}, columnQualifier:{}, saveValue:{} ", e.getMessage(), columnQualifier, saveValue, e); // log the qualifier the placeholder names; trailing e keeps the stack trace
  12. } finally {
  13. //hTablePool.closeTablePool(tableName);
  14. Table.closeScanner(hTable, null); // presumably a project helper that releases the table; no scanner is open here, hence null (confirm)
  15. }
  16. 2、更新操作
  17. HTable hTable = new HTable(configInit(), TABLE_NAME); // open the table whose cell is overwritten
  18. try {
  19. Put put = new Put(Bytes.toBytes(rowKey)); // an update is a Put on an existing row key
  20. put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(updateValue));
  21. hTable.put(put);
  22. return true;
  23. } catch (Exception e) {
  24. log.error("update " + tableName + " error. columnQualifier=" + columnQualifier, e); // message now names the update operation instead of "delete"/"save"
  25. } finally {
  26. Table.closeScanner(hTable, null); // presumably a project helper that releases the table (confirm)
  27. }
  28. 3、删除操作
  29. HTable targetTable = new HTable(configInit(), tableName); // table the Delete is applied to
  30. try {
  31. targetTable.delete(delete); // "delete" is prepared by the surrounding code
  32. return true;
  33. } catch (Exception ex) {
  34. log.error("delete " + tableName + " error. delete=" + delete, ex);
  35. } finally {
  36. Table.closeScanner(targetTable, null); // presumably a project helper that releases the table (confirm)
  37. }

3、使用hbaseTemplate, 对hbase表进行新增保存|更新|删除|查询操作

  1. 1、新增保存操作
  2. hbaseTemplate.execute(tableName, table -> {
  3. boolean flag = false; // stays false unless the put succeeds
  4. try{
  5. byte[] rowkey = Bytes.toBytes(rowKey); // UTF-8 like every other value here; String.getBytes() would use the platform charset
  6. Put put = new Put(rowkey);
  7. put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue));
  8. table.put(put);
  9. flag = true;
  10. }catch(Exception e){
  11. log.error("保存hbase表数据失败,异常:{}, 列名:{}, 列值:{}", e.getMessage(), tagCode, tagValue, e); // trailing e keeps the stack trace
  12. }
  13. return flag;
  14. });
  15. 2、更新操作:跟新增保存一样,会自动覆盖值
  16. hbaseTemplate.execute(tableName, table -> {
  17. boolean flag = false; // stays false unless the put succeeds
  18. try{
  19. byte[] rowkey = Bytes.toBytes(rowKey); // UTF-8 like every other value here; String.getBytes() would use the platform charset
  20. Put put = new Put(rowkey);
  21. put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue)); // a Put on an existing cell overwrites its value
  22. table.put(put);
  23. flag = true;
  24. }catch(Exception e){
  25. log.error("更新hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode, e); // trailing e keeps the stack trace
  26. }
  27. return flag;
  28. });
  29. 3、删除操作
  30. hbaseTemplate.execute(tableName, table -> {
  31. boolean flag = false; // stays false unless the delete succeeds
  32. try{
  33. byte[] rowkey = Bytes.toBytes(rowKey); // UTF-8 like every other value here; String.getBytes() would use the platform charset
  34. // Row key of the row to delete from
  35. Delete delete = new Delete(rowkey);
  36. // Column family and qualifier of the column to delete
  37. delete.deleteColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(tagCode));
  38. table.delete(delete);
  39. flag = true;
  40. }catch(Exception e){
  41. log.error("删除hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode, e); // trailing e keeps the stack trace
  42. }
  43. return flag;
  44. });
  45. 4、查询操作
  46. 4-1、点查询(Get)
  47. hbaseTemplate.execute(tableName, table -> {
  48. Get get = new Get(Bytes.toBytes(rowName));
  49. // Restrict the read to the requested column (the qualifier passed in as a parameter) of the column family
  50. get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnParam));
  51. java.util.Map<String, String> dataMap = new java.util.HashMap<>(); // qualifier -> value; stays empty when nothing is found or the read fails
  52. try {
  53. Result result = table.get(get);
  54. List<KeyValue> keyValueList = result.list();
  55. if (!CollectionUtils.isEmpty(keyValueList)) {
  56. for (KeyValue keyValue : keyValueList) {
  57. String keyColumn = Bytes.toString(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
  58. String value = Bytes.toString(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength()); // assumes cell values were written as UTF-8 strings (confirm)
  59. dataMap.put(keyColumn, value);
  60. }
  61. }
  62. } catch (IOException e) {
  63. log.error("查询hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), columnParam, e); // report the failure instead of dropping it; the caller gets an empty map
  64. }
  65. return dataMap;
  66. });
  67. 4-2、scan查询
  68. Scan scan = new Scan();
  69. // Start of the row-key range
  70. scan.setStartRow(Bytes.toBytes(ufeScanStart));
  71. // End of the row-key range
  72. scan.setStopRow(Bytes.toBytes(ufeScanEnd));
  73. // Maximum result size in bytes that the client buffers; this is a byte limit, not a row count
  74. scan.setMaxResultSize(ufeScanLimit);
  75. // Whether to scan in reverse; the default (false) reads rows in ascending lexicographic row-key order
  76. scan.setReversed(ufeScanReversed);
  77. //......
  78. hbaseTemplate.find(tableName, scan, results -> {
  79. List<Map<String, Object>> resultList = new ArrayList<>();
  80. for (Result result : results) {
  81. List<KeyValue> keyValueScanList = result.list();
  82. // Business logic: convert keyValueScanList into a map and add it to resultList
  83. }
  84. return resultList;
  85. });