API简介

几个主要 Hbase API 类(以HBase-1.2.5版本为例)和数据模型之间的对应关系:

Java类 HBase数据模型
HBaseAdmin 数据库(DataBase)
HBaseConfiguration
HTable 表(Table)
HTableDescriptor 列簇(Column Family)
HColumnDescriptor 列(Column)
Put 列修饰符(Column Qualifier)
Get
Scanner

01. HBaseAdmin

  • 类名:org.apache.hadoop.hbase.client.HBaseAdmin
  • 作用:提供了一个接口来管理 HBase 数据库的表信息。它提供的方法包括:创建表,删 除表,列出表项,使表有效或无效,以及添加或删除表列族成员等。 | 返回值 | 函数 | 描述 | | —- | —- | —- | | void | addColumn(String tableName,HColumnDescriptor column) | 向一个已经存在的表添加列 | | void | checkHBaseAvailable(HBaseConfiguration conf) | 静态函数,查看HBase是否处于运行状态 | | void | createTable(HTableDescriptor desc) | 创建一个表(同步操作) | | void | deleteTable(byte[] tableName) | 删除一个已经存在的表 | | void | enableTable(byte[] tableName) | 使表处于有效状态 | | void | disableTable(byte[] tableName) | 使表处于无效状态 | | HTableDescriptor[] | listTables() | 列出所有列表项 | | void | modifyTable(byte[] tableName,HTableDescriptor htd) | 修改表的模式(异步操作) | | boolean | tableExists(String tableName) | 检查表是否存在 |

  • 用法示例

    1. HBaseAdmin admin = new HBaseAdmin(config);
    2. admin.disableTable("tableName");

    02. HBaseConfiguration

  • 类名:org.apache.hadoop.hbase.HBaseConfiguration

  • 作用:对 HBase 进行配置 | 返回值 | 函数 | 描述 | | —- | —- | —- | | void | addResource(Path file) | 通过给定的路径所指的文件来添加资源 | | void | clear() | 清空所有已设置的属性 | | String | get(String name) | 获取属性名对应的值 | | String | getBoolean(String name,boolean defaultValue) | 获取为boolean类型的属性值,如果其属性值类型不为boolean,则返回默认属性值 | | void | set(String name,String value) | 通过属性名来设置值 | | void | setBoolean(String name,boolean value) | 设置boolean类型的属性值 |

  • 用法示例

    1. HBaseConfiguration hconfig = new HBaseConfiguration();
    2. # 设置zk端口为2181。一般情况下,HBaseCOnfiguration会使用构造函数进行初始化,然后再使用其他方法。
    3. hconfig.set("hbase.zookeeper.property.clientPort","2181");

    03. HTableDescriptor

  • 类名:org.apache.hadoop.hbase.HTableDescriptor

  • 作用:包含了表的名字极其对应表的列族 | 返回值 | 函数 | 描述 | | —- | —- | —- | | void | addFamily(HColumnDescriptor column) | 添加一个列簇 | | HColumnDescriptor | removeFamily(byte[] column) | 移除一个列簇 | | byte[] | getName() | 获取表的名称 | | byte[] | getValue(byte[] key) | 获取属性的值 | | void | setValue(String key,String value) | 设置属性的值 |

  • 用法示例

    1. HTableDescriptor htd = new HTableDescriptor(tableName);
    2. # 添加列簇
    3. htd.addFamily(new HColumnDescriptor("family"));

    04. HColumnDescriptor

  • 类名:org.apache.hadoop.hbase.HColumnDescriptor

  • 作用:维护着关于列族的信息,例如版本号,压缩设置等。它通常在创建表或者为表添 加列族的时候使用。列族被创建后不能直接修改,只能通过删除然后重新创建的方式。列族被删除的时候,列族里面的数据也会同时被删除。 | 返回值 | 函数 | 描述 | | —- | —- | —- | | byte[] | getName() | 获取列簇的名称 | | byte[] | getValue(byte[] key) | 获取属性的值 | | void | setValue(String key,String value) | 设置属性的值 |

  • 用法示例

    1. HTableDescriptor htd = new HTableDescriptor(tableName);
    2. HColumnDescriptor col = new HColumnDescriptor("family2");
    3. # 添加列簇family2
    4. htd.addFamily(col);

    05. HTable

  • 类名:org.apache.hadoop.hbase.client.HTable

  • 作用:可以用来和 HBase 表直接通信。此方法对于更新操作来说是非线程安全的。 | 返回值 | 函数 | 描述 | | —- | :—- | :—- | | void | checkAdnPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put | 自动的检查row/family/qualifier是否与给定的值匹配 | | void | close() | 释放所有的资源或挂起内部缓冲区中的更新 | | Boolean | exists(Get get) | 检查Get实例所指定的值是否存在于HTable的列中 | | Result | get(Get get) | 获取指定行的某些单元格所对应的值 | | byte[][] | getEndKeys() | 获取当前一打开的表每个区域的结束键值 | | ResultScanner | getScanner(byte[] family) | 获取当前给定列族的scanner实例 | | HTableDescriptor | getTableDescriptor() | 获取当前表的HTableDescriptor实例 | | byte[] | getTableName() | 获取表名 | | static boolean | isTableEnabled(HBaseConfiguration conf, String tableName) | 检查表是否有效 | | void | put(Put put) | 向表中添加值 |

  • 用法示例

    1. HTable table = new HTable(conf, Bytes.toBytes(tableName));
    2. ResultScanner scanner = table.getScanner(family);

    06. Put

  • 类名:org.apache.hadoop.hbase.client.Put

  • 作用:用来对单个行执行添加操作 | 返回值 | 函数 | 描述 | | —- | —- | —- | | Put | add(byte[] family, byte[] qualifier, byte[] value) | 将指定的列和对应的值添加到Put实例中 | | Put | add(byte[] family, byte[] qualifier, long ts, byte[] value) | 将指定的列和对应的值及时间戳添加到Put实例中 | | byte[] | getRow() | 获取Put实例的行 | | RowLock | getRowLock() | 获取Put实例的行锁 | | long | getTimeStamp() | 获取Put实例的时间戳 | | boolean | isEmpty() | 检查familyMap是否为空 | | Put | setTimeStamp(long timeStamp) | 设置Put实例的时间戳 |

  • 用法示例

    1. HTable table = new HTable(conf,Bytes.toBytes(tableName));
    2. Put p = new Put(brow); // 为指定行创建一个Put操作
    3. p.add(family,qualifier,value);
    4. table.put(p);

    07. Get

  • 类名:org.apache.hadoop.hbase.client.Get

  • 作用:用来获取单个行的相关信息 | 返回值 | 函数 | 描述 | | —- | —- | —- | | Get | addColumn(byte[] family, byte[] qualifier) | 获取指定列族和列修饰符对应的列 | | Get | addFamily(byte[] family) | 通过指定的列族获取其对应列的所有列 | | Get | setTimeRange(long minStamp,long maxStamp) | 获取指定取件的列的版本号 | | Get | setFilter(Filter filter) | 当执行Get操作时设置服务器端的过滤器 |

  • 用法示例

    1. HTable table = new HTable(conf, Bytes.toBytes(tableName));
    2. Get g = new Get(Bytes.toBytes(row));

    08. Result

  • 类名:org.apache.hadoop.hbase.client.Result

  • 作用:存储 Get 或者 Scan 操作后获取表的单行值。使用此类提供的方法可以直接获取值 或者各种 Map 结构( key-value 对) | 返回值 | 函数 | 描述 | | —- | —- | —- | | boolean | containsColumn(byte[] family,byte[] qualifier) | 检查指定的列是否存在 | | NavigableMap | getFamilyMap(byte[] family) | 获取对应列簇所包含的修饰符与值的键值对 | | byte[] | getValue(byte[] family,byte[] qualifier) | 获取对应列的最新值 |

