方式1:SpringBoot + HBaseUtil
1. 添加pom.xml依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.5</version>
</dependency>
2. 编写关键类
■ 常量类(Constant.java)
/**
 * Application-wide constants.
 */
public class Constant {

    /**
     * ZooKeeper coordinates the HBase client uses to locate the cluster.
     */
    public static class HBaseConfig {

        /** Comma-separated ZooKeeper quorum hosts. */
        public static final String ZK_HOST = "192.168.0.101,192.168.0.102,192.168.0.103";

        /** ZooKeeper client port, kept as a string because it is passed to {@code Configuration.set}. */
        public static final String ZK_PORT = "2181";

        /** Parent znode under which HBase keeps its state. */
        public static final String ZK_PATH = "/hbase";
    }
}
■ 工具类(HBaseUtil.java)
import com.lonton.bigdata.constant.Constant;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Static convenience wrapper around one shared HBase {@link Connection}.
 *
 * <p>Every operation borrows a short-lived {@link Admin} or {@link Table} from the shared
 * connection and closes it before returning. Failures while executing the operation are
 * logged with their stack trace and reported through the documented fallback value
 * ({@code false}, {@code null}, or "nothing happened"); failures to obtain or close the
 * handle itself propagate as {@link IOException}.
 */
public class HBaseUtil {
    private static final Logger log = LoggerFactory.getLogger(HBaseUtil.class);

    /** Client configuration, populated once by the static initializer. */
    public static Configuration conf;

    /** Lazily created connection shared by all methods; access goes through {@link #getConnection()}. */
    private static Connection conn;

    static {
        // Start from the HBase client defaults, then override what this application needs.
        conf = HBaseConfiguration.create();
        // ZooKeeper quorum hosts used to locate the cluster.
        conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);
        // ZooKeeper client port.
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);
        // Parent znode under which HBase stores its state.
        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);
        // Default scanner caching for all clients (the original notes cite a default of 100).
        conf.set(HConstants.HBASE_CLIENT_SCANNER_CACHING, "500");
        // Pause between retries after a failure (the original notes cite a default of 100).
        conf.set(HConstants.HBASE_CLIENT_PAUSE, "50");
        // Timeout of a single RPC (the original notes cite a default of 60000).
        conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "2000");
        // Overall timeout of one blocking client operation, retries included
        // (the original notes cite a default of Integer.MAX_VALUE).
        conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000");
        // Retry at most 3 times (the original notes cite a default of 31).
        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3");
    }

    /**
     * Returns the shared connection, creating it when it is missing or closed.
     *
     * @return the shared connection; {@code null} (or a stale instance) when creation failed,
     *         in which case the failure has been logged
     */
    public static synchronized Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = ConnectionFactory.createConnection(conf);
            }
        } catch (IOException e) {
            log.error("HBase 建立链接失败 ", e);
        }
        return conn;
    }

    /**
     * Returns the shared connection or fails with a checked exception instead of letting a
     * {@code null} connection surface later as a {@link NullPointerException}.
     */
    private static Connection requireConnection() throws IOException {
        Connection connection = getConnection();
        if (connection == null) {
            throw new IOException("HBase connection is not available");
        }
        return connection;
    }

    /**
     * Closes the given connection, logging (not throwing) any failure.
     *
     * @param conn connection to close; {@code null} is ignored
     */
    public static void closeConnect(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (Exception e) {
            log.error("closeConnect failure !", e);
        }
    }

    /**
     * Checks whether a table exists (DDL).
     *
     * @param tableName table name
     * @return {@code true} if the table exists; {@code false} if it does not or the lookup failed
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public static boolean isTableExist(String tableName) throws IOException {
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                return admin.tableExists(TableName.valueOf(tableName));
            } catch (Exception e) {
                log.error("isTableExist failure, table={}", tableName, e);
                return false;
            }
        }
    }

    /**
     * Creates a table with the given column families (DDL).
     * Shell equivalent: {@code create 't1', 'cf1', 'cf2'}.
     *
     * <p>Does nothing except log when the table already exists or the creation fails.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public static void createTable(String tableName, String... columnFamily) throws IOException {
        if (isTableExist(tableName)) {
            log.error("Table:{} already exists!", tableName);
            return;
        }
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
                for (String cf : columnFamily) {
                    descriptor.addFamily(new HColumnDescriptor(cf));
                }
                admin.createTable(descriptor);
                log.info("Table:{} create successfully!", tableName);
            } catch (Exception e) {
                log.error("createTable failure, table={}", tableName, e);
            }
        }
    }

    /**
     * Writes a single cell (DML).
     * Shell equivalent: {@code put 't1', '1001', 'cf1:name', 'zhangsan'}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        cell value
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public static void addRowData(String tableName, String rowKey, String columnFamily, String
            column, String value) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
                table.put(put);
                log.info("insert successfully!");
            } catch (Exception e) {
                log.error("addRowData failure, table={}, rowKey={}", tableName, rowKey, e);
            }
        }
    }

    /**
     * Opens a full-table scan. Shell equivalent: {@code scan "t1"}.
     *
     * <p>The caller owns the returned scanner and must close it.
     * NOTE(review): the table handle is closed before the scanner is returned, so this relies
     * on the scanner staying usable through the shared connection - confirm for the HBase
     * version in use.
     *
     * @param tableName table name
     * @return a scanner over all rows, or {@code null} if the scan could not be opened
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public static ResultScanner getAllRows(String tableName) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Scan scan = new Scan();
                // Rows requested per RPC: larger values cut round trips but make each transfer heavier.
                scan.setCaching(1000);
                return table.getScanner(scan);
            } catch (Exception e) {
                log.error("getAllRows failure, table={}", tableName, e);
                return null;
            }
        }
    }

    /**
     * Reads one row. Shell equivalent: {@code get "t1","1001"}.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return the row, or {@code null} if the read failed
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public static Result getRow(String tableName, String rowKey) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                return table.get(new Get(Bytes.toBytes(rowKey)));
            } catch (Exception e) {
                log.error("getRow failure, table={}, rowKey={}", tableName, rowKey, e);
                return null;
            }
        }
    }

    /**
     * Reads several rows in one batch.
     *
     * @param tableName table name
     * @param rows      row keys; each element is converted with {@link String#valueOf(Object)}
     * @param <T>       element type of the row keys
     * @return one result per requested key, or {@code null} when {@code rows} is empty or the
     *         read failed
     * @throws RuntimeException if {@code rows} contains a {@code null} element
     * @throws IOException      if the table handle cannot be obtained
     */
    public static <T> Result[] getRows(String tableName, List<T> rows) throws IOException {
        Table table = requireConnection().getTable(TableName.valueOf(tableName));
        try {
            List<Get> gets = new ArrayList<>(rows.size());
            for (T row : rows) {
                if (row == null) {
                    throw new RuntimeException("hbase have no data");
                }
                gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
            }
            return gets.isEmpty() ? null : table.get(gets);
        } catch (IOException e) {
            log.error("getRows failure !", e);
            return null;
        } finally {
            // Unlike the other methods, a close failure here is only logged, as before.
            try {
                table.close();
            } catch (IOException e) {
                log.error("table.close() failure !", e);
            }
        }
    }

    /**
     * Reads a single column of one row. Shell equivalent: {@code get "t1","1001","cf1:name"}.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @return the result restricted to that column, or {@code null} if the read failed
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public static Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
                return table.get(get);
            } catch (Exception e) {
                log.error("getRowQualifier failure, table={}, rowKey={}", tableName, rowKey, e);
                return null;
            }
        }
    }

    /**
     * Deletes several rows (DML).
     *
     * @param tableName table name
     * @param rows      row keys to delete
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                List<Delete> deleteList = new ArrayList<>(rows.length);
                for (String row : rows) {
                    deleteList.add(new Delete(Bytes.toBytes(row)));
                }
                table.delete(deleteList);
            } catch (Exception e) {
                log.error("deleteMultiRow failure, table={}", tableName, e);
            }
        }
    }

    /**
     * Disables and deletes a table (DDL).
     *
     * <p>Does nothing except log when the table does not exist or the drop fails.
     *
     * @param tableName table name
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public static void dropTable(String tableName) throws IOException {
        if (!isTableExist(tableName)) {
            log.error("Table:{} not exist!", tableName);
            return;
        }
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                TableName name = TableName.valueOf(tableName);
                // A table has to be disabled before it can be deleted.
                admin.disableTable(name);
                admin.deleteTable(name);
                log.info("Table:{} delete successfully!", tableName);
            } catch (Exception e) {
                log.error("dropTable failure, table={}", tableName, e);
            }
        }
    }
}
3. 测试
■ 测试类(HBaseBaseUtilTests.java)
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Integration tests for {@code HBaseUtil}; they need a reachable HBase cluster.
 *
 * <p>NOTE(review): several tests assume table {@code t1} already exists (created by
 * {@link #createTable()}) and {@link #dropTable()} removes it, but no execution order is
 * declared here - confirm how the suite is meant to be run.
 */
public class HBaseBaseUtilTests {
    private String tableName = "t1";
    private String rowKey = "1001";

    /**
     * Runs once, before any test method. Being static it executes right after the test class
     * is loaded, which makes it a good place to warm up the shared connection.
     */
    @BeforeClass
    public static void setUpBeforeClass() {
        System.out.println("this is @BeforeClass ...");
        HBaseUtil.getConnection();
    }

    /** Runs once, after all test methods; releases the shared connection. */
    @AfterClass
    public static void tearDownAfterClass() {
        System.out.println("this is @AfterClass ...");
        HBaseUtil.closeConnect(HBaseUtil.getConnection());
    }

    /** Runs before every test method. */
    @Before
    public void setUp() {
        System.out.println("this is @Before ...");
    }

    /** Runs after every test method. */
    @After
    public void tearDown() {
        System.out.println("this is @After ...");
    }

    /** Asserts that the result holds at least one cell and that the first cell has the expected value. */
    private static void assertFirstCellValue(String expected, Result result) {
        Assert.assertNotNull(result);
        Cell[] cells = result.rawCells();
        Assert.assertNotNull(cells);
        Assert.assertTrue(cells.length > 0);
        Assert.assertEquals(expected, Bytes.toString(CellUtil.cloneValue(cells[0])));
    }

    @Test
    public void createTable() throws IOException {
        HBaseUtil.createTable(tableName, "cf1", "cf2");
        // JUnit assertions are used instead of the assert keyword, which is skipped without -ea.
        Assert.assertTrue(HBaseUtil.isTableExist(tableName));
    }

    @Test
    public void isTableExist() throws IOException {
        Assert.assertTrue(HBaseUtil.isTableExist(tableName));
    }

    @Test
    public void addRowData() throws IOException {
        String value = "tom";
        HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name"));
    }

