pom依赖
<!-- hbase-client -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.2</version>
</dependency>
配置Hosts文件
# 需要配置集群的ip和主机名映射
# 注意:你的集群有多少台机器就要添加多少条映射
127.0.0.1 master
127.0.0.2 datanode1
127.0.0.3 datanode2
工具类
HBaseConn.java - 连接工具类
package com.example.hbase.hbasedemo;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.client.Table;public class HBaseConn { private static final HBaseConn INSTANCE = new HBaseConn(); private static Configuration configuration; private static Connection connection; //无参构造 private HBaseConn(){ try{ if(configuration == null){ //创建配置文件对象 configuration = HBaseConfiguration.create(); //加载zookeeper配置 configuration.set("hbase.zookeeper.quorum", "127.0.0.1,127.0.0.2,127.0.0.3"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); } }catch (Exception e){ e.printStackTrace(); } } /** * 获取连接 * @return */ private Connection getConnection(){ if(connection == null || connection.isClosed()){ try { connection = ConnectionFactory.createConnection(configuration); }catch (Exception e){ e.printStackTrace(); } } return connection; } /** * 获取连接 - 静态方法 * @return */ public static Connection getHBaseConnection(){ return INSTANCE.getConnection(); } /** * 获取表 * @param tableName * @return * @throws Exception */ public static Table getTable(String tableName) throws Exception{ return INSTANCE.getConnection().getTable(TableName.valueOf(tableName)); } /** * 关闭连接 */ public static void closeConnection(){ if(connection != null){ try { connection.close(); }catch (Exception e){ e.printStackTrace(); } } }}
HBaseUtil.java - 操作hbase工具类
package com.example.hbase.hbasedemo;import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.filter.FilterList;import org.apache.hadoop.hbase.util.Bytes;import java.util.Arrays;import java.util.List;public class HBaseUtil { /** * 创建表 * @param tableName 表名 * @param cfs 列祖列族 * @return */ public static boolean createTable(String tableName,String [] cfs){ try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){ if (admin.tableExists(TableName.valueOf(tableName))) { return false; } //定义表描述对象 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); //遍历列族数组 Arrays.stream(cfs).forEach(cf -> { //定义列族描述对象 ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf); //给表添加列族信息 tableDescriptorBuilder.setColumnFamily(columnFamily); }); //创建表 admin.createTable(tableDescriptorBuilder.build()); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 删除表 * @param tableName 表名 * @return */ public static boolean deleteTable(String tableName){ try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){ admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 插入数据 * @param tableName 表名 * @param rowkey 唯一标识 * @param cfName 列族名 * @param qualifer 列标识 * @param data 数据 * @return */ public static boolean putRow(String tableName,String rowkey,String cfName,String qualifer,String data){ try (Table table = HBaseConn.getTable(tableName)){ Put put = new Put(Bytes.toBytes(rowkey)); put.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifer),Bytes.toBytes(data)); table.put(put); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 批量插入 * @param tableName 表名 * @param puts 数据 * @return */ public static boolean putRows(String tableName, List<Put> puts){ try (Table table = 
HBaseConn.getTable(tableName)){ table.put(puts); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 获取单条数据 * @param tableName 表名 * @param rowKey 唯一标识 * @return */ public static Result getRow(String tableName, String rowKey){ try (Table table = HBaseConn.getTable(tableName)){ Get get = new Get(Bytes.toBytes(rowKey)); return table.get(get); }catch (Exception e){ e.printStackTrace(); } return null; } /** * 使用过滤器 * @param tableName 表名 * @param rowKey 唯一标识 * @param filterList 过滤器集合 * @return */ public static Result getRow(String tableName, String rowKey, FilterList filterList){ try (Table table = HBaseConn.getTable(tableName)){ Get get = new Get(Bytes.toBytes(rowKey)); get.setFilter(filterList); return table.get(get); }catch (Exception e){ e.printStackTrace(); } return null; } /** * 全表扫描 * @param tableName 表名 * @return */ public static ResultScanner getScanner(String tableName){ try (Table table = HBaseConn.getTable(tableName)){ Scan scan = new Scan(); scan.setCaching(1000); return table.getScanner(scan); }catch (Exception e){ e.printStackTrace(); } return null; } /** * 批量检索数据 - 范围检索 * @param tableName 表名 * @param startRowKey 起始rowkey * @param endRowKey 终止rowkey * @return */ public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey){ try (Table table = HBaseConn.getTable(tableName)){ Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes(startRowKey)); scan.withStopRow(Bytes.toBytes(endRowKey)); scan.setCaching(1000); //取出数据// ResultScanner resultScanner = table.getScanner(scan);// for (Result rs : resultScanner) {// Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列1"))));// Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列2"))));// } return table.getScanner(scan); }catch (Exception e){ e.printStackTrace(); } return null; } /** * 批量检索数据 - 使用过滤器 * @param tableName 表名 * @param startRowKey 起始rowkey * @param endRowKey 
终止rowkey * @return */ public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey,FilterList filterList){ /** * FilterList用法 * FilterList.Operator.MUST_PASS_ALL - 满足所有条件 - 相当于sql中的and * FilterList.Operator.MUST_PASS_ONE - 满足一个条件 - 相当于sql中的or */// Scan scan = new Scan();// //创建过滤器集合// FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);// //条件过滤器// //CompareOperator.EQUAL - 绝对匹配// SingleColumnValueFilter filter = new SingleColumnValueFilter(// Bytes.toBytes("列族"),// Bytes.toBytes("列"),// CompareOperator.EQUAL,// Bytes.toBytes("条件")// );// //添加过滤器// filters.addFilter(filter);//// //前缀过滤器// PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("rowkey的前缀,比如井筒"));// filters.addFilter(filter2); try (Table table = HBaseConn.getTable(tableName)){ Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes(startRowKey)); scan.withStopRow(Bytes.toBytes(endRowKey)); scan.setFilter(filterList); scan.setCaching(1000); return table.getScanner(scan); }catch (Exception e){ e.printStackTrace(); } return null; } /** * 删除一行记录 * @param tableName 表名 * @param rowKey 唯一标识 * @return */ public static boolean deleteRow(String tableName,String rowKey){ try (Table table = HBaseConn.getTable(tableName)){ Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 删除列族 * @param tableName 表名 * @param cfName 列族 * @return */ public static boolean deleteColumnFamily(String tableName,String cfName){ try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){ admin.deleteColumn(TableName.valueOf(tableName),Bytes.toBytes(cfName)); }catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 删除列族 * @param tableName 表名 * @param cfName 列族 * @return */ public static boolean deleteQualifier(String tableName,String rowKey,String cfName,String qualifier){ try (Table table = HBaseConn.getTable(tableName)){ Delete delete = new 
Delete(Bytes.toBytes(rowKey)); delete.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifier)); table.delete(delete); }catch (Exception e){ e.printStackTrace(); return false; } return true; }}
工具类的使用
package com.example.hbase.hbasedemo;import org.apache.hadoop.hbase.CompareOperator;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.filter.*;import org.apache.hadoop.hbase.util.Bytes;import java.text.SimpleDateFormat;import java.util.*;public class HBaseTest { private static Connection conn; private static String testTableName = "TableTest"; static { conn = HBaseConn.getHBaseConnection(); System.out.println("HBase连接 -> "+conn); } /** * 创建并插入数据测试 * 以70个测点为例 * 插入1小时3600条数据,耗时30秒 */ public static void createTableAndDataTest() { String [] cf = new String[]{"data","info"};//列族 - data数据 - info其他信息 boolean table = HBaseUtil.createTable(testTableName, cf);//创建表 String createTable = table?"成功":"失败"; System.out.println("创建表 -> " + createTable + " -> 开始插入"); //生成put,批量插入 List<Put> putList = new ArrayList<>(); long start = 1577808000000l; for (int i = 0;i < 1 * 60 * 60; i++){ start+=1000; String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - start); Put put = new Put(Bytes.toBytes(rowkey)); //生成list列 - 70个列为例子 put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("time"),Bytes.toBytes(timeStamp2Date(start,null))); put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("createTime"),Bytes.toBytes(timeStamp2Date(start,null))); for (int j = 1; j < 70; j++) { String key = "key"+j; String value = "4500.00"; put.addColumn(Bytes.toBytes("data"),Bytes.toBytes(key),Bytes.toBytes(value)); } putList.add(put); } long s = System.currentTimeMillis(); boolean putRows = HBaseUtil.putRows(testTableName, putList); long e = System.currentTimeMillis(); System.out.println("数据插入 -> " + putRows + " -> 耗时 -> " + (e-s)); HBaseConn.closeConnection(); } /** * 时间戳转字符串 - yyyy-MM-dd HH:mm:ss * @param seconds * @param format * @return */ public static String timeStamp2Date(long seconds,String format) { if(format == null || format.isEmpty()){ format = "yyyy-MM-dd HH:mm:ss"; } SimpleDateFormat sdf = new 
SimpleDateFormat(format); return sdf.format(new Date(seconds)); } /** * 删除表 */ public static boolean deleteTable() { boolean b = HBaseUtil.deleteTable(testTableName); System.out.println("删除表 -> " + b); return b; } /** * 根据rowkey获取单条数据 */ public static void getOne() { String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - 1577808002000l); //获取数据 Result result = HBaseUtil.getRow(testTableName, rowkey); Map<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data")); for(Map.Entry<byte[], byte[]> entry:familyMap.entrySet()){ System.out.println(Bytes.toString(entry.getKey())); System.out.println(Bytes.toString(entry.getValue())); } } /*----------------------------- 过滤器用法 --------------------------*/ /** * RowFilter 过滤器 * 比较过滤器 * @throws Exception */ public void rowFilter()throws Exception { //RowFilter过滤器使用 - CompareOperator比较运算符 Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("rowkey1"))); //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter)); } /** * prefixFilter 过滤器 * 以 rowkey2 为前缀的所有行 * @throws Exception */ public void prefixFilter()throws Exception { Filter filter = new PrefixFilter(Bytes.toBytes("rowkey2")); //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter)); } /** * keyOnlyFilter 过滤器 * 只返回 rowkey 和列的值 ,不会返回数据 * @throws Exception */ public void keyOnlyFilter()throws Exception { Filter filter = new KeyOnlyFilter(true); //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter)); } /** * columnPrefixFilter 过滤器 * 列标识前缀过滤 * @throws Exception */ public void columnPrefixFilter()throws Exception { Filter filter = new ColumnPrefixFilter(Bytes.toBytes("nam")); //Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过 FilterList filterList = new 
FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter)); }}