HBase示例

pom.xml依赖:

  1. <dependency>
  2. <groupId>org.apache.hbase</groupId>
  3. <artifactId>hbase-client</artifactId>
  4. <version>1.2.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hbase</groupId>
  8. <artifactId>hbase-server</artifactId>
  9. <version>1.2.5</version>
  10. </dependency>

01. 获取连接对象

  1. public static Configuration conf;
  2. static{
  3. // 使用HBaseConfiguration的单例方法实例化
  4. conf = HBaseConfiguration.create();
  5. conf.set("hbase.zookeeper.quorum", "192.168.0.101,192.168.0.102,192.168.0.103");
  6. conf.set("hbase.zookeeper.property.clientPort", "2181");
  7. }
  8. /**
  9. * 获得链接
  10. * @return
  11. */
  12. public static synchronized Connection getConnection() {
  13. try {
  14. if (conn == null || conn.isClosed()) {
  15. conn = ConnectionFactory.createConnection(conf);
  16. }
  17. } catch (IOException e) {
  18. System.err.println("HBase 建立链接失败 "+ e.getMessage());
  19. }
  20. return conn;
  21. }

02. 判断表是否存在(DDL)

  1. /**
  2. * 判断表是否存在(DDL)
  3. * create 't1', 'cf1', 'cf2'
  4. * @param tableName 表名
  5. * @return 是否创建成功
  6. * @throws IOException
  7. */
  8. public static boolean isTableExist(String tableName) throws IOException {
  9. boolean result = false;
  10. // 在 HBase 中管理、访问表需要先创建 HBaseAdmin 对象
  11. Connection connection = ConnectionFactory.createConnection(conf);
  12. HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
  13. try {
  14. result = admin.tableExists(tableName);
  15. } finally {
  16. admin.close();
  17. }
  18. return result;
  19. }