    @Test
    public void getAllRows() throws IOException {
        // The scanner is a server-side resource; close it even when an assertion fails.
        try (ResultScanner results = HBaseUtil.getAllRows(tableName)) {
            Assert.assertNotNull(results);
            for (Result result : results) {
                Assert.assertNotNull(result);
            }
        }
    }

    @Test
    public void getRow() throws IOException {
        String value = "tom";
        HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, HBaseUtil.getRow(tableName, rowKey));
    }

    @Test
    public void getRows() throws IOException {
        String rowKey1 = "1003";
        String rowKey2 = "1004";
        HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", "tom1");
        HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", "tom2");
        List<String> rows = new ArrayList<>();
        rows.add(rowKey1);
        rows.add(rowKey2);
        Result[] results = HBaseUtil.getRows(tableName, rows);
        Assert.assertNotNull(results);
        Assert.assertEquals(2, results.length);
    }

    @Test
    public void getRowQualifier() throws IOException {
        String value = "tom";
        HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name"));
    }

    @Test
    public void deleteMultiRow() throws IOException {
        String rowKey1 = "1003";
        String rowKey2 = "1004";
        HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", "tom1");
        HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", "tom2");
        HBaseUtil.deleteMultiRow(tableName, rowKey1, rowKey2);
        List<String> rows = new ArrayList<>();
        rows.add(rowKey1);
        rows.add(rowKey2);
        Result[] results = HBaseUtil.getRows(tableName, rows);
        Assert.assertNotNull(results);
        for (Result result : results) {
            // isEmpty() also covers a result whose cell array is null.
            Assert.assertTrue(result.isEmpty());
        }
    }

    @Test
    public void dropTable() throws IOException {
        Assert.assertTrue(HBaseUtil.isTableExist(tableName));
        HBaseUtil.dropTable(tableName);
        Assert.assertFalse(HBaseUtil.isTableExist(tableName));
    }
}
4. 完整示例代码
源码:hbase-springboot-example01-src.zip
方式2:SpringBoot + HBaseService
1. 添加pom.xml依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.5</version>
</dependency>
2. 编写关键类
■ 常量类(Constant.java)
/**
 * Application-wide constants.
 */
public class Constant {

    /**
     * ZooKeeper coordinates the HBase client uses to locate the cluster.
     */
    public static class HBaseConfig {

        /** Comma-separated ZooKeeper quorum hosts. */
        public static final String ZK_HOST = "192.168.0.101,192.168.0.102,192.168.0.103";

        /** ZooKeeper client port, kept as a string because it is passed to {@code Configuration.set}. */
        public static final String ZK_PORT = "2181";

        /** Parent znode under which HBase keeps its state. */
        public static final String ZK_PATH = "/hbase";
    }
}
■ 配置类(HBaseConfig.java)
import com.lonton.bigdata.constant.Constant;
import com.lonton.bigdata.service.HBaseService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes an {@link HBaseService} bean wired with the client
 * settings below.
 */
@Configuration
public class HBaseConfig {

    /**
     * Creates the {@link HBaseService} bean.
     *
     * @return a service built from the HBase client configuration
     */
    @Bean
    public HBaseService getHBaseService() {
        return new HBaseService(clientConfiguration());
    }

    /**
     * Builds the HBase client configuration.
     *
     * <p>On Windows, Hadoop looks for {@code bin\winutils.exe} below {@code hadoop.home.dir};
     * if needed, set that system property before this runs, e.g.
     * {@code System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master")}.
     */
    private static org.apache.hadoop.conf.Configuration clientConfiguration() {
        // create() also looks for configuration files such as hbase-site.xml in the resources directory.
        final org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        final String[][] settings = {
            // ZooKeeper quorum hosts used to locate the cluster.
            {HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST},
            // ZooKeeper client port.
            {HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT},
            // Parent znode under which HBase stores its state.
            {HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH},
            // Default scanner caching for all clients (the original notes cite a default of 100).
            {HConstants.HBASE_CLIENT_SCANNER_CACHING, "500"},
            // Pause between retries after a failure (the original notes cite a default of 100).
            {HConstants.HBASE_CLIENT_PAUSE, "50"},
            // Timeout of a single RPC (the original notes cite a default of 60000).
            {HConstants.HBASE_RPC_TIMEOUT_KEY, "2000"},
            // Overall timeout of one blocking client operation (cited default: Integer.MAX_VALUE).
            {HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000"},
            // Retry at most 3 times (the original notes cite a default of 31).
            {HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3"},
        };
        for (String[] setting : settings) {
            hbaseConf.set(setting[0], setting[1]);
        }
        return hbaseConf;
    }
}
■ 服务类(HBaseService.java)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * HBase access service built around one shared {@link Connection}.
 *
 * <p>Every operation borrows a short-lived {@link Admin} or {@link Table} from the shared
 * connection and closes it before returning. Failures while executing the operation are
 * logged with their stack trace and reported through the documented fallback value
 * ({@code false}, {@code null}, or "nothing happened"); failures to obtain or close the
 * handle itself propagate as {@link IOException}.
 */
public class HBaseService {
    private static final Logger log = LoggerFactory.getLogger(HBaseService.class);

    /** Client configuration handed to the constructor; also used to reconnect. */
    public static Configuration conf;

    /** Connection shared by all instances; access goes through {@link #getConnection()}. */
    private static Connection conn;

    /**
     * Remembers the configuration and eagerly opens the shared connection.
     * A connection failure is logged, not thrown.
     *
     * @param conf HBase client configuration
     */
    public HBaseService(Configuration conf) {
        // Keep the configuration: without it a later reconnect would run against a null config.
        HBaseService.conf = conf;
        connect();
    }

    /** Opens the shared connection when it is missing or closed; failures are logged. */
    private static synchronized Connection connect() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = ConnectionFactory.createConnection(conf);
            }
        } catch (IOException e) {
            log.error("HBase connect failure", e);
        }
        return conn;
    }

    /**
     * Returns the shared connection, re-creating it when it is missing or closed.
     *
     * @return the shared connection; {@code null} (or a stale instance) when creation failed,
     *         in which case the failure has been logged
     */
    public Connection getConnection() {
        return connect();
    }

    /**
     * Returns the shared connection or fails with a checked exception instead of letting a
     * {@code null} connection surface later as a {@link NullPointerException}.
     */
    private Connection requireConnection() throws IOException {
        Connection connection = getConnection();
        if (connection == null) {
            throw new IOException("HBase connection is not available");
        }
        return connection;
    }

    /**
     * Closes the given connection, logging (not throwing) any failure.
     *
     * @param conn connection to close; {@code null} is ignored
     */
    public void closeConnect(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (Exception e) {
            log.error("closeConnect failure !", e);
        }
    }

    /**
     * Checks whether a table exists (DDL).
     *
     * @param tableName table name
     * @return {@code true} if the table exists; {@code false} if it does not or the lookup failed
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public boolean isTableExist(String tableName) throws IOException {
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                return admin.tableExists(TableName.valueOf(tableName));
            } catch (Exception e) {
                log.error("isTableExist failure, table={}", tableName, e);
                return false;
            }
        }
    }

    /**
     * Creates a table with the given column families (DDL).
     * Shell equivalent: {@code create 't1', 'cf1', 'cf2'}.
     *
     * <p>Does nothing except log when the table already exists or the creation fails.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public void createTable(String tableName, String... columnFamily) throws IOException {
        if (isTableExist(tableName)) {
            log.error("Table:{} already exists!", tableName);
            return;
        }
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
                for (String cf : columnFamily) {
                    descriptor.addFamily(new HColumnDescriptor(cf));
                }
                admin.createTable(descriptor);
                log.info("Table:{} create successfully!", tableName);
            } catch (Exception e) {
                log.error("createTable failure, table={}", tableName, e);
            }
        }
    }

    /**
     * Writes a single cell (DML).
     * Shell equivalent: {@code put 't1', '1001', 'cf1:name', 'zhangsan'}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        cell value
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public void addRowData(String tableName, String rowKey, String columnFamily, String
            column, String value) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
                table.put(put);
                log.info("insert successfully!");
            } catch (Exception e) {
                log.error("addRowData failure, table={}, rowKey={}", tableName, rowKey, e);
            }
        }
    }

    /**
     * Opens a full-table scan. Shell equivalent: {@code scan "t1"}.
     *
     * <p>The caller owns the returned scanner and must close it.
     * NOTE(review): the table handle is closed before the scanner is returned, so this relies
     * on the scanner staying usable through the shared connection - confirm for the HBase
     * version in use.
     *
     * @param tableName table name
     * @return a scanner over all rows, or {@code null} if the scan could not be opened
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public ResultScanner getAllRows(String tableName) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Scan scan = new Scan();
                // Rows requested per RPC: larger values cut round trips but make each transfer heavier.
                scan.setCaching(1000);
                return table.getScanner(scan);
            } catch (Exception e) {
                log.error("getAllRows failure, table={}", tableName, e);
                return null;
            }
        }
    }

    /**
     * Reads one row. Shell equivalent: {@code get "t1","1001"}.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return the row, or {@code null} if the read failed
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public Result getRow(String tableName, String rowKey) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                return table.get(new Get(Bytes.toBytes(rowKey)));
            } catch (Exception e) {
                log.error("getRow failure, table={}, rowKey={}", tableName, rowKey, e);
                return null;
            }
        }
    }

    /**
     * Reads several rows in one batch.
     *
     * @param tableName table name
     * @param rows      row keys; each element is converted with {@link String#valueOf(Object)}
     * @param <T>       element type of the row keys
     * @return one result per requested key, or {@code null} when {@code rows} is empty or the
     *         read failed
     * @throws RuntimeException if {@code rows} contains a {@code null} element
     * @throws IOException      if the table handle cannot be obtained
     */
    public <T> Result[] getRows(String tableName, List<T> rows) throws IOException {
        Table table = requireConnection().getTable(TableName.valueOf(tableName));
        try {
            List<Get> gets = new ArrayList<>(rows.size());
            for (T row : rows) {
                if (row == null) {
                    throw new RuntimeException("hbase have no data");
                }
                gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
            }
            return gets.isEmpty() ? null : table.get(gets);
        } catch (IOException e) {
            log.error("getRows failure !", e);
            return null;
        } finally {
            // Unlike the other methods, a close failure here is only logged, as before.
            try {
                table.close();
            } catch (IOException e) {
                log.error("table.close() failure !", e);
            }
        }
    }

    /**
     * Reads a single column of one row. Shell equivalent: {@code get "t1","1001","cf1:name"}.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @return the result restricted to that column, or {@code null} if the read failed
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
                return table.get(get);
            } catch (Exception e) {
                log.error("getRowQualifier failure, table={}, rowKey={}", tableName, rowKey, e);
                return null;
            }
        }
    }

    /**
     * Deletes several rows (DML).
     *
     * @param tableName table name
     * @param rows      row keys to delete
     * @throws IOException if the table handle cannot be obtained or closed
     */
    public void deleteMultiRow(String tableName, String... rows) throws IOException {
        try (Table table = requireConnection().getTable(TableName.valueOf(tableName))) {
            try {
                List<Delete> deleteList = new ArrayList<>(rows.length);
                for (String row : rows) {
                    deleteList.add(new Delete(Bytes.toBytes(row)));
                }
                table.delete(deleteList);
            } catch (Exception e) {
                log.error("deleteMultiRow failure, table={}", tableName, e);
            }
        }
    }

    /**
     * Disables and deletes a table (DDL).
     *
     * <p>Does nothing except log when the table does not exist or the drop fails.
     *
     * @param tableName table name
     * @throws IOException if the admin handle cannot be obtained or closed
     */
    public void dropTable(String tableName) throws IOException {
        if (!isTableExist(tableName)) {
            log.error("Table:{} not exist!", tableName);
            return;
        }
        try (Admin admin = requireConnection().getAdmin()) {
            try {
                TableName name = TableName.valueOf(tableName);
                // A table has to be disabled before it can be deleted.
                admin.disableTable(name);
                admin.deleteTable(name);
                log.info("Table:{} delete successfully!", tableName);
            } catch (Exception e) {
                log.error("dropTable failure, table={}", tableName, e);
            }
        }
    }
}
3. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import com.lonton.bigdata.service.HBaseService;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Integration tests for {@link HBaseService}; they need a reachable HBase cluster and an
 * existing table {@code t1} with column family {@code cf1}.
 *
 * <p>NOTE(review): the class is run with the JUnit 4 {@code SpringJUnit4ClassRunner} while
 * {@code @Test} is imported from JUnit 5 ({@code org.junit.jupiter.api.Test}). The two do not
 * belong together - settle on one JUnit generation and align the imports accordingly.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseServiceApplicationTests {
    private String tableName = "t1";
    private String rowKey = "1001";

    @Resource
    private HBaseService service;

    /** Asserts that the result holds at least one cell and that the first cell has the expected value. */
    private static void assertFirstCellValue(String expected, Result result) {
        Assert.assertNotNull(result);
        Cell[] cells = result.rawCells();
        Assert.assertNotNull(cells);
        Assert.assertTrue(cells.length > 0);
        Assert.assertEquals(expected, Bytes.toString(CellUtil.cloneValue(cells[0])));
    }

    @Test
    public void isTableExist() throws IOException {
        // JUnit assertions are used instead of the assert keyword, which is skipped without -ea.
        Assert.assertTrue(service.isTableExist(tableName));
    }

    @Test
    public void addRowData() throws IOException {
        String value = "tom";
        service.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, service.getRowQualifier(tableName, rowKey, "cf1", "name"));
    }

    @Test
    public void getAllRows() throws IOException {
        // The scanner is a server-side resource; close it even when an assertion fails.
        try (ResultScanner results = service.getAllRows(tableName)) {
            Assert.assertNotNull(results);
            for (Result result : results) {
                Assert.assertNotNull(result);
                for (Cell cell : result.rawCells()) {
                    System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
                    System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
                    System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
                    System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
                    System.out.println();
                }
            }
        }
    }

    @Test
    public void getRow() throws IOException {
        String value = "tom";
        service.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, service.getRow(tableName, rowKey));
    }

    @Test
    public void getRows() throws IOException {
        String rowKey1 = "1003";
        String rowKey2 = "1004";
        service.addRowData(tableName, rowKey1, "cf1", "name", "tom1");
        service.addRowData(tableName, rowKey2, "cf1", "name", "tom2");
        List<String> rows = new ArrayList<>();
        rows.add(rowKey1);
        rows.add(rowKey2);
        Result[] results = service.getRows(tableName, rows);
        Assert.assertNotNull(results);
        Assert.assertEquals(2, results.length);
    }

    @Test
    public void getRowQualifier() throws IOException {
        String value = "tom";
        service.addRowData(tableName, rowKey, "cf1", "name", value);
        assertFirstCellValue(value, service.getRowQualifier(tableName, rowKey, "cf1", "name"));
    }

    @Test
    public void deleteMultiRow() throws IOException {
        String rowKey1 = "1003";
        String rowKey2 = "1004";
        service.addRowData(tableName, rowKey1, "cf1", "name", "tom1");
        service.addRowData(tableName, rowKey2, "cf1", "name", "tom2");
        service.deleteMultiRow(tableName, rowKey1, rowKey2);
        List<String> rows = new ArrayList<>();
        rows.add(rowKey1);
        rows.add(rowKey2);
        Result[] results = service.getRows(tableName, rows);
        Assert.assertNotNull(results);
        for (Result result : results) {
            // isEmpty() also covers a result whose cell array is null.
            Assert.assertTrue(result.isEmpty());
        }
    }

    @Test
    public void dropTable() throws IOException {
        Assert.assertTrue(service.isTableExist(tableName));
        service.dropTable(tableName);
        Assert.assertFalse(service.isTableExist(tableName));
    }
}
4. 完整示例代码
源码:hbase-springboot-example02-src.zip
方式3:SpringBoot + HbaseTemplate
1. 添加pom.xml依赖
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.5</version>
</dependency>
2. 工程配置
■ SpringBoot配置(application.yml)
hbase:
config:
hbase.zookeeper.quorum: 192.168.0.101,192.168.0.102,192.168.0.103
hbase.zookeeper.property.clientPort: 2181
zookeeper.znode.parent: /hbase
hbase.client.scanner.caching: 500
hbase.client.pause: 50
hbase.rpc.timeout: 2000
hbase.client.operation.timeout: 3000
hbase.client.retries.number: 3
hbase.rootdir: hdfs://bigdata-node1:9000/hbase
3. 编写关键类
■ 配置映射类(HBaseProperties.java)
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map;
/**
 * Binds the {@code hbase.*} section of the application configuration.
 */
