pom依赖

  <!-- hbase-client -->
  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <!-- 版本应与集群上运行的 HBase 版本保持一致 -->
      <version>2.2.2</version>
  </dependency>

配置Hosts文件

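  # 需要在客户端机器的 hosts 文件中配置集群各节点的 IP 与主机名映射
  # (Linux: /etc/hosts; Windows: C:\Windows\System32\drivers\etc\hosts)
  # 原因: 客户端从 ZooKeeper 获取到的是 RegionServer 的主机名, 本机必须能解析这些主机名
  # 注意: 集群有多少台机器, 就要添加多少条映射
  127.0.0.1 master
  127.0.0.2 datanode1
  127.0.0.3 datanode2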

工具类

HBaseConn.java - 连接工具类

  1. package com.example.hbase.hbasedemo;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.hadoop.hbase.TableName;
  5. import org.apache.hadoop.hbase.client.Connection;
  6. import org.apache.hadoop.hbase.client.ConnectionFactory;
  7. import org.apache.hadoop.hbase.client.Table;
  8. public class HBaseConn {
  9. private static final HBaseConn INSTANCE = new HBaseConn();
  10. private static Configuration configuration;
  11. private static Connection connection;
  12. //无参构造
  13. private HBaseConn(){
  14. try{
  15. if(configuration == null){
  16. //创建配置文件对象
  17. configuration = HBaseConfiguration.create();
  18. //加载zookeeper配置
  19. configuration.set("hbase.zookeeper.quorum", "127.0.0.1,127.0.0.2,127.0.0.3");
  20. configuration.set("hbase.zookeeper.property.clientPort", "2181");
  21. }
  22. }catch (Exception e){
  23. e.printStackTrace();
  24. }
  25. }
  26. /**
  27. * 获取连接
  28. * @return
  29. */
  30. private Connection getConnection(){
  31. if(connection == null || connection.isClosed()){
  32. try {
  33. connection = ConnectionFactory.createConnection(configuration);
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }
  37. }
  38. return connection;
  39. }
  40. /**
  41. * 获取连接 - 静态方法
  42. * @return
  43. */
  44. public static Connection getHBaseConnection(){
  45. return INSTANCE.getConnection();
  46. }
  47. /**
  48. * 获取表
  49. * @param tableName
  50. * @return
  51. * @throws Exception
  52. */
  53. public static Table getTable(String tableName) throws Exception{
  54. return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
  55. }
  56. /**
  57. * 关闭连接
  58. */
  59. public static void closeConnection(){
  60. if(connection != null){
  61. try {
  62. connection.close();
  63. }catch (Exception e){
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. }

HBaseUtil.java - 操作hbase工具类

  1. package com.example.hbase.hbasedemo;
  2. import org.apache.hadoop.hbase.*;
  3. import org.apache.hadoop.hbase.client.*;
  4. import org.apache.hadoop.hbase.filter.FilterList;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import java.util.Arrays;
  7. import java.util.List;
  8. public class HBaseUtil {
  9. /**
  10. * 创建表
  11. * @param tableName 表名
  12. * @param cfs 列祖列族
  13. * @return
  14. */
  15. public static boolean createTable(String tableName,String [] cfs){
  16. try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
  17. if (admin.tableExists(TableName.valueOf(tableName))) {
  18. return false;
  19. }
  20. //定义表描述对象
  21. TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
  22. //遍历列族数组
  23. Arrays.stream(cfs).forEach(cf -> {
  24. //定义列族描述对象
  25. ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
  26. //给表添加列族信息
  27. tableDescriptorBuilder.setColumnFamily(columnFamily);
  28. });
  29. //创建表
  30. admin.createTable(tableDescriptorBuilder.build());
  31. }catch (Exception e){
  32. e.printStackTrace();
  33. return false;
  34. }
  35. return true;
  36. }
  37. /**
  38. * 删除表
  39. * @param tableName 表名
  40. * @return
  41. */
  42. public static boolean deleteTable(String tableName){
  43. try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
  44. admin.disableTable(TableName.valueOf(tableName));
  45. admin.deleteTable(TableName.valueOf(tableName));
  46. }catch (Exception e){
  47. e.printStackTrace();
  48. return false;
  49. }
  50. return true;
  51. }
  52. /**
  53. * 插入数据
  54. * @param tableName 表名
  55. * @param rowkey 唯一标识
  56. * @param cfName 列族名
  57. * @param qualifer 列标识
  58. * @param data 数据
  59. * @return
  60. */
  61. public static boolean putRow(String tableName,String rowkey,String cfName,String qualifer,String data){
  62. try (Table table = HBaseConn.getTable(tableName)){
  63. Put put = new Put(Bytes.toBytes(rowkey));
  64. put.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifer),Bytes.toBytes(data));
  65. table.put(put);
  66. }catch (Exception e){
  67. e.printStackTrace();
  68. return false;
  69. }
  70. return true;
  71. }
  72. /**
  73. * 批量插入
  74. * @param tableName 表名
  75. * @param puts 数据
  76. * @return
  77. */
  78. public static boolean putRows(String tableName, List<Put> puts){
  79. try (Table table = HBaseConn.getTable(tableName)){
  80. table.put(puts);
  81. }catch (Exception e){
  82. e.printStackTrace();
  83. return false;
  84. }
  85. return true;
  86. }
  87. /**
  88. * 获取单条数据
  89. * @param tableName 表名
  90. * @param rowKey 唯一标识
  91. * @return
  92. */
  93. public static Result getRow(String tableName, String rowKey){
  94. try (Table table = HBaseConn.getTable(tableName)){
  95. Get get = new Get(Bytes.toBytes(rowKey));
  96. return table.get(get);
  97. }catch (Exception e){
  98. e.printStackTrace();
  99. }
  100. return null;
  101. }
  102. /**
  103. * 使用过滤器
  104. * @param tableName 表名
  105. * @param rowKey 唯一标识
  106. * @param filterList 过滤器集合
  107. * @return
  108. */
  109. public static Result getRow(String tableName, String rowKey, FilterList filterList){
  110. try (Table table = HBaseConn.getTable(tableName)){
  111. Get get = new Get(Bytes.toBytes(rowKey));
  112. get.setFilter(filterList);
  113. return table.get(get);
  114. }catch (Exception e){
  115. e.printStackTrace();
  116. }
  117. return null;
  118. }
  119. /**
  120. * 全表扫描
  121. * @param tableName 表名
  122. * @return
  123. */
  124. public static ResultScanner getScanner(String tableName){
  125. try (Table table = HBaseConn.getTable(tableName)){
  126. Scan scan = new Scan();
  127. scan.setCaching(1000);
  128. return table.getScanner(scan);
  129. }catch (Exception e){
  130. e.printStackTrace();
  131. }
  132. return null;
  133. }
  134. /**
  135. * 批量检索数据 - 范围检索
  136. * @param tableName 表名
  137. * @param startRowKey 起始rowkey
  138. * @param endRowKey 终止rowkey
  139. * @return
  140. */
  141. public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey){
  142. try (Table table = HBaseConn.getTable(tableName)){
  143. Scan scan = new Scan();
  144. scan.withStartRow(Bytes.toBytes(startRowKey));
  145. scan.withStopRow(Bytes.toBytes(endRowKey));
  146. scan.setCaching(1000);
  147. //取出数据
  148. // ResultScanner resultScanner = table.getScanner(scan);
  149. // for (Result rs : resultScanner) {
  150. // Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列1"))));
  151. // Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列2"))));
  152. // }
  153. return table.getScanner(scan);
  154. }catch (Exception e){
  155. e.printStackTrace();
  156. }
  157. return null;
  158. }
  159. /**
  160. * 批量检索数据 - 使用过滤器
  161. * @param tableName 表名
  162. * @param startRowKey 起始rowkey
  163. * @param endRowKey 终止rowkey
  164. * @return
  165. */
  166. public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey,FilterList filterList){
  167. /**
  168. * FilterList用法
  169. * FilterList.Operator.MUST_PASS_ALL - 满足所有条件 - 相当于sql中的and
  170. * FilterList.Operator.MUST_PASS_ONE - 满足一个条件 - 相当于sql中的or
  171. */
  172. // Scan scan = new Scan();
  173. // //创建过滤器集合
  174. // FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  175. // //条件过滤器
  176. // //CompareOperator.EQUAL - 绝对匹配
  177. // SingleColumnValueFilter filter = new SingleColumnValueFilter(
  178. // Bytes.toBytes("列族"),
  179. // Bytes.toBytes("列"),
  180. // CompareOperator.EQUAL,
  181. // Bytes.toBytes("条件")
  182. // );
  183. // //添加过滤器
  184. // filters.addFilter(filter);
  185. //
  186. // //前缀过滤器
  187. // PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("rowkey的前缀,比如井筒"));
  188. // filters.addFilter(filter2);
  189. try (Table table = HBaseConn.getTable(tableName)){
  190. Scan scan = new Scan();
  191. scan.withStartRow(Bytes.toBytes(startRowKey));
  192. scan.withStopRow(Bytes.toBytes(endRowKey));
  193. scan.setFilter(filterList);
  194. scan.setCaching(1000);
  195. return table.getScanner(scan);
  196. }catch (Exception e){
  197. e.printStackTrace();
  198. }
  199. return null;
  200. }
  201. /**
  202. * 删除一行记录
  203. * @param tableName 表名
  204. * @param rowKey 唯一标识
  205. * @return
  206. */
  207. public static boolean deleteRow(String tableName,String rowKey){
  208. try (Table table = HBaseConn.getTable(tableName)){
  209. Delete delete = new Delete(Bytes.toBytes(rowKey));
  210. table.delete(delete);
  211. }catch (Exception e){
  212. e.printStackTrace();
  213. return false;
  214. }
  215. return true;
  216. }
  217. /**
  218. * 删除列族
  219. * @param tableName 表名
  220. * @param cfName 列族
  221. * @return
  222. */
  223. public static boolean deleteColumnFamily(String tableName,String cfName){
  224. try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
  225. admin.deleteColumn(TableName.valueOf(tableName),Bytes.toBytes(cfName));
  226. }catch (Exception e){
  227. e.printStackTrace();
  228. return false;
  229. }
  230. return true;
  231. }
  232. /**
  233. * 删除列族
  234. * @param tableName 表名
  235. * @param cfName 列族
  236. * @return
  237. */
  238. public static boolean deleteQualifier(String tableName,String rowKey,String cfName,String qualifier){
  239. try (Table table = HBaseConn.getTable(tableName)){
  240. Delete delete = new Delete(Bytes.toBytes(rowKey));
  241. delete.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifier));
  242. table.delete(delete);
  243. }catch (Exception e){
  244. e.printStackTrace();
  245. return false;
  246. }
  247. return true;
  248. }
  249. }

工具类的使用

  1. package com.example.hbase.hbasedemo;
  2. import org.apache.hadoop.hbase.CompareOperator;
  3. import org.apache.hadoop.hbase.client.Connection;
  4. import org.apache.hadoop.hbase.client.Put;
  5. import org.apache.hadoop.hbase.client.Result;
  6. import org.apache.hadoop.hbase.filter.*;
  7. import org.apache.hadoop.hbase.util.Bytes;
  8. import java.text.SimpleDateFormat;
  9. import java.util.*;
  10. public class HBaseTest {
  11. private static Connection conn;
  12. private static String testTableName = "TableTest";
  13. static {
  14. conn = HBaseConn.getHBaseConnection();
  15. System.out.println("HBase连接 -> "+conn);
  16. }
  17. /**
  18. * 创建并插入数据测试
  19. * 以70个测点为例
  20. * 插入1小时3600条数据,耗时30秒
  21. */
  22. public static void createTableAndDataTest() {
  23. String [] cf = new String[]{"data","info"};//列族 - data数据 - info其他信息
  24. boolean table = HBaseUtil.createTable(testTableName, cf);//创建表
  25. String createTable = table?"成功":"失败";
  26. System.out.println("创建表 -> " + createTable + " -> 开始插入");
  27. //生成put,批量插入
  28. List<Put> putList = new ArrayList<>();
  29. long start = 1577808000000l;
  30. for (int i = 0;i < 1 * 60 * 60; i++){
  31. start+=1000;
  32. String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - start);
  33. Put put = new Put(Bytes.toBytes(rowkey));
  34. //生成list列 - 70个列为例子
  35. put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("time"),Bytes.toBytes(timeStamp2Date(start,null)));
  36. put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("createTime"),Bytes.toBytes(timeStamp2Date(start,null)));
  37. for (int j = 1; j < 70; j++) {
  38. String key = "key"+j;
  39. String value = "4500.00";
  40. put.addColumn(Bytes.toBytes("data"),Bytes.toBytes(key),Bytes.toBytes(value));
  41. }
  42. putList.add(put);
  43. }
  44. long s = System.currentTimeMillis();
  45. boolean putRows = HBaseUtil.putRows(testTableName, putList);
  46. long e = System.currentTimeMillis();
  47. System.out.println("数据插入 -> " + putRows + " -> 耗时 -> " + (e-s));
  48. HBaseConn.closeConnection();
  49. }
  50. /**
  51. * 时间戳转字符串 - yyyy-MM-dd HH:mm:ss
  52. * @param seconds
  53. * @param format
  54. * @return
  55. */
  56. public static String timeStamp2Date(long seconds,String format) {
  57. if(format == null || format.isEmpty()){
  58. format = "yyyy-MM-dd HH:mm:ss";
  59. }
  60. SimpleDateFormat sdf = new SimpleDateFormat(format);
  61. return sdf.format(new Date(seconds));
  62. }
  63. /**
  64. * 删除表
  65. */
  66. public static boolean deleteTable() {
  67. boolean b = HBaseUtil.deleteTable(testTableName);
  68. System.out.println("删除表 -> " + b);
  69. return b;
  70. }
  71. /**
  72. * 根据rowkey获取单条数据
  73. */
  74. public static void getOne() {
  75. String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - 1577808002000l);
  76. //获取数据
  77. Result result = HBaseUtil.getRow(testTableName, rowkey);
  78. Map<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
  79. for(Map.Entry<byte[], byte[]> entry:familyMap.entrySet()){
  80. System.out.println(Bytes.toString(entry.getKey()));
  81. System.out.println(Bytes.toString(entry.getValue()));
  82. }
  83. }
  84. /*----------------------------- 过滤器用法 --------------------------*/
  85. /**
  86. * RowFilter 过滤器
  87. * 比较过滤器
  88. * @throws Exception
  89. */
  90. public void rowFilter()throws Exception {
  91. //RowFilter过滤器使用 - CompareOperator比较运算符
  92. Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("rowkey1")));
  93. //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
  94. FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
  95. }
  96. /**
  97. * prefixFilter 过滤器
  98. * 以 rowkey2 为前缀的所有行
  99. * @throws Exception
  100. */
  101. public void prefixFilter()throws Exception {
  102. Filter filter = new PrefixFilter(Bytes.toBytes("rowkey2"));
  103. //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
  104. FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
  105. }
  106. /**
  107. * keyOnlyFilter 过滤器
  108. * 只返回 rowkey 和列的值 ,不会返回数据
  109. * @throws Exception
  110. */
  111. public void keyOnlyFilter()throws Exception {
  112. Filter filter = new KeyOnlyFilter(true);
  113. //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
  114. FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
  115. }
  116. /**
  117. * columnPrefixFilter 过滤器
  118. * 列标识前缀过滤
  119. * @throws Exception
  120. */
  121. public void columnPrefixFilter()throws Exception {
  122. Filter filter = new ColumnPrefixFilter(Bytes.toBytes("nam"));
  123. //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
  124. FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
  125. }
  126. }