03. 创建表(DDL)

  1. public static void createTable(String tableName, String... columnFamily) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
  4. //判断表是否存在
  5. if (isTableExist(tableName)) {
  6. System.out.println("Table:" + tableName + " already exists!");
  7. } else {
  8. // 创建表属性对象,表名需要转字节
  9. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  10. // 创建多个列族
  11. for (String cf : columnFamily) {
  12. descriptor.addFamily(new HColumnDescriptor(cf));
  13. }
  14. // 根据对表的配置,创建表
  15. admin.createTable(descriptor);
  16. System.out.println("Table:" + tableName + " create successfully!");
  17. }
  18. }

04. 新增记录(DML)

  1. public static void addRowData(String tableName, String rowKey, String columnFamily, String
  2. column, String value) throws IOException {
  3. // 创建HTable对象
  4. //HTable hTable = new HTable(conf, tableName);
  5. Connection connection = ConnectionFactory.createConnection(conf);
  6. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  7. // 向表中插入数据
  8. Put put = new Put(Bytes.toBytes(rowKey));
  9. // 向Put对象中组装数据
  10. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
  11. hTable.put(put);
  12. hTable.close();
  13. System.out.println("insert successfully!");
  14. }

05. 全表扫描(Scan)

  1. public static void getAllRows(String tableName) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  4. // 得到用于扫描region的对象
  5. Scan scan = new Scan();
  6. // 使用HTable得到resultcanner实现类的对象
  7. ResultScanner resultScanner = hTable.getScanner(scan);
  8. for(Result result : resultScanner){
  9. Cell[] cells = result.rawCells();
  10. for(Cell cell : cells){
  11. System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell))+"\t");
  12. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell))+"\t");
  13. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))+"\t");
  14. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  15. System.out.println();
  16. }
  17. }
  18. }

06. 获取单行记录(get)

  1. public static void getRow(String tableName, String rowKey) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  4. Get get = new Get(Bytes.toBytes(rowKey));
  5. //get.setMaxVersions(); // 显示所有版本
  6. //get.setTimeStamp(); // 显示指定时间戳的版本
  7. Result result = hTable.get(get);
  8. for(Cell cell : result.rawCells()){
  9. System.out.print("RowKey:" + Bytes.toString(result.getRow())+"\t");
  10. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell))+"\t");
  11. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))+"\t");
  12. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell))+"\t");
  13. System.out.print("Timestamp:" + cell.getTimestamp());
  14. System.out.println();
  15. }
  16. }