@ConfigurationProperties(prefix = "hbase")
public class HBaseProperties {

    /** Entries found under {@code hbase.config}, keyed by HBase property name. */
    private Map<String, String> config;

    /**
     * @return the bound {@code hbase.config} entries; {@code null} until {@link #setConfig} is called
     */
    public Map<String, String> getConfig() {
        return this.config;
    }

    /**
     * @param config the {@code hbase.config} entries to expose
     */
    public void setConfig(final Map<String, String> config) {
        this.config = config;
    }
}
■ 配置类(HBaseConfig.java)
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import java.util.Map;
import java.util.Set;
/**
 * Spring configuration that exposes an {@link HbaseTemplate} built from the
 * {@code hbase.config} entries bound into {@link HBaseProperties}.
 */
@Configuration
@EnableConfigurationProperties(HBaseProperties.class)
public class HBaseConfig {
    private final HBaseProperties properties;

    /**
     * @param properties bound {@code hbase.*} application properties
     */
    public HBaseConfig(HBaseProperties properties) {
        this.properties = properties;
    }

    /**
     * Creates the {@link HbaseTemplate} bean with auto-flush enabled.
     *
     * @return the configured template
     */
    @Bean
    public HbaseTemplate hbaseTemplate() {
        HbaseTemplate hbaseTemplate = new HbaseTemplate();
        hbaseTemplate.setConfiguration(configuration());
        hbaseTemplate.setAutoFlush(true);
        return hbaseTemplate;
    }

