pom依赖
<!-- hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>
配置Hosts文件
# 需要配置集群的ip和主机名映射
# 注意:你的集群有多少台机器就要添加多少个
127.0.0.1 master
127.0.0.2 datanode1
127.0.0.3 datanode2
工具类
HBaseConn.java - 连接工具类
package com.example.hbase.hbasedemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
public class HBaseConn {
private static final HBaseConn INSTANCE = new HBaseConn();
private static Configuration configuration;
private static Connection connection;
//无参构造
private HBaseConn(){
try{
if(configuration == null){
//创建配置文件对象
configuration = HBaseConfiguration.create();
//加载zookeeper配置
configuration.set("hbase.zookeeper.quorum", "127.0.0.1,127.0.0.2,127.0.0.3");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 获取连接
* @return
*/
private Connection getConnection(){
if(connection == null || connection.isClosed()){
try {
connection = ConnectionFactory.createConnection(configuration);
}catch (Exception e){
e.printStackTrace();
}
}
return connection;
}
/**
* 获取连接 - 静态方法
* @return
*/
public static Connection getHBaseConnection(){
return INSTANCE.getConnection();
}
/**
* 获取表
* @param tableName
* @return
* @throws Exception
*/
public static Table getTable(String tableName) throws Exception{
return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
}
/**
* 关闭连接
*/
public static void closeConnection(){
if(connection != null){
try {
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
HBaseUtil.java - 操作hbase工具类
package com.example.hbase.hbasedemo;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Arrays;
import java.util.List;
public class HBaseUtil {
/**
* 创建表
* @param tableName 表名
* @param cfs 列祖列族
* @return
*/
public static boolean createTable(String tableName,String [] cfs){
try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
//定义表描述对象
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
//遍历列族数组
Arrays.stream(cfs).forEach(cf -> {
//定义列族描述对象
ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
//给表添加列族信息
tableDescriptorBuilder.setColumnFamily(columnFamily);
});
//创建表
admin.createTable(tableDescriptorBuilder.build());
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 删除表
* @param tableName 表名
* @return
*/
public static boolean deleteTable(String tableName){
try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 插入数据
* @param tableName 表名
* @param rowkey 唯一标识
* @param cfName 列族名
* @param qualifer 列标识
* @param data 数据
* @return
*/
public static boolean putRow(String tableName,String rowkey,String cfName,String qualifer,String data){
try (Table table = HBaseConn.getTable(tableName)){
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifer),Bytes.toBytes(data));
table.put(put);
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 批量插入
* @param tableName 表名
* @param puts 数据
* @return
*/
public static boolean putRows(String tableName, List<Put> puts){
try (Table table = HBaseConn.getTable(tableName)){
table.put(puts);
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 获取单条数据
* @param tableName 表名
* @param rowKey 唯一标识
* @return
*/
public static Result getRow(String tableName, String rowKey){
try (Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 使用过滤器
* @param tableName 表名
* @param rowKey 唯一标识
* @param filterList 过滤器集合
* @return
*/
public static Result getRow(String tableName, String rowKey, FilterList filterList){
try (Table table = HBaseConn.getTable(tableName)){
Get get = new Get(Bytes.toBytes(rowKey));
get.setFilter(filterList);
return table.get(get);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 全表扫描
* @param tableName 表名
* @return
*/
public static ResultScanner getScanner(String tableName){
try (Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.setCaching(1000);
return table.getScanner(scan);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 批量检索数据 - 范围检索
* @param tableName 表名
* @param startRowKey 起始rowkey
* @param endRowKey 终止rowkey
* @return
*/
public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey){
try (Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setCaching(1000);
//取出数据
// ResultScanner resultScanner = table.getScanner(scan);
// for (Result rs : resultScanner) {
// Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列1"))));
// Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("列族"),Bytes.toBytes("列2"))));
// }
return table.getScanner(scan);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 批量检索数据 - 使用过滤器
* @param tableName 表名
* @param startRowKey 起始rowkey
* @param endRowKey 终止rowkey
* @return
*/
public static ResultScanner getScanner(String tableName,String startRowKey,String endRowKey,FilterList filterList){
/**
* FilterList用法
* FilterList.Operator.MUST_PASS_ALL - 满足所有条件 - 相当于sql中的and
* FilterList.Operator.MUST_PASS_ONE - 满足一个条件 - 相当于sql中的or
*/
// Scan scan = new Scan();
// //创建过滤器集合
// FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
// //条件过滤器
// //CompareOperator.EQUAL - 绝对匹配
// SingleColumnValueFilter filter = new SingleColumnValueFilter(
// Bytes.toBytes("列族"),
// Bytes.toBytes("列"),
// CompareOperator.EQUAL,
// Bytes.toBytes("条件")
// );
// //添加过滤器
// filters.addFilter(filter);
//
// //前缀过滤器
// PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("rowkey的前缀,比如井筒"));
// filters.addFilter(filter2);
try (Table table = HBaseConn.getTable(tableName)){
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setFilter(filterList);
scan.setCaching(1000);
return table.getScanner(scan);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
/**
* 删除一行记录
* @param tableName 表名
* @param rowKey 唯一标识
* @return
*/
public static boolean deleteRow(String tableName,String rowKey){
try (Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 删除列族
* @param tableName 表名
* @param cfName 列族
* @return
*/
public static boolean deleteColumnFamily(String tableName,String cfName){
try (HBaseAdmin admin = (HBaseAdmin)HBaseConn.getHBaseConnection().getAdmin()){
admin.deleteColumn(TableName.valueOf(tableName),Bytes.toBytes(cfName));
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 删除列族
* @param tableName 表名
* @param cfName 列族
* @return
*/
public static boolean deleteQualifier(String tableName,String rowKey,String cfName,String qualifier){
try (Table table = HBaseConn.getTable(tableName)){
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(cfName),Bytes.toBytes(qualifier));
table.delete(delete);
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
}
工具类的使用
package com.example.hbase.hbasedemo;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.text.SimpleDateFormat;
import java.util.*;
public class HBaseTest {
private static Connection conn;
private static String testTableName = "TableTest";
static {
conn = HBaseConn.getHBaseConnection();
System.out.println("HBase连接 -> "+conn);
}
/**
* 创建并插入数据测试
* 以70个测点为例
* 插入1小时3600条数据,耗时30秒
*/
public static void createTableAndDataTest() {
String [] cf = new String[]{"data","info"};//列族 - data数据 - info其他信息
boolean table = HBaseUtil.createTable(testTableName, cf);//创建表
String createTable = table?"成功":"失败";
System.out.println("创建表 -> " + createTable + " -> 开始插入");
//生成put,批量插入
List<Put> putList = new ArrayList<>();
long start = 1577808000000l;
for (int i = 0;i < 1 * 60 * 60; i++){
start+=1000;
String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - start);
Put put = new Put(Bytes.toBytes(rowkey));
//生成list列 - 70个列为例子
put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("time"),Bytes.toBytes(timeStamp2Date(start,null)));
put.addColumn(Bytes.toBytes("data"),Bytes.toBytes("createTime"),Bytes.toBytes(timeStamp2Date(start,null)));
for (int j = 1; j < 70; j++) {
String key = "key"+j;
String value = "4500.00";
put.addColumn(Bytes.toBytes("data"),Bytes.toBytes(key),Bytes.toBytes(value));
}
putList.add(put);
}
long s = System.currentTimeMillis();
boolean putRows = HBaseUtil.putRows(testTableName, putList);
long e = System.currentTimeMillis();
System.out.println("数据插入 -> " + putRows + " -> 耗时 -> " + (e-s));
HBaseConn.closeConnection();
}
/**
* 时间戳转字符串 - yyyy-MM-dd HH:mm:ss
* @param seconds
* @param format
* @return
*/
public static String timeStamp2Date(long seconds,String format) {
if(format == null || format.isEmpty()){
format = "yyyy-MM-dd HH:mm:ss";
}
SimpleDateFormat sdf = new SimpleDateFormat(format);
return sdf.format(new Date(seconds));
}
/**
* 删除表
*/
public static boolean deleteTable() {
boolean b = HBaseUtil.deleteTable(testTableName);
System.out.println("删除表 -> " + b);
return b;
}
/**
* 根据rowkey获取单条数据
*/
public static void getOne() {
String rowkey = "wellboreIdTest-"+(Long.MAX_VALUE - 1577808002000l);
//获取数据
Result result = HBaseUtil.getRow(testTableName, rowkey);
Map<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
for(Map.Entry<byte[], byte[]> entry:familyMap.entrySet()){
System.out.println(Bytes.toString(entry.getKey()));
System.out.println(Bytes.toString(entry.getValue()));
}
}
/*----------------------------- 过滤器用法 --------------------------*/
/**
* RowFilter 过滤器
* 比较过滤器
* @throws Exception
*/
public void rowFilter()throws Exception {
//RowFilter过滤器使用 - CompareOperator比较运算符
Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("rowkey1")));
//Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
}
/**
* prefixFilter 过滤器
* 以 rowkey2 为前缀的所有行
* @throws Exception
*/
public void prefixFilter()throws Exception {
Filter filter = new PrefixFilter(Bytes.toBytes("rowkey2"));
//Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
}
/**
* keyOnlyFilter 过滤器
* 只返回 rowkey 和列的值 ,不会返回数据
* @throws Exception
*/
public void keyOnlyFilter()throws Exception {
Filter filter = new KeyOnlyFilter(true);
//Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
}
/**
* columnPrefixFilter 过滤器
* 列标识前缀过滤
* @throws Exception
*/
public void columnPrefixFilter()throws Exception {
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("nam"));
//Operator.MUST_PASS_ALL 集合中的filter必须全部生效通过,才算通过
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.asList(filter));
}
}