07. 获取多行记录(get)

  1. /**
  2. * 获取多行记录(get)
  3. * @param tableName 表名
  4. * @param rows 行键(可扩展参数)
  5. * @throws IOException
  6. */
  7. public static void getRow(String tableName, String... rows) throws IOException {
  8. Connection connection = ConnectionFactory.createConnection(conf);
  9. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  10. List<Get> gets = null;
  11. Result[] results;
  12. try {
  13. if (hTable != null) {
  14. gets = new ArrayList<>();
  15. for (String row : rows) {
  16. if(row!=null){
  17. gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
  18. }else{
  19. throw new RuntimeException("hbase have no data");
  20. }
  21. }
  22. }
  23. if (gets.size() > 0) {
  24. results = hTable.get(gets);
  25. if(results!=null && results.length>0){
  26. for (Result result : results) {
  27. for(Cell cell : result.rawCells()){
  28. System.out.print("RowKey:" + Bytes.toString(result.getRow())+"\t");
  29. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell))+"\t");
  30. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))+"\t");
  31. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell))+"\t");
  32. System.out.print("Timestamp:" + cell.getTimestamp());
  33. System.out.println();
  34. }
  35. }
  36. }
  37. }
  38. } catch (IOException e) {
  39. System.err.print(e);
  40. } finally {
  41. try {
  42. hTable.close();
  43. } catch (IOException e) {
  44. System.err.print(e);
  45. }
  46. }
  47. }

08. 根据限定符获取单行记录(get)

  1. public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  4. Get get = new Get(Bytes.toBytes(rowKey));
  5. get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
  6. Result result = hTable.get(get);
  7. for (Cell cell : result.rawCells()) {
  8. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  9. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  10. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  11. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  12. System.out.print("Timestamp:" + cell.getTimestamp());
  13. System.out.println();
  14. }
  15. }

09. 删除多行数据(DML)

  1. public static void deleteMultiRow(String tableName, String... rows) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
  4. List<Delete> deleteList = new ArrayList<>();
  5. for (String row : rows) {
  6. Delete delete = new Delete(Bytes.toBytes(row));
  7. deleteList.add(delete);
  8. }
  9. hTable.delete(deleteList);
  10. hTable.close();
  11. }

10. 删除表(DDL)

  1. public static void dropTable(String tableName) throws IOException {
  2. Connection connection = ConnectionFactory.createConnection(conf);
  3. HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
  4. if(isTableExist(tableName)){
  5. admin.disableTable(tableName);
  6. admin.deleteTable(tableName);
  7. System.out.println("Table:" + tableName + " delete successfully!");
  8. }else{
  9. System.out.println("Table:" + tableName + " not exist!");
  10. }
  11. }

完整示例

■ 常量类(Constant.java)

  1. public class Constant {
  2. /** HBase配置 **/
  3. public static class HBaseConfig {
  4. public final static String ZK_HOST="192.168.0.101,192.168.0.102,192.168.0.103";
  5. public final static String ZK_PORT="2181";
  6. public final static String ZK_PATH="/hbase";
  7. }
  8. }