    /**
     * Builds a fresh HBase configuration and copies every {@code hbase.config} entry into it.
     *
     * @return a new configuration; it carries only the HBase defaults when no
     *         {@code hbase.config} entries are bound
     */
    public org.apache.hadoop.conf.Configuration configuration() {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = properties.getConfig();
        // The map stays null when the application defines no hbase.config section.
        if (config != null) {
            for (Map.Entry<String, String> entry : config.entrySet()) {
                configuration.set(entry.getKey(), entry.getValue());
            }
        }
        return configuration;
    }
}
其他方式:
@Configuration
public class HBaseConfig {
@Value("${hbase.zookeeper.quorum}")
private String zookeeperQuorum;
@Value("${hbase.zookeeper.property.clientPort}")
private String clientPort;
@Value("${zookeeper.znode.parent}")
private String znodeParent;
@Bean
public HbaseTemplate hbaseTemplate() {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", clientPort);
conf.set("zookeeper.znode.parent", znodeParent);
return new HbaseTemplate(conf);
}
}
■ 服务类(HBaseService.java)
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class HBaseService {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(HBaseService.class);
// Template injected by Spring; the methods below use it for data access and read the HBase
// configuration from it.
@Autowired
private HbaseTemplate hbaseTemplate;
/**
 * Checks whether a table exists (DDL).
 *
 * <p>A connection is opened from the template's configuration for this call and closed
 * again before returning.
 *
 * @param tableName table name
 * @return {@code true} if the table exists; {@code false} if it does not or the lookup failed
 * @throws IOException if the connection or admin handle cannot be opened or closed
 */
public boolean isTableExist(String tableName) throws IOException {
    // ConnectionFactory + Admin replaces the deprecated HBaseAdmin(Configuration) constructor.
    try (Connection connection = ConnectionFactory.createConnection(hbaseTemplate.getConfiguration());
         Admin admin = connection.getAdmin()) {
        try {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            log.error("isTableExist failure, table={}", tableName, e);
            return false;
        }
    }
}
/**
 * Creates a table with the given column families (DDL).
 * Shell equivalent: {@code create 't1', 'cf1', 'cf2'}.
 *
 * <p>Does nothing except log when the table already exists or the creation fails.
 * A connection is opened from the template's configuration for this call and closed
 * again before returning.
 *
 * @param tableName    table name
 * @param columnFamily column family names
 * @throws IOException if the connection or admin handle cannot be opened or closed
 */
public void createTable(String tableName, String... columnFamily) throws IOException {
    if (isTableExist(tableName)) {
        log.error("Table:{} already exists!", tableName);
        return;
    }
    // ConnectionFactory + Admin replaces the deprecated HBaseAdmin(Configuration) constructor.
    try (Connection connection = ConnectionFactory.createConnection(hbaseTemplate.getConfiguration());
         Admin admin = connection.getAdmin()) {
        try {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String cf : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(descriptor);
            log.info("Table:{} create successfully!", tableName);
        } catch (Exception e) {
            log.error("createTable failure, table={}", tableName, e);
        }
    }
}
/**
* 新增记录(DML)
* put 't1', '1001', 'cf1:name', 'zhangsan'
* put 't1', '1001', 'cf1:age', '23'
*
* @param tableName 表名
* @param rowKey 行键
* @param columnFamily 列簇
* @param column 列
* @param value 值
*/
public void addRowData(String tableName, String rowKey, String columnFamily, String
column, String value) {
hbaseTemplate.put(tableName, rowKey, columnFamily, column, Bytes.toBytes(value));
}
/**
* 全表扫描(Scan)
* scan 't1'
* [
* {cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}},
* {cf2={name=lisi, age=24}}
* ]
*
* @param tableName 表名
* @return
*/
public List<Map<String, Map<String, String>>> getAllRows(String tableName) {
Scan scan = new Scan();
List<Map<String, Map<String, String>>> list = hbaseTemplate.find(tableName, scan, (result, rowNum) -> {
Cell[] cells = result.rawCells();
Map<String, Map<String, String>> data = new HashMap<>(16);
for (Cell c : cells) {
String columnFamily = new String(CellUtil.cloneFamily(c));
String rowName = new String(CellUtil.cloneQualifier(c));
String value = new String(CellUtil.cloneValue(c));
Map<String, String> obj = data.get(columnFamily);
if (null == obj) {
obj = new HashMap<>(16);
}
obj.put(rowName, value);
data.put(columnFamily, obj);
}
return data;
});
return list;
}
/**
* 根据RowKey获取单行记录(get)
* get 't1','1001'
* {cf1_name=tom, cf1_age=23}
*
* @param tableName 表名
* @param rowKey 行键
* @return
*/
public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {
return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
@Override
public Map<String, Map<String, Object>> mapRow(Result result, int i) {
Cell[] cells = result.rawCells();
Map<String, Map<String, Object>> data = new HashMap<>(16);
for (Cell c : cells) {
String columnFamily = new String(CellUtil.cloneFamily(c));
String rowName = new String(CellUtil.cloneQualifier(c));
String value = new String(CellUtil.cloneValue(c));
Map<String, Object> obj = data.get(columnFamily);
if (null == obj) {
obj = new HashMap<>(16);
}
obj.put(rowName, value);
data.put(columnFamily, obj);
}
return data;
}
});
}
/**
* 获取多行记录(get)
*
* @param tableName 表名
* @param rowKeys 行键(可扩展参数)
* @return
*/
public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {
List<Map<String, Map<String, Object>>> rows = new ArrayList<>();
for (String rowKey : rowKeys) {
Map<String, Map<String, Object>> row = hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
@Override
public Map<String, Map<String, Object>> mapRow(Result result, int i) {
Cell[] cells = result.rawCells();
Map<String, Map<String, Object>> data = new HashMap<>(16);
for (Cell c : cells) {
String columnFamily = new String(CellUtil.cloneFamily(c));
String rowName = new String(CellUtil.cloneQualifier(c));
String value = new String(CellUtil.cloneValue(c));
Map<String, Object> obj = data.get(columnFamily);
if (null == obj) {
obj = new HashMap<>(16);
}
obj.put(rowName, value);
obj.put("rowKey", rowKey);
data.put(columnFamily, obj);
}
return data;
}
});
rows.add(row);
}
return rows;
}
/**
* 根据限定符获取指定数据(get)
*
* @param tableName 表名
* @param rowName 行键
* @param familyName 列簇
* @param qualifier 限定符
* @return
*/
public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {
return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {
List<Cell> ceList = result.listCells();
String res = "";
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
}
return res;
});
}
/**
* 根据StartKey和EndKey获取数据(scan),前闭后开区间
*
* @param tableName 表名
* @param startRow 起始rowKey
* @param stopRow 结束rowKey
* @return
*/
public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {
Scan scan = new Scan();
if (startRow == null) {
startRow = "";
}
if (stopRow == null) {
stopRow = "";
} else {
stopRow += "|";
}
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(stopRow));
/* PageFilter filter = new PageFilter(5);
scan.setFilter(filter);*/
return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Map<String, Object>>>() {
@Override
public Map<String, Map<String, Object>> mapRow(Result result, int i) {
List<Cell> ceList = result.listCells();
Map<String, Map<String, Object>> data = new HashMap<>(16);
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
Map<String, Object> obj = data.get(family);
if (null == obj) {
obj = new HashMap<>(16);
}
obj.put("rowKey", rowKey);
obj.put(qualifier, value);
data.put(family, obj);
}
}
return data;
}
});
}
/**
* 删除列簇(DML)
*
* @param tableName 表名
* @param rowKey 行键
* @param familyName 列簇
*/
public void deleteRow(String tableName, String rowKey, String familyName) {
hbaseTemplate.delete(tableName, rowKey, familyName);
}
/**
* 删除限定符(DML)
*
* @param tableName 表名
* @param rowKey 行键
* @param familyName 列簇
* @param qualifier 限定符
*/
public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {
hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);
}
/**
* 删除多行数据(DML)
*
* @param tableName 表名
* @param familyNames 列簇
* @param rowKeys 行键(可扩展参数)
*/
public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {
for (String rowKey : rowKeys) {
for (String familyName : familyNames) {
hbaseTemplate.delete(tableName, rowKey, familyName);
}
}
}
/**
* 根据rowKey删除记录(DML)
*
* @param tableName 表名
* @param rowKey 行键
*/
public void deleteAll(String tableName, final String rowKey) {
hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
public Boolean doInTable(HTableInterface table) {
boolean flag = false;
try {
List<Delete> list = new ArrayList<>();
Delete d1 = new Delete(rowKey.getBytes());
list.add(d1);
table.delete(list);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
});
}
/**
* 删除表(DDL)
*
* @param tableName 表名
* @throws IOException
*/
public void dropTable(String tableName) throws IOException {
if (!isTableExist(tableName)) {
log.error("Table:" + tableName + " not exist!");
return;
}
HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
try {
if (isTableExist(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
log.info("Table:" + tableName + " delete successfully!");
}
} catch (Exception e) {
log.error(e.getMessage());
} finally {
admin.close();
}
}
/**
* 通用方法(一般可用于更新操作,如:delete、put)
示例:deleteAll
TableCallback action = new TableCallback<Boolean>() {
public Boolean doInTable(HTableInterface table) {
boolean flag = false;
try {
List<Delete> list = new ArrayList<>();
Delete d1 = new Delete(rowKey.getBytes());
list.add(d1);
table.delete(list);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
};
示例:put
TableCallback action = new TableCallback<Boolean>() {
public Boolean doInTable(HTableInterface table) {
boolean flag = false;
try {
Put put = new Put(rowKey.getBytes());
put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
table.put(put);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
};
* @param tableName
* @param action
*/
public void execute(String tableName, TableCallback action) {
hbaseTemplate.execute(tableName, action);
}
}
4. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Tests for {@link HBaseService} that run each service method against table {@code t1}.
 *
 * <p>Several tests assume the table already exists and assert on its total content, so they are
 * sensitive to execution order and to rows left behind by other tests.
 *
 * <p>NOTE(review): {@code @Test} is imported from JUnit Jupiter while {@code @RunWith} is a
 * JUnit 4 annotation - confirm which engine actually runs this class.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseServiceApplicationTests {
    private final String tableName = "t1";
    private final String rowKey = "1001";
    private final String rowKey2 = "1002";
    private final String rowKey3 = "1003";
    private static final String CF_01 = "cf1";
    private static final String CF1_QUALIFIER_NAME = "name";
    private static final String CF1_QUALIFIER_AGE = "age";
    private static final String CF1_NAME_VALUE = "tom";
    private static final String CF1_AGE_VALUE = "23";
    private static final String CF_02 = "cf2";
    private static final String CF2_QUALIFIER_ADDR = "address";
    private static final String CF2_ADDR_VALUE = "beijing";

    @Autowired
    private HBaseService service;

    @Test
    public void isTableExist() throws IOException {
        // Assert.* instead of the assert keyword, which is skipped unless the JVM runs with -ea.
        Assert.assertTrue(service.isTableExist(tableName));
    }

    @Test
    public void createTable() throws IOException {
        service.createTable(tableName, CF_01, CF_02);
        Assert.assertTrue(service.isTableExist(tableName));
    }

    @Test
    public void addRowData() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
    }

    @Test
    public void getAllRows() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
        List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);
        Assert.assertEquals(1, rows.size()); // one row key -> one row
        Assert.assertEquals(1, rows.get(0).size()); // one column family
        Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
        Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));
        System.out.println(rows);
    }

    @Test
    public void getRowByRowKey() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
        service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
        Assert.assertNotNull(row);
        Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));
        Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));
        Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));
        System.out.println(row);
    }

    @Test
    public void getRows() {
        service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
        Assert.assertEquals(2, rows.size()); // two row keys -> two rows
        Assert.assertEquals(1, rows.get(0).size()); // one column family
        Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
        System.out.println(rows);
    }

    @Test
    public void getRowQualifier() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
    }

    @Test
    public void getRowsByKey() {
        service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);
        Assert.assertEquals(2, rows.size()); // two row keys -> two rows
        Assert.assertEquals(1, rows.get(0).size()); // one column family
        Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
        System.out.println(rows);
    }

    @Test
    public void deleteQualifier() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
        service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);
        Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        Assert.assertNull(row.get(CF_01).get(CF1_QUALIFIER_NAME));
        Assert.assertNotNull(row.get(CF_01).get(CF1_QUALIFIER_AGE));
    }

    @Test
    public void deleteRow() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        service.deleteRow(tableName, rowKey, CF_01);
        service.deleteRow(tableName, rowKey, CF_02);
        Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        Assert.assertTrue(row.isEmpty());
    }

    @Test
    public void deleteMultiRow() {
        service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
        System.out.println("----------------------- test 1 -------------------------");
        System.out.println(rows);
        String[] familyNames = {CF_01, CF_02};
        service.deleteRows(tableName, familyNames, rowKey2, rowKey3);
        rows = service.getRows(tableName, rowKey2, rowKey3);
        System.out.println("----------------------- test 2 -------------------------");
        System.out.println(rows); // [{}, {}]
        Assert.assertTrue(rows.get(0).isEmpty());
        Assert.assertTrue(rows.get(1).isEmpty());
    }

    @Test
    public void deleteAll() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        service.deleteAll(tableName, rowKey);
        row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        Assert.assertTrue(row.isEmpty());
    }

    @Test
    public void dropTable() throws IOException {
        if (!service.isTableExist(tableName)) {
            // A table needs at least one column family to be created.
            service.createTable(tableName, CF_01, CF_02);
        }
        Assert.assertTrue(service.isTableExist(tableName));
        service.dropTable(tableName);
        Assert.assertFalse(service.isTableExist(tableName));
    }

    @Test
    public void execute_deleteAll() {
        service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
        service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
        Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        // Let failures propagate so a broken delete fails the test instead of being printed.
        TableCallback<Boolean> action = new TableCallback<Boolean>() {
            @Override
            public Boolean doInTable(HTableInterface table) throws IOException {
                table.delete(new Delete(Bytes.toBytes(rowKey)));
                return true;
            }
        };
        service.execute(tableName, action);
        row = service.getRowByRowKey(tableName, rowKey);
        System.out.println(row);
        Assert.assertTrue(row.isEmpty());
    }

    @Test
    public void execute_put() {
        // Let failures propagate so a broken put fails the test instead of being printed.
        TableCallback<Boolean> action = new TableCallback<Boolean>() {
            @Override
            public Boolean doInTable(HTableInterface table) throws IOException {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
                put.addColumn(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
                table.put(put);
                return true;
            }
        };
        service.execute(tableName, action);
        Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
        Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));
    }
}
5. 完整示例代码
源码:hbase-springboot-example03-src.zip
方式4:SpringBoot + HbaseTemplate(Spring配置)
1. 添加pom.xml依赖
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.5</version>
</dependency>
2. 工程配置
■ 参数配置(resources/config/hbase.properties)
hbase.zk.host=192.168.0.101,192.168.0.102,192.168.0.103
hbase.zk.port=2181
■ Spring配置(resources/config/hbase-spring.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions that build the HbaseTemplate from config/hbase.properties. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
<!-- Makes the ${hbase.zk.host} and ${hbase.zk.port} placeholders below resolvable. -->
<context:property-placeholder location="classpath:/config/hbase.properties"/>
<!-- Hadoop configuration that the HBase configuration below refers to. -->
<hdp:configuration id="hadoopConfiguration">fs.default.name=hdfs://bigdata-node1:9000</hdp:configuration>
<!-- Alternative for an HA setup, using the fs.defaultFS key. NOTE(review): confirm the URI to use for an HA cluster. -->
<!--<hdp:configuration id="hadoopConfiguration">fs.defaultFS=hdfs://bigdata-node1:9000</hdp:configuration>-->
<!-- HBase configuration: ZooKeeper quorum and client port come from hbase.properties. -->
<hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}" delete-connection="false" />
<!-- The template injected into HBaseService. -->
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
<property name="configuration" ref="hbaseConfiguration"/>
</bean>
</beans>
3. 编写关键类
■ 启动类(Application.java)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
/**
 * Spring Boot entry point. Besides the usual auto-configuration it imports the bean
 * definitions declared in {@code classpath:/config/hbase-spring.xml}.
 */
@SpringBootApplication
@ImportResource(locations = {"classpath:/config/hbase-spring.xml"})
public class Application {
/**
 * Starts the Spring application.
 *
 * @param args command-line arguments passed through to Spring Boot
 */
public static void main(String[] args) {
// Optional, for Windows only: set a temporary Hadoop home so that winutils.exe can be found
// under its \bin directory; it is needed when connecting to Hadoop from Windows.
//System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master");
SpringApplication.run(Application.class, args);
}
}
■ 服务类(HBaseService.java)
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Service facade over {@link HbaseTemplate} that exposes table administration (DDL) and
 * row-level reads and writes (DML) using plain strings for table names, row keys, column
 * families, qualifiers and values.
 *
 * <p>Rows are returned as nested maps: column family -&gt; (qualifier -&gt; value). All byte/string
 * conversions go through {@link Bytes}, so the encoding used for reads matches the one used by
 * {@link #addRowData} for writes.
 */
@Service
public class HBaseService {
    private static final Logger log = LoggerFactory.getLogger(HBaseService.class);

    /** Initial capacity of every map built while converting a row. */
    private static final int MAP_CAPACITY = 16;

    /** Extra entry that {@code getRows} and {@code getRowsByKey} add to each family map. */
    private static final String ROW_KEY_FIELD = "rowKey";

    @Autowired
    private HbaseTemplate hbaseTemplate;

    /**
     * Checks whether a table exists (DDL).
     *
     * @param tableName table name
     * @return {@code true} if the table exists; {@code false} if it does not or if the lookup
     *         failed (the failure is logged)
     * @throws IOException if the admin connection cannot be opened or closed
     */
    public boolean isTableExist(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
        try {
            return admin.tableExists(tableName);
        } catch (Exception e) {
            // Best effort: report "not found" but keep the full stack trace in the log.
            log.error("Failed to check whether table {} exists", tableName, e);
            return false;
        } finally {
            admin.close();
        }
    }

    /**
     * Creates a table with the given column families (DDL). Does nothing except log when the
     * table already exists. Shell equivalent: {@code create 't1', 'cf1', 'cf2'}.
     *
     * @param tableName    table name
     * @param columnFamily column families to create
     * @throws IOException if the admin connection cannot be opened or closed
     */
    public void createTable(String tableName, String... columnFamily) throws IOException {
        if (isTableExist(tableName)) {
            log.error("Table:" + tableName + " already exists!");
            return;
        }
        HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
        try {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String cf : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(descriptor);
            log.info("Table:" + tableName + " create successfully!");
        } catch (Exception e) {
            log.error("Failed to create table {}", tableName, e);
        } finally {
            admin.close();
        }
    }

    /**
     * Writes a single cell (DML). Shell equivalent: {@code put 't1', '1001', 'cf1:name', 'zhangsan'}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        cell value
     */
    public void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) {
        hbaseTemplate.put(tableName, rowKey, columnFamily, column, Bytes.toBytes(value));
    }

    /**
     * Scans the whole table. Shell equivalent: {@code scan 't1'}.
     *
     * <p>Example result:
     * {@code [{cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}}, {cf2={name=lisi, age=24}}]}
     *
     * @param tableName table name
     * @return one map per row: column family -&gt; (qualifier -&gt; value)
     */
    public List<Map<String, Map<String, String>>> getAllRows(String tableName) {
        return hbaseTemplate.find(tableName, new Scan(), (result, rowNum) -> {
            Map<String, Map<String, String>> data = new HashMap<>(MAP_CAPACITY);
            Cell[] cells = result.rawCells();
            if (cells == null) {
                return data;
            }
            for (Cell cell : cells) {
                Map<String, String> family = data.computeIfAbsent(
                        Bytes.toString(CellUtil.cloneFamily(cell)), k -> new HashMap<>(MAP_CAPACITY));
                family.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
            return data;
        });
    }

    /**
     * Reads a single row by row key. Shell equivalent: {@code get 't1','1001'}.
     *
     * <p>Example result: {@code {cf1={name=tom, age=23}}}
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return column family -&gt; (qualifier -&gt; value); empty when the row has no cells
     */
    public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, (result, rowNum) -> toFamilyMap(result, false));
    }

    /**
     * Reads several rows, issuing one get per row key.
     *
     * @param tableName table name
     * @param rowKeys   row keys to fetch
     * @return one map per requested key, in request order; each family map additionally carries
     *         a {@code "rowKey"} entry, and a key without cells yields an empty map
     */
    public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {
        List<Map<String, Map<String, Object>>> rows = new ArrayList<>();
        for (String rowKey : rowKeys) {
            Map<String, Map<String, Object>> row =
                    hbaseTemplate.get(tableName, rowKey, (result, rowNum) -> toFamilyMap(result, true));
            rows.add(row);
        }
        return rows;
    }

    /**
     * Reads the value of one column of one row.
     *
     * @param tableName  table name
     * @param rowName    row key
     * @param familyName column family
     * @param qualifier  column qualifier
     * @return the value of the last cell returned for that column, or {@code ""} if there is none
     */
    public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {
        return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, rowNum) -> {
            String value = "";
            List<Cell> cells = result.listCells();
            if (cells != null) {
                for (Cell cell : cells) {
                    value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
            return value;
        });
    }

    /**
     * Scans a row-key range.
     *
     * <p>The start key is inclusive. A non-null stop key has {@code "|"} appended before it is
     * handed to the scan, so the row equal to {@code stopRow} is returned as well, along with any
     * longer key that starts with {@code stopRow} and sorts before {@code stopRow + "|"}.
     * A {@code null} start or stop key is replaced by the empty string.
     *
     * @param tableName table name
     * @param startRow  first row key of the range, may be {@code null}
     * @param stopRow   last row key of the range, may be {@code null}
     * @return one map per row; each family map additionally carries a {@code "rowKey"} entry
     */
    public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {
        String from = (startRow == null) ? "" : startRow;
        String to = (stopRow == null) ? "" : stopRow + "|";
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(from));
        scan.setStopRow(Bytes.toBytes(to));
        return hbaseTemplate.find(tableName, scan, (result, rowNum) -> toFamilyMap(result, true));
    }

    /**
     * Deletes one column family of a row (DML).
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family to delete
     */
    public void deleteRow(String tableName, String rowKey, String familyName) {
        hbaseTemplate.delete(tableName, rowKey, familyName);
    }

    /**
     * Deletes one column of a row (DML).
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier to delete
     */
    public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {
        hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);
    }

    /**
     * Deletes the given column families from each of the given rows (DML).
     *
     * @param tableName   table name
     * @param familyNames column families to delete
     * @param rowKeys     row keys to delete from
     */
    public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {
        for (String rowKey : rowKeys) {
            for (String familyName : familyNames) {
                hbaseTemplate.delete(tableName, rowKey, familyName);
            }
        }
    }

    /**
     * Deletes a whole row by row key (DML). Failures are logged, not rethrown.
     *
     * @param tableName table name
     * @param rowKey    row key
     */
    public void deleteAll(String tableName, final String rowKey) {
        hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
            @Override
            public Boolean doInTable(HTableInterface table) {
                try {
                    // Bytes.toBytes keeps the key encoding identical to the one used on write.
                    table.delete(new Delete(Bytes.toBytes(rowKey)));
                    return true;
                } catch (Exception e) {
                    log.error("Failed to delete row {} from table {}", rowKey, tableName, e);
                    return false;
                }
            }
        });
    }

    /**
     * Disables and deletes a table (DDL). Does nothing except log when the table does not exist.
     *
     * @param tableName table name
     * @throws IOException if the admin connection cannot be opened or closed
     */
    public void dropTable(String tableName) throws IOException {
        if (!isTableExist(tableName)) {
            log.error("Table:" + tableName + " not exist!");
            return;
        }
        HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
        try {
            // Disabling an already-disabled table fails, so only disable when needed.
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
            log.info("Table:" + tableName + " delete successfully!");
        } catch (Exception e) {
            log.error("Failed to drop table {}", tableName, e);
        } finally {
            admin.close();
        }
    }

    /**
     * Runs an arbitrary callback against a table; typically used for updates such as delete or put.
     *
     * <p>Example (put):
     * <pre>{@code
     * TableCallback<Boolean> action = table -> {
     *     Put put = new Put(Bytes.toBytes(rowKey));
     *     put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
     *     table.put(put);
     *     return true;
     * };
     * service.execute("t1", action);
     * }</pre>
     *
     * @param tableName table name
     * @param action    callback to run; its result is discarded
     */
    public void execute(String tableName, TableCallback<?> action) {
        hbaseTemplate.execute(tableName, action);
    }

    /**
     * Groups the cells of one row by column family.
     *
     * @param result        row to convert
     * @param includeRowKey whether to add a {@code "rowKey"} entry to every family map
     * @return column family -&gt; (qualifier -&gt; value); empty when the row has no cells
     */
    private static Map<String, Map<String, Object>> toFamilyMap(Result result, boolean includeRowKey) {
        Map<String, Map<String, Object>> data = new HashMap<>(MAP_CAPACITY);
        Cell[] cells = result.rawCells();
        if (cells == null) {
            return data;
        }
        for (Cell cell : cells) {
            Map<String, Object> family = data.computeIfAbsent(
                    Bytes.toString(CellUtil.cloneFamily(cell)), k -> new HashMap<>(MAP_CAPACITY));
            family.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
                    Bytes.toString(CellUtil.cloneValue(cell)));
            if (includeRowKey) {
                family.put(ROW_KEY_FIELD, Bytes.toString(CellUtil.cloneRow(cell)));
            }
        }
        return data;
    }
}
4. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseServiceApplicationTests {
private String tableName = "t1";
private String rowKey = "1001";
private String rowKey2 = "1002";
private String rowKey3 = "1003";
private String CF_01 = "cf1";
private String CF1_QUALIFIER_NAME = "name";
private String CF1_QUALIFIER_AGE = "age";
private String CF1_NAME_VALUE = "tom";
private String CF1_AGE_VALUE = "23";
private String CF_02 = "cf2";
private String CF2_QUALIFIER_ADDR = "address";
private String CF2_ADDR_VALUE = "beijing";
@Autowired
private HBaseService service;
@Test
public void isTableExist() throws IOException {
assert service.isTableExist(tableName);
}
@Test
public void createTable() throws IOException {
service.createTable(tableName, CF_01, CF_02);
assert service.isTableExist(tableName);
}
@Test
public void addRowData() {
service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
}
@Test
public void getAllRows() {
service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);
Assert.assertTrue(rows.size() == 1); // 1个RowKey,对应1行数据
Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));
System.out.println(rows);
}
@Test
public void getRowByRowKey() {
service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
Assert.assertNotNull(row);
Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));
Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));
Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));
System.out.println(row);
}
@Test
public void getRows() {
service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据
Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
System.out.println(rows);
}
/** Stores a single cell and checks that reading that exact family/qualifier returns its value. */
@Test
public void getRowQualifier() {
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    Object cellValue = service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);
    Assert.assertEquals(CF1_NAME_VALUE, cellValue);
}
/**
 * Writes the same cell under two row keys and verifies that {@code getRowsByKey} returns one
 * row per key.
 */