■ 工具类(HBaseBaseUtil.java)

  1. import com.lonton.bigdata.constant.Constant;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.*;
  4. import org.apache.hadoop.hbase.client.*;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.io.IOException;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. /**
  12. * Description HBase Base API
  13. * Version 1.0.0
  14. */
  15. public class HBaseBaseUtil {
  16. private static final Logger log = LoggerFactory.getLogger(HBaseBaseUtil.class);
  17. public static Configuration conf;
  18. private static Connection conn;
  19. static {
  20. // 使用HBaseConfiguration的单例方法实例化
  21. conf = HBaseConfiguration.create();
  22. conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);
  23. conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);
  24. conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);
  25. }
  26. /**
  27. * 获得链接
  28. * @return
  29. */
  30. public static synchronized Connection getConnection() {
  31. try {
  32. if (conn == null || conn.isClosed()) {
  33. conn = ConnectionFactory.createConnection(conf);
  34. }
  35. } catch (IOException e) {
  36. log.error("HBase 建立链接失败 ", e);
  37. }
  38. return conn;
  39. }
  40. /**
  41. * 关闭连接
  42. * @throws IOException
  43. */
  44. public static void closeConnect(Connection conn) {
  45. if (null != conn) {
  46. try {
  47. conn.close();
  48. } catch (Exception e) {
  49. log.error("closeConnect failure !", e);
  50. }
  51. }
  52. }
  53. /**
  54. * 判断表是否存在(DDL)
  55. * create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
  56. * @param tableName 表名
  57. * @return 是否创建成功
  58. * @throws IOException
  59. */
  60. public static boolean isTableExist(String tableName) throws IOException {
  61. boolean result = false;
  62. Connection conn = getConnection();
  63. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  64. try {
  65. result = admin.tableExists(tableName);
  66. } catch (Exception e) {
  67. log.error(e.getMessage());
  68. } finally {
  69. admin.close();
  70. }
  71. return result;
  72. }
  73. /**
  74. * 创建表(DDL)
  75. * create 't1', 'cf1', 'cf2'
  76. * @param tableName 表名
  77. * @param columnFamily 列簇
  78. * @throws IOException
  79. */
  80. public static void createTable(String tableName, String... columnFamily) throws IOException {
  81. // 判断表是否存在
  82. if (isTableExist(tableName)) {
  83. log.error("Table:" + tableName + " already exists!");
  84. return;
  85. }
  86. Connection conn = getConnection();
  87. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  88. try {
  89. // 创建表属性对象,表名需要转字节
  90. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  91. // 创建多个列族
  92. for (String cf : columnFamily) {
  93. descriptor.addFamily(new HColumnDescriptor(cf));
  94. }
  95. // 根据对表的配置,创建表
  96. admin.createTable(descriptor);
  97. log.info("Table:" + tableName + " create successfully!");
  98. } catch (Exception e) {
  99. log.error(e.getMessage());
  100. } finally {
  101. admin.close();
  102. }
  103. }
  104. /**
  105. * 新增记录(DML)
  106. * put 't1', '1001', 'cf1:name', 'zhangsan'
  107. * put 't1', '1001', 'cf1:age', '23'
  108. * @param tableName 表名
  109. * @param rowKey 行键
  110. * @param columnFamily 列簇
  111. * @param column 列
  112. * @param value 值
  113. * @throws IOException
  114. */
  115. public static void addRowData(String tableName, String rowKey, String columnFamily, String
  116. column, String value) throws IOException {
  117. Connection conn = getConnection();
  118. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  119. try {
  120. // 向表中插入数据
  121. Put put = new Put(Bytes.toBytes(rowKey));
  122. // 向Put对象中组装数据
  123. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
  124. hTable.put(put);
  125. log.info("insert successfully!");
  126. } catch (Exception e) {
  127. log.error(e.getMessage());
  128. } finally {
  129. hTable.close();
  130. }
  131. }
  132. /**
  133. * 全表扫描(Scan)
  134. * scan "t1"
  135. * @param tableName 表名
  136. * @return
  137. * @throws IOException
  138. */
  139. public static ResultScanner getAllRows(String tableName) throws IOException {
  140. ResultScanner results = null;
  141. Connection conn = getConnection();
  142. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  143. try {
  144. // 得到用于扫描region的对象
  145. Scan scan = new Scan();
  146. // setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。
  147. scan.setCaching(1000);
  148. // 使用HTable得到resultcanner实现类的对象
  149. results = hTable.getScanner(scan);
  150. /*for (Result result : results) {
  151. Cell[] cells = result.rawCells();
  152. for (Cell cell : cells) {
  153. System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
  154. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  155. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  156. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  157. System.out.println();
  158. }
  159. }*/
  160. } catch (Exception e) {
  161. log.error(e.getMessage());
  162. } finally {
  163. hTable.close();
  164. }
  165. return results;
  166. }
  167. /**
  168. * 获取单行记录(get)
  169. * get "t1","1001"
  170. * @param tableName 表名
  171. * @param rowKey 行键
  172. * @return
  173. * @throws IOException
  174. */
  175. public static Result getRow(String tableName, String rowKey) throws IOException {
  176. Result result = null;
  177. Connection conn = getConnection();
  178. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  179. try {
  180. Get get = new Get(Bytes.toBytes(rowKey));
  181. //get.setMaxVersions(); // 显示所有版本
  182. //get.setTimeStamp(); // 显示指定时间戳的版本
  183. result = hTable.get(get);
  184. /*for (Cell cell : result.rawCells()) {
  185. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  186. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  187. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  188. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  189. System.out.print("Timestamp:" + cell.getTimestamp());
  190. System.out.println();
  191. }*/
  192. } catch (Exception e) {
  193. log.error(e.getMessage());
  194. } finally {
  195. hTable.close();
  196. }
  197. return result;
  198. }
  199. /**
  200. * 获取多行记录(get)
  201. * @param tableName 表名
  202. * @param rows 行键(可扩展参数)
  203. * @param <T>
  204. * @return
  205. * @throws IOException
  206. */
  207. public static <T> Result[] getRows(String tableName, List<T> rows) throws IOException {
  208. Connection conn = getConnection();
  209. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  210. List<Get> gets = null;
  211. Result[] results = null;
  212. try {
  213. if (hTable != null) {
  214. gets = new ArrayList<Get>();
  215. for (T row : rows) {
  216. if (row != null) {
  217. gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
  218. } else {
  219. throw new RuntimeException("hbase have no data");
  220. }
  221. }
  222. }
  223. if (gets.size() > 0) {
  224. results = hTable.get(gets);
  225. }
  226. } catch (IOException e) {
  227. log.error("getRows failure !", e);
  228. } finally {
  229. try {
  230. hTable.close();
  231. } catch (IOException e) {
  232. log.error("table.close() failure !", e);
  233. }
  234. }
  235. return results;
  236. }
  237. /**
  238. * 根据限定符获取单行记录(get)
  239. * get "t1","1001","cf1:name"
  240. * @param tableName 表名
  241. * @param rowKey 行键
  242. * @param family 列簇
  243. * @param qualifier 限定符
  244. * @throws IOException
  245. */
  246. public static Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
  247. Result result = null;
  248. Connection conn = getConnection();
  249. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  250. try {
  251. Get get = new Get(Bytes.toBytes(rowKey));
  252. get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
  253. result = hTable.get(get);
  254. /*for (Cell cell : result.rawCells()) {
  255. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  256. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  257. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  258. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  259. System.out.print("Timestamp:" + cell.getTimestamp());
  260. System.out.println();
  261. }*/
  262. } catch (Exception e) {
  263. log.error(e.getMessage());
  264. } finally {
  265. hTable.close();
  266. }
  267. return result;
  268. }
  269. /**
  270. * 删除多行数据(DML)
  271. * @param tableName 表名
  272. * @param rows 行键(可扩展参数)
  273. * @throws IOException
  274. */
  275. public static void deleteMultiRow(String tableName, String... rows) throws IOException {
  276. Connection conn = getConnection();
  277. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  278. try {
  279. List<Delete> deleteList = new ArrayList<>();
  280. for (String row : rows) {
  281. Delete delete = new Delete(Bytes.toBytes(row));
  282. deleteList.add(delete);
  283. }
  284. hTable.delete(deleteList);
  285. } catch (Exception e) {
  286. log.error(e.getMessage());
  287. } finally {
  288. hTable.close();
  289. }
  290. }
  291. /**
  292. * 删除表(DDL)
  293. * @param tableName 表名
  294. * @throws IOException
  295. */
  296. public static void dropTable(String tableName) throws IOException {
  297. if (!isTableExist(tableName)) {
  298. log.error("Table:" + tableName + " not exist!");
  299. return;
  300. }
  301. Connection conn = getConnection();
  302. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  303. try {
  304. if (isTableExist(tableName)) {
  305. admin.disableTable(tableName);
  306. admin.deleteTable(tableName);
  307. log.info("Table:" + tableName + " delete successfully!");
  308. }
  309. } catch (Exception e) {
  310. log.error(e.getMessage());
  311. } finally {
  312. admin.close();
  313. }
  314. }
  315. }