@Test
public void getRowsByKey() {
    service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);
    // assertEquals reports expected vs. actual on failure; assertTrue(a == b) only reports "false".
    Assert.assertEquals(2, rows.size()); // two row keys -> two rows
    Assert.assertEquals(1, rows.get(0).size()); // one column family
    Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
    System.out.println(rows);
}
/**
 * Writes two qualifiers, deletes one of them, and verifies that only the other one is still
 * present in the row.
 */
@Test
public void deleteQualifier() {
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);

    service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);

    Map<String, Map<String, Object>> remaining = service.getRowByRowKey(tableName, rowKey);
    System.out.println(remaining);
    Map<String, Object> family = remaining.get(CF_01);
    Assert.assertNull(family.get(CF1_QUALIFIER_NAME));
    Assert.assertNotNull(family.get(CF1_QUALIFIER_AGE));
}
/**
 * Writes one cell into each of two column families, deletes the row family by family, and
 * verifies that nothing is left under the row key.
 */
@Test
public void deleteRow() {
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);

    // Same order as before: first CF_01, then CF_02.
    for (String family : new String[] {CF_01, CF_02}) {
        service.deleteRow(tableName, rowKey, family);
    }

    Map<String, Map<String, Object>> afterDelete = service.getRowByRowKey(tableName, rowKey);
    System.out.println(afterDelete);
    Assert.assertTrue(afterDelete.isEmpty());
}
/**
 * Writes two rows (two column families each), deletes both rows in one call, and verifies
 * that every row returned afterwards is empty.
 */
@Test
public void deleteMultiRow() {
    service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
    service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
    List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
    System.out.println("----------------------- test 1 -------------------------");
    System.out.println(rows);
    String[] familyNames = {CF_01, CF_02};
    service.deleteRows(tableName, familyNames, rowKey2, rowKey3);
    rows = service.getRows(tableName, rowKey2, rowKey3);
    System.out.println("----------------------- test 2 -------------------------");
    System.out.println(rows); // [{}, {}]
    // Check every requested row, not just the first one, so a surviving second row is detected.
    Assert.assertFalse(rows.isEmpty());
    for (Map<String, Map<String, Object>> row : rows) {
        Assert.assertTrue(row.isEmpty());
    }
}
/**
 * Writes cells into two column families, removes the whole row with {@code deleteAll}, and
 * verifies that the row reads back empty.
 */
@Test
public void deleteAll() {
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);

    // State before the delete, printed for manual inspection.
    Map<String, Map<String, Object>> before = service.getRowByRowKey(tableName, rowKey);
    System.out.println(before);

    service.deleteAll(tableName, rowKey);

    Map<String, Map<String, Object>> after = service.getRowByRowKey(tableName, rowKey);
    System.out.println(after);
    Assert.assertTrue(after.isEmpty());
}
/**
 * Ensures the table exists (creating it if necessary), drops it, and verifies that it is gone.
 *
 * @throws IOException declared by the test signature for the table administration calls
 */
@Test
public void dropTable() throws IOException {
    if (!service.isTableExist(tableName)) {
        service.createTable(tableName);
    }
    // Use JUnit assertions: the Java `assert` keyword is skipped when the JVM runs without -ea,
    // which would let this test pass without checking anything.
    Assert.assertTrue(service.isTableExist(tableName));
    service.dropTable(tableName);
    Assert.assertFalse(service.isTableExist(tableName));
}
/**
 * Deletes a whole row through the generic {@code execute} hook and verifies that the row
 * reads back empty afterwards.
 */
@Test
public void execute_deleteAll() {
    service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
    service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
    Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
    System.out.println(row);
    TableCallback action = new TableCallback<Boolean>() {
        @Override
        public Boolean doInTable(HTableInterface table) {
            boolean flag = false;
            try {
                List<Delete> list = new ArrayList<>();
                // Bytes.toBytes encodes as UTF-8; String.getBytes() would use the platform
                // default charset and could produce a different key than the writes above.
                Delete d1 = new Delete(Bytes.toBytes(rowKey));
                list.add(d1);
                table.delete(list);
                flag = true;
            } catch (Exception e) {
                // A failed delete is reported here and then surfaces through the assertion below.
                e.printStackTrace();
            }
            return flag;
        }
    };
    service.execute(tableName, action);
    row = service.getRowByRowKey(tableName, rowKey);
    System.out.println(row);
    Assert.assertTrue(row.isEmpty());
}
/**
 * Writes two cells (one per column family) through the generic {@code execute} hook and
 * verifies both values via the qualifier lookup.
 */
@Test
public void execute_put() {
    TableCallback action = new TableCallback<Boolean>() {
        @Override
        public Boolean doInTable(HTableInterface table) {
            boolean flag = false;
            try {
                // Bytes.toBytes encodes as UTF-8; String.getBytes() would use the platform
                // default charset, inconsistent with the family/qualifier/value encoding below.
                Put put = new Put(Bytes.toBytes(rowKey));
                put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
                put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
                table.put(put);
                flag = true;
            } catch (Exception e) {
                // A failed put is reported here and then surfaces through the assertions below.
                e.printStackTrace();
            }
            return flag;
        }
    };
    service.execute(tableName, action);
    Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
    Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));
}
}
■ Runner测试类(HBaseServiceRunnerTest.java)
import com.lonton.bigdata.service.HBaseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Spring Boot {@link ApplicationRunner} that logs a banner line followed by whether the HBase
 * table {@code t1} exists.
 */
@Component
public class HBaseServiceRunnerTest implements ApplicationRunner {
    private final static Logger log = LoggerFactory.getLogger(HBaseServiceRunnerTest.class);

    @Autowired
    private HBaseService service;

    /** Name of the table whose existence is reported. */
    private String tableName = "t1";

    /**
     * Logs the banner and the result of the table-existence check.
     *
     * @param args application arguments (not used)
     * @throws Exception declared by {@link ApplicationRunner#run}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("------------------------- HBaseServiceTest ApplicationRunner --------------------------------");
        String existsText = String.valueOf(service.isTableExist(tableName));
        log.info(existsText);
    }
}
5. 完整示例代码
源码:hbase-springboot-example04-src.zip
方式5:SpringBoot + hbase starter
spring-boot-starter-hbase是spring-boot自定义的starter,为hbase的query和更新等操作提供简易的api并集成spring-boot的auto configuration。如果HbaseTemplate操作不满足需求,完全可以使用hbaseTemplate的getConnection()方法,获取连接。进而类似HbaseTemplate实现的逻辑,实现更复杂的需求查询等功能。
源码:https://github.com/SpringForAll/spring-boot-starter-hbase
示例:https://github.com/JeffLi1993/springboot-learning-example
参考:Spring Boot 2.x:通过spring-boot-starter-hbase集成HBase
1. 添加pom.xml依赖
<!-- Spring Boot HBase 依赖 -->
<!-- https://mvnrepository.com/artifact/com.spring4all/spring-boot-starter-hbase -->
<dependency>
<groupId>com.spring4all</groupId>
<artifactId>spring-boot-starter-hbase</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
补充:安装spring-boot-starter-hbase组件依赖(因为不在公共仓库,只能自行安装。如果有maven私库,可以考虑安装到私库)
# 下载项目到本地
git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
cd spring-boot-starter-hbase
# 安装依赖
mvn clean install
源码:spring-boot-starter-hbase-master.zip
2. 工程配置
■ SpringBoot配置(application.properties)
## HBase Configuration
spring.data.hbase.quorum=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
spring.data.hbase.rootDir=hdfs://bigdata-node1:9000/hbase
spring.data.hbase.nodeParent=/hbase
具体配置项信息如下:
- spring.data.hbase.quorum:指定HBase的zk地址
- spring.data.hbase.rootDir:指定HBase在HDFS上存储的路径
- spring.data.hbase.nodeParent:指定ZK中HBase的根ZNode
3. 编写关键类
■ Service接口(HBaseService.java)
import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import java.util.List;
/**
 * HBase service interface: single-row reads, scans, mutations and raw table callbacks.
 *
 * <p>The generic read methods return whatever type the implementation's row mapper produces.
 */
public interface HBaseService {
    /**
     * Fetches a single row by row key; the data is converted by a row mapper.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param <T>       mapped result type
     * @return the mapped row
     */
    <T> T get(String tableName, String rowKey);

    /**
     * Fetches a single row by row key, restricted to one column family; the data is converted
     * by a row mapper.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param <T>        mapped result type
     * @return the mapped row
     */
    <T> T get(String tableName, String rowKey, String familyName);