■ 测试类(HBaseBaseUtilTest.java)

  1. import org.apache.hadoop.hbase.Cell;
  2. import org.apache.hadoop.hbase.CellUtil;
  3. import org.apache.hadoop.hbase.client.Result;
  4. import org.apache.hadoop.hbase.client.ResultScanner;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import org.junit.*;
  7. import java.io.IOException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class HBaseBaseUtilTest {
  11. private String tableName="t1";
  12. private String rowKey="1001";
  13. /** BeforeClass:会在所有方法被调用前被执行,
  14. * 而且该方法是静态的,所有当测试类被加载后接着就会运行它,
  15. * 而且在内存中它只会存在一份实例,它比较适合加载配置文件
  16. **/
  17. @BeforeClass
  18. public static void setUpBeforeClass() {
  19. System.out.println("this is @BeforeClass ...");
  20. HBaseBaseUtil.getConnection();
  21. }
  22. /** AfterClass:通常用来对资源的清理,如关闭数据库的连接 **/
  23. @AfterClass
  24. public static void tearDownAfterClass() {
  25. System.out.println("this is @AfterClass ...");
  26. HBaseBaseUtil.closeConnect(HBaseBaseUtil.getConnection());
  27. }
  28. /** Before:每个测试方法调用前执行一次 **/
  29. @Before
  30. public void setUp() {
  31. System.out.println("this is @Before ...");
  32. }
  33. /** Before:每个测试方法调用后执行一次 **/
  34. @After
  35. public void tearDown() {
  36. System.out.println("this is @After ...");
  37. }
  38. @Test
  39. public void createTable() throws IOException {
  40. HBaseBaseUtil.createTable(tableName, "cf1", "cf2");
  41. assert HBaseBaseUtil.isTableExist(tableName);
  42. }
  43. @Test
  44. public void isTableExist() throws IOException {
  45. assert HBaseBaseUtil.isTableExist(tableName);
  46. }
  47. @Test
  48. public void addRowData() throws IOException {
  49. String value="tom";
  50. HBaseBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  51. Result result=HBaseBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");
  52. Cell[] cells=result.rawCells();
  53. Assert.assertNotNull(cells);
  54. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  55. }
  56. @Test
  57. public void getAllRows() throws IOException {
  58. ResultScanner results =HBaseBaseUtil.getAllRows(tableName);
  59. Assert.assertNotNull(results);
  60. for (Result result : results) {
  61. Assert.assertNotNull(result);
  62. }
  63. }
  64. @Test
  65. public void getRow() throws IOException {
  66. String value="tom";
  67. HBaseBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  68. Result result=HBaseBaseUtil.getRow(tableName,rowKey);
  69. Cell[] cells=result.rawCells();
  70. Assert.assertNotNull(cells);
  71. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  72. }
  73. @Test
  74. public void getRows() throws IOException {
  75. String rowKey1="1003";
  76. String rowKey2="1004";
  77. String value1="tom1";
  78. String value2="tom2";
  79. HBaseBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);
  80. HBaseBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);
  81. List<String> rows=new ArrayList<>();
  82. rows.add(rowKey1);
  83. rows.add(rowKey2);
  84. Result[] results=HBaseBaseUtil.getRows(tableName,rows);
  85. Assert.assertNotNull(results);
  86. Assert.assertTrue(results.length==2);
  87. }
  88. @Test
  89. public void getRowQualifier() throws IOException {
  90. String value="tom";
  91. HBaseBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  92. Result result=HBaseBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");
  93. Cell[] cells=result.rawCells();
  94. Assert.assertNotNull(cells);
  95. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  96. }
  97. @Test
  98. public void deleteMultiRow() throws IOException {
  99. String rowKey1="1003";
  100. String rowKey2="1004";
  101. String value1="tom1";
  102. String value2="tom2";
  103. HBaseBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);
  104. HBaseBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);
  105. HBaseBaseUtil.deleteMultiRow(tableName, rowKey1, rowKey2);
  106. List<String> rows=new ArrayList<>();
  107. rows.add(rowKey1);
  108. rows.add(rowKey2);
  109. Result[] results=HBaseBaseUtil.getRows(tableName,rows);
  110. Assert.assertNotNull(results);
  111. for (Result result : results) {
  112. Cell[] cells=result.rawCells();
  113. Assert.assertTrue(cells.length==0);
  114. }
  115. }
  116. @Test
  117. public void dropTable() throws IOException {
  118. assert HBaseBaseUtil.isTableExist(tableName);
  119. HBaseBaseUtil.dropTable(tableName);
  120. assert !HBaseBaseUtil.isTableExist(tableName);
  121. }
  122. }

附件

完整案例代码:hbase-example-src.zip