    /**
     * Fetches a single row by row key, restricted to one qualifier of one column family; the
     * data is converted by a row mapper.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier
     * @param <T>        mapped result type
     * @return the mapped row
     */
    <T> T get(String tableName, String rowKey, String familyName, String qualifier);

    /**
     * Inserts or updates data.
     *
     * <p>Note: non-string values must be converted to a String before being written, e.g. for
     * an int use {@code Bytes.toBytes(String.valueOf(value))}.
     *
     * @param tableName table name
     * @param mutation  the put, update or delete to execute
     */
    void saveOrUpdate(String tableName, Mutation mutation);

    /**
     * Inserts or updates data in a batch.
     *
     * <p>Note: non-string values must be converted to a String before being written, e.g. for
     * an int use {@code Bytes.toBytes(String.valueOf(value))}.
     *
     * @param tableName table name
     * @param mutations the puts, updates or deletes to execute as a batch
     */
    void saveOrUpdates(String tableName, List<Mutation> mutations);

    /**
     * General-purpose hook (typically used for update operations such as delete and put).
     *
     * <p>Example - delete a whole row:
     * <pre>
     * TableCallback&lt;Boolean&gt; action = new TableCallback&lt;Boolean&gt;() {
     *     &#64;Override
     *     public Boolean doInTable(Table table) throws Throwable {
     *         boolean flag = false;
     *         try {
     *             List&lt;Delete&gt; list = new ArrayList&lt;&gt;();
     *             Delete d1 = new Delete(rowKey.getBytes());
     *             list.add(d1);
     *             table.delete(list);
     *             flag = true;
     *         } catch (Exception e) {
     *             e.printStackTrace();
     *         }
     *         return flag;
     *     }
     * };
     * </pre>
     *
     * <p>Example - put / update:
     * <pre>
     * TableCallback&lt;Boolean&gt; action = new TableCallback&lt;Boolean&gt;() {
     *     &#64;Override
     *     public Boolean doInTable(Table table) throws Throwable {
     *         boolean flag = false;
     *         try {
     *             Put put = new Put(rowKey.getBytes());
     *             put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));
     *             put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));
     *             put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));
     *             table.put(put);
     *             flag = true;
     *         } catch (Exception e) {
     *             e.printStackTrace();
     *         }
     *         return flag;
     *     }
     * };
     * </pre>
     *
     * @param tableName table name
     * @param action    callback performing the put, update or delete
     * @param <T>       callback result type
     * @return the value returned by the callback
     */
    <T> T execute(String tableName, TableCallback<T> action);

    /**
     * Scans a table.
     *
     * @param tableName table name
     * @param scan      scan definition
     * @param <T>       mapped row type
     * @return the mapped rows
     */
    <T> List<T> find(String tableName, Scan scan);

    /**
     * Scans a table, restricted to one column family.
     *
     * @param tableName table name
     * @param family    column family
     * @param <T>       mapped row type
     * @return the mapped rows
     */
    <T> List<T> find(String tableName, String family);

    /**
     * Scans a table, restricted to one qualifier of one column family.
     *
     * @param tableName table name
     * @param family    column family
     * @param qualifier column qualifier
     * @param <T>       mapped row type
     * @return the mapped rows
     */
    <T> List<T> find(String tableName, String family, String qualifier);

    /**
     * Exposes the underlying connection. If the HbaseTemplate operations are not sufficient,
     * obtain the connection here and implement more complex queries along the lines of
     * HbaseTemplate's own logic.
     *
     * @return the HBase connection
     */
    Connection getConnection();

    /**
     * Closes a connection.
     *
     * @param connection the connection to close
     */
    void closeConnection(Connection connection);
}
4. 测试
■ DTO定义(Student.java)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Student DTO mirroring the two column families of the HBase {@code student} table.
 *
 * <p>HBase shell: {@code create 'student','base','other'}
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    /** Values of the {@code base} column family. */
    private BaseInfo base;
    /** Values of the {@code other} column family. */
    private OtherInfo other;

    /** Renders as {@code Student{base=..., other=...}}. */
    @Override
    public String toString() {
        return "Student{base=" + base + ", other=" + other + "}";
    }

    /** Name and age of a student. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class BaseInfo {
        private String name;
        private Integer age;

        /** Renders as {@code BaseInfo{name='...', age=...}}. */
        @Override
        public String toString() {
            return "BaseInfo{name='" + name + "', age=" + age + "}";
        }
    }

    /** Additional student data; currently only {@code fv}. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class OtherInfo {
        private Integer fv;

        /** Renders as {@code OtherInfo{fv=...}}. */
        @Override
        public String toString() {
            return "OtherInfo{fv=" + fv + "}";
        }
    }
}
■ RowMapper定义(StudentRowMapper.java)
import com.lonton.bigdata.domain.Student;
import com.spring4all.spring.boot.starter.hbase.api.RowMapper;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Maps an HBase {@link Result} onto a {@link Student}: the {@code base} family supplies name
 * and age, the {@code other} family supplies fv. Columns missing from the result leave the
 * corresponding DTO field {@code null}.
 */
public class StudentRowMapper implements RowMapper<Student> {
    // Bytes.toBytes always encodes as UTF-8, whereas String.getBytes() depends on the platform
    // default charset; using it here keeps these keys consistent with the Bytes.toString reads below.
    private static final byte[] BASE_FAMILY = Bytes.toBytes("base");
    private static final byte[] OTHER_FAMILY = Bytes.toBytes("other");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE = Bytes.toBytes("age");
    private static final byte[] FV = Bytes.toBytes("fv");

    /* Approach 1 (works): iterate over the raw cells and dispatch on family/qualifier. */
    /*
    @Override
    public Student mapRow(Result result, int rowNum) throws Exception {
        Student.BaseInfo baseInfo = new Student.BaseInfo();
        Student.OtherInfo otherInfo = new Student.OtherInfo();
        Cell[] cells = result.rawCells();
        for (Cell c : cells) {
            String columnFamily = new String(CellUtil.cloneFamily(c));
            String rowName = new String(CellUtil.cloneQualifier(c));
            String value = new String(CellUtil.cloneValue(c));
            if(value!=null){
                if(Bytes.toString(BASE_FAMILY).equals(columnFamily)){
                    if(Bytes.toString(NAME).equals(rowName)){
                        baseInfo.setName(value);
                    }else if(Bytes.toString(AGE).equals(rowName)){
                        Integer age=Integer.parseInt(value);
                        baseInfo.setAge(age);
                    }
                } else if(Bytes.toString(OTHER_FAMILY).equals(columnFamily)){
                    if(Bytes.toString(FV).equals(rowName)){
                        Integer fv=Integer.parseInt(value);
                        otherInfo.setFv(fv);
                    }
                }
            }
        }
        Student dto = new Student(baseInfo, otherInfo);
        return dto;
    }*/

    /**
     * Approach 2 (works): look up each expected column directly.
     *
     * @param result the HBase row to convert
     * @param rowNum index of the row within the result set (not used)
     * @return a {@link Student} whose fields are set for every column present in {@code result}
     * @throws Exception if a numeric column cannot be parsed (a {@link NumberFormatException})
     */
    @Override
    public Student mapRow(Result result, int rowNum) throws Exception {
        Student.BaseInfo baseInfo = new Student.BaseInfo();
        baseInfo.setName(readString(result, BASE_FAMILY, NAME));
        baseInfo.setAge(readInteger(result, BASE_FAMILY, AGE));

        Student.OtherInfo otherInfo = new Student.OtherInfo();
        otherInfo.setFv(readInteger(result, OTHER_FAMILY, FV));

        return new Student(baseInfo, otherInfo);
    }

    /** Returns the column value decoded as a String, or {@code null} when the column is absent. */
    private static String readString(Result result, byte[] family, byte[] qualifier) {
        if (!result.containsColumn(family, qualifier)) {
            return null;
        }
        return Bytes.toString(result.getValue(family, qualifier));
    }

    /** Returns the column value parsed as a decimal Integer, or {@code null} when the column is absent. */
    private static Integer readInteger(Result result, byte[] family, byte[] qualifier) {
        String text = readString(result, family, qualifier);
        // Values are stored as their decimal string form, so an absent column is the only null case.
        return result.containsColumn(family, qualifier) ? Integer.valueOf(Integer.parseInt(text)) : null;
    }
}
■ Service实现类(StudentHBaseServiceImpl.java)
import com.lonton.bigdata.domain.Student;
import com.lonton.bigdata.mapper.StudentRowMapper;
import com.lonton.bigdata.service.HBaseService;
import com.spring4all.spring.boot.starter.hbase.api.HbaseTemplate;
import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
/**
 * {@link HBaseService} implementation that delegates every operation to {@link HbaseTemplate}
 * and maps rows to {@link Student} with {@link StudentRowMapper}.
 */
@Service
public class StudentHBaseServiceImpl implements HBaseService {
    /**
     * Shared mapper instance; avoids allocating a new mapper on every call.
     * NOTE(review): relies on StudentRowMapper keeping no per-instance state - confirm if it changes.
     */
    private static final StudentRowMapper STUDENT_MAPPER = new StudentRowMapper();

    @Autowired
    private HbaseTemplate hbaseTemplate;

    /** Reads the whole row {@code rowKey} of {@code tableName} as a {@link Student}. */
    @Override
    public Student get(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, STUDENT_MAPPER);
    }

    /** Reads row {@code rowKey}, restricted to column family {@code familyName}. */
    @Override
    public Student get(String tableName, String rowKey, String familyName) {
        return hbaseTemplate.get(tableName, rowKey, familyName, STUDENT_MAPPER);
    }

    /** Reads row {@code rowKey}, restricted to {@code familyName:qualifier}. */
    @Override
    public Student get(String tableName, String rowKey, String familyName, String qualifier) {
        return hbaseTemplate.get(tableName, rowKey, familyName, qualifier, STUDENT_MAPPER);
    }

    /** Applies a single mutation to {@code tableName}. */
    @Override
    public void saveOrUpdate(String tableName, Mutation mutation) {
        hbaseTemplate.saveOrUpdate(tableName, mutation);
    }

    /** Applies a batch of mutations to {@code tableName}. */
    @Override
    public void saveOrUpdates(String tableName, List<Mutation> mutations) {
        hbaseTemplate.saveOrUpdates(tableName, mutations);
    }

    /** Runs {@code action} against {@code tableName} and returns the template's result. */
    @Override
    public <T> T execute(String tableName, TableCallback<T> action) {
        return hbaseTemplate.execute(tableName, action);
    }

    /** Scans {@code tableName} with the given {@code scan} and maps each row to a {@link Student}. */
    @Override
    public List<Student> find(String tableName, Scan scan) {
        return hbaseTemplate.find(tableName, scan, STUDENT_MAPPER);
    }

    /** Scans {@code tableName}, restricted to column family {@code family}. */
    @Override
    public List<Student> find(String tableName, String family) {
        return hbaseTemplate.find(tableName, family, STUDENT_MAPPER);
    }

    /** Scans {@code tableName}, restricted to {@code family:qualifier}. */
    @Override
    public List<Student> find(String tableName, String family, String qualifier) {
        return hbaseTemplate.find(tableName, family, qualifier, STUDENT_MAPPER);
    }

    /** Returns the connection held by the template. */
    @Override
    public Connection getConnection() {
        return hbaseTemplate.getConnection();
    }

    /**
     * Closes {@code connection} if it is non-null. An {@link IOException} raised while closing
     * is printed and not propagated.
     */
    @Override
    public void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
■ 测试类(HBaseServiceApplicationTests.java)
import com.lonton.bigdata.domain.Student;
import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Tests for {@link HBaseService} using the {@code student} table.
 *
 * <p>HBase shell commands for preparing and inspecting the table:
 * <pre>
 * create 'student','base','other'
 * truncate 'student'
 * put 'student','1001','base:name','zhangsan'
 * put 'student','1001','base:age',23
 * put 'student','1001','other:fv',3
 * scan 'student'
 * </pre>
 */
// NOTE(review): the file imports org.junit.jupiter.api.Test (JUnit 5) while the runner annotation
// below belongs to JUnit 4 - confirm which engine is intended and align the imports accordingly.
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseServiceApplicationTests {
    private static final String TABLE_NAME = "student";
    private static final String ROW_KEY = "1001";
    private static final String CF_BASE = "base";
    private static final String CF_OTHER = "other";
    private static final String CF_BASE_QUALIFIER_NAME = "name";
    private static final String CF_BASE_QUALIFIER_AGE = "age";
    private static final String CF_OTHER_QUALIFIER_FV = "fv";
    private static final String CF_BASE_VALUE_NAME = "zhangsan";
    // Kept as Integer (not int) so assertEquals(Object, Object) is selected against the DTO getters.
    private static final Integer CF_BASE_VALUE_AGE = 23;
    private static final Integer CF_OTHER_VALUE_FV = 3;

    @Autowired
    private HBaseService service;

    /** Builds a Put that writes base:name, base:age and other:fv for the given row key. */
    private static Put fullStudentPut(String key) {
        Put put = new Put(Bytes.toBytes(key));
        put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));
        put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));
        put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));
        return put;
    }

    /** Builds a Put that writes one cell under {@link #ROW_KEY}. */
    private static Put singleCellPut(String family, String qualifier, String value) {
        Put put = new Put(Bytes.toBytes(ROW_KEY));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        return put;
    }

    /** Stores the complete test student under the given row key. */
    private void saveFullStudent(String key) {
        service.saveOrUpdate(TABLE_NAME, fullStudentPut(key));
    }

    /** Asserts that name, age and fv all carry the test values. */
    private static void assertFullStudent(Student dto) {
        Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
        Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());
        Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());
    }

    /** Asserts that name and age carry the test values and fv is unset. */
    private static void assertBaseFamilyOnly(Student dto) {
        Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
        Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());
        Assert.assertNull(dto.getOther().getFv());
    }

    /** Asserts that only name carries the test value; age and fv are unset. */
    private static void assertNameOnly(Student dto) {
        Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
        Assert.assertNull(dto.getBase().getAge());
        Assert.assertNull(dto.getOther().getFv());
    }

    /** Asserts that no field of the student is set. */
    private static void assertEmptyStudent(Student dto) {
        Assert.assertNull(dto.getBase().getName());
        Assert.assertNull(dto.getBase().getAge());
        Assert.assertNull(dto.getOther().getFv());
    }

    /** Expected: Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=3}} */
    @Test
    public void get1() {
        saveFullStudent(ROW_KEY);
        Student dto = service.get(TABLE_NAME, ROW_KEY);
        System.out.println(dto);
        assertFullStudent(dto);
    }

    /** Expected: Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=null}} */
    @Test
    public void get2() {
        saveFullStudent(ROW_KEY);
        Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE);
        System.out.println(dto);
        assertBaseFamilyOnly(dto);
    }

    /** Expected: Student{base=BaseInfo{name='zhangsan', age=null}, other=OtherInfo{fv=null}} */
    @Test
    public void get3() {
        saveFullStudent(ROW_KEY);
        Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE, CF_BASE_QUALIFIER_NAME);
        System.out.println(dto);
        assertNameOnly(dto);
    }

    /** Writes the full student with a single mutation and reads it back. */
    @Test
    public void saveOrUpdate() {
        saveFullStudent(ROW_KEY);
        Student dto = service.get(TABLE_NAME, ROW_KEY);
        System.out.println(dto);
        assertFullStudent(dto);
    }

    /** Writes the three cells as three separate mutations in one batch and reads the row back. */
    @Test
    public void saveOrUpdates() {
        List<Mutation> mutations = new ArrayList<>();
        mutations.add(singleCellPut(CF_BASE, CF_BASE_QUALIFIER_NAME, CF_BASE_VALUE_NAME));
        mutations.add(singleCellPut(CF_BASE, CF_BASE_QUALIFIER_AGE, String.valueOf(CF_BASE_VALUE_AGE)));
        mutations.add(singleCellPut(CF_OTHER, CF_OTHER_QUALIFIER_FV, String.valueOf(CF_OTHER_VALUE_FV)));
        service.saveOrUpdates(TABLE_NAME, mutations);

        Student dto = service.get(TABLE_NAME, ROW_KEY);
        System.out.println(dto);
        assertFullStudent(dto);
    }

    /** Deletes the whole row through the generic execute hook and verifies it reads back empty. */
    @Test
    public void execute_deleteAll() {
        saveFullStudent(ROW_KEY);
        Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE, CF_BASE_QUALIFIER_NAME);
        System.out.println(dto);

        // The callback declares Throwable, so failures propagate to the caller instead of being
        // printed and swallowed; the typed callback lets the Boolean result be asserted.
        TableCallback<Boolean> action = new TableCallback<Boolean>() {
            @Override
            public Boolean doInTable(Table table) throws Throwable {
                List<Delete> deletes = new ArrayList<>();
                deletes.add(new Delete(Bytes.toBytes(ROW_KEY)));
                table.delete(deletes);
                return true;
            }
        };
        Boolean deleted = service.execute(TABLE_NAME, action);
        Assert.assertTrue(Boolean.TRUE.equals(deleted));

        dto = service.get(TABLE_NAME, ROW_KEY);
        System.out.println(dto);
        assertEmptyStudent(dto);
    }

    /** Writes the full student through the generic execute hook and reads it back. */
    @Test
    public void execute_put() {
        TableCallback<Boolean> action = new TableCallback<Boolean>() {
            @Override
            public Boolean doInTable(Table table) throws Throwable {
                table.put(fullStudentPut(ROW_KEY));
                return true;
            }
        };
        Boolean written = service.execute(TABLE_NAME, action);
        Assert.assertTrue(Boolean.TRUE.equals(written));

        Student dto = service.get(TABLE_NAME, ROW_KEY);
        System.out.println(dto);
        assertFullStudent(dto);
    }

    /** Writes two rows and range-scans from the first key up to and including the second. */
    @Test
    public void find_scan() {
        String startRow = ROW_KEY + "-01";
        String stopRow = ROW_KEY + "-02";
        saveFullStudent(startRow);
        saveFullStudent(stopRow);

        // Range scan (a Scan without start/stop row would scan the whole table).
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRow));
        // The scan's stop row is exclusive; appending '|' yields a key that sorts after stopRow,
        // so the stopRow record itself is part of the result.
        scan.setStopRow(Bytes.toBytes(stopRow + "|"));
        scan.setCaching(5000);
        List<Student> dtoList = service.find(TABLE_NAME, scan);

        // Guard against a vacuous pass: both rows were just written, so the scan must return data.
        Assert.assertFalse(dtoList.isEmpty());
        for (Student dto : dtoList) {
            System.out.println(dto);
            assertFullStudent(dto);
        }
    }

    /** Writes two rows and scans the table restricted to the base column family. */
    @Test
    public void find_family() {
        saveFullStudent(ROW_KEY + "-01");
        saveFullStudent(ROW_KEY + "-02");

        List<Student> dtoList = service.find(TABLE_NAME, CF_BASE);

        // Guard against a vacuous pass on an empty result.
        Assert.assertFalse(dtoList.isEmpty());
        for (Student dto : dtoList) {
            System.out.println(dto);
            assertBaseFamilyOnly(dto);
        }
    }

    /** Writes two rows and scans the table restricted to the base:name column. */
    @Test
    public void find_qualifier() {
        saveFullStudent(ROW_KEY + "-01");
        saveFullStudent(ROW_KEY + "-02");

        List<Student> dtoList = service.find(TABLE_NAME, CF_BASE, CF_BASE_QUALIFIER_NAME);

        // Guard against a vacuous pass on an empty result.
        Assert.assertFalse(dtoList.isEmpty());
        for (Student dto : dtoList) {
            System.out.println(dto);
            assertNameOnly(dto);
        }
    }

    /** Obtains the connection from the service, closes it, and checks the closed flag both times. */
    @Test
    public void getAndCloseConnection() {
        // NOTE(review): this closes the connection handed out by the service; if that connection is
        // shared with the template, tests running afterwards may fail - confirm test isolation.
        Connection connection = service.getConnection();
        System.out.println(connection);
        Assert.assertFalse(connection.isClosed());
        service.closeConnection(connection);
        Assert.assertTrue(connection.isClosed());
    }
}
■ 控制类(StudentController.java)
import com.lonton.bigdata.domain.Student;
import com.lonton.bigdata.service.HBaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
/**
 * REST controller exposing single-row student lookups.
 *
 * <p>Sample data (HBase shell):
 * <pre>
 * put 'student','1001','base:name','zhangsan'
 * put 'student','1001','base:age',23
 * put 'student','1001','other:fv',3
 * </pre>
 *
 * <p>{@code @RestController} is equivalent to {@code @Controller} + {@code @ResponseBody}.
 */
@RestController
public class StudentController {
    @Autowired
    private HBaseService service;

    /**
     * Returns the row {@code rowKey} of table {@code tableName} as a {@link Student}.
     *
     * <p>Example: {@code GET http://localhost:8080/hbaseservice/student/1001} responds with
     * <pre>
     * {"base":{"name":"zhangsan","age":23},"other":{"fv":3}}
     * </pre>
     *
     * @param tableName table name taken from the URL path
     * @param rowKey    row key taken from the URL path
     * @return the student read through the service
     */
    @GetMapping("/hbaseservice/{tableName}/{rowKey}")
    public Student getStudent(@PathVariable String tableName, @PathVariable String rowKey) {
        Student student = service.get(tableName, rowKey);
        return student;
    }
}