方式1:SpringBoot + HBaseUtil
1. 添加pom.xml依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.5</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.5</version></dependency>
2. 编写关键类
■ 常量类(Constant.java)
public class Constant {/** HBase配置 **/public static class HBaseConfig {public final static String ZK_HOST="192.168.0.101,192.168.0.102,192.168.0.103";public final static String ZK_PORT="2181";public final static String ZK_PATH="/hbase";}}
■ 工具类(HBaseUtil.java)
import com.lonton.bigdata.constant.Constant;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class HBaseUtil {private static final Logger log = LoggerFactory.getLogger(HBaseUtil.class);public static Configuration conf;private static Connection conn;static {// 使用HBaseConfiguration的单例方法实例化conf = HBaseConfiguration.create();// HBase使用ZooKeeper的地址conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);// ZooKeeper客户端端口conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);// ZooKeeper存储HBase信息的路径conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);// 所有客户端的默认scan缓存时间, 默认100conf.set(HConstants.HBASE_CLIENT_SCANNER_CACHING, "500");// 失败重试时等待时间, 默认100conf.set(HConstants.HBASE_CLIENT_PAUSE, "50");// 一次RPC请求的超时时间, 默认60000conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "2000");// 客户端发起一次数据操作直至得到响应之间总的超时时间, 默认Integer.MAX_VALUE(一次阻塞操作)conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000");// 重试次数3次,默认31conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3");}/*** 获得链接* @return*/public static synchronized Connection getConnection() {try {if (conn == null || conn.isClosed()) {conn = ConnectionFactory.createConnection(conf);}} catch (IOException e) {log.error("HBase 建立链接失败 ", e);}return conn;}/*** 关闭连接* @throws IOException*/public static void closeConnect(Connection conn) {if (null != conn) {try {conn.close();} catch (Exception e) {log.error("closeConnect failure !", e);}}}/*** 判断表是否存在(DDL)* create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}* @param tableName 表名* @return 是否创建成功* @throws IOException*/public static boolean isTableExist(String tableName) throws IOException {boolean result = false;Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {result = admin.tableExists(tableName);} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}return result;}/*** 创建表(DDL)* create 't1', 'cf1', 'cf2'* @param tableName 表名* @param columnFamily 列簇* @throws IOException*/public static void createTable(String tableName, String... columnFamily) throws IOException {// 判断表是否存在if (isTableExist(tableName)) {log.error("Table:" + tableName + " already exists!");return;}Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {// 创建表属性对象,表名需要转字节HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));// 创建多个列族for (String cf : columnFamily) {descriptor.addFamily(new HColumnDescriptor(cf));}// 根据对表的配置,创建表admin.createTable(descriptor);log.info("Table:" + tableName + " create successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 新增记录(DML)* put 't1', '1001', 'cf1:name', 'zhangsan'* put 't1', '1001', 'cf1:age', '23'* @param tableName 表名* @param rowKey 行键* @param columnFamily 列簇* @param column 列* @param value 值* @throws IOException*/public static void addRowData(String tableName, String rowKey, String columnFamily, Stringcolumn, String value) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {// 向表中插入数据Put put = new Put(Bytes.toBytes(rowKey));// 向Put对象中组装数据put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));hTable.put(put);log.info("insert successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}}/*** 全表扫描(Scan)* scan "t1"* @param tableName 表名* @return* @throws IOException*/public static ResultScanner getAllRows(String tableName) throws IOException {ResultScanner results = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {// 得到用于扫描region的对象Scan scan = new Scan();// setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。scan.setCaching(1000);// 使用HTable得到resultcanner实现类的对象results = hTable.getScanner(scan);/*for (Result result : results) {Cell[] cells = result.rawCells();for (Cell cell : cells) {System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));System.out.println();}}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return results;}/*** 获取单行记录(get)* get "t1","1001"* @param tableName 表名* @param rowKey 行键* @return* @throws IOException*/public static Result getRow(String tableName, String rowKey) throws IOException {Result result = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {Get get = new Get(Bytes.toBytes(rowKey));//get.setMaxVersions(); // 显示所有版本//get.setTimeStamp(); // 显示指定时间戳的版本result = hTable.get(get);/*for (Cell cell : result.rawCells()) {System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");System.out.print("Timestamp:" + cell.getTimestamp());System.out.println();}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return result;}/*** 获取多行记录(get)* @param tableName 表名* @param rows 行键(可扩展参数)* @param <T>* @return* @throws IOException*/public static <T> Result[] getRows(String tableName, List<T> rows) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));List<Get> gets = null;Result[] results = null;try {if (hTable != null) {gets = new ArrayList<Get>();for (T row : rows) {if (row != null) {gets.add(new Get(Bytes.toBytes(String.valueOf(row))));} else {throw new RuntimeException("hbase have no data");}}}if (gets.size() > 0) {results = hTable.get(gets);}} catch (IOException e) {log.error("getRows failure !", e);} finally {try {hTable.close();} catch (IOException e) {log.error("table.close() failure !", e);}}return results;}/*** 根据限定符获取单行记录(get)* get "t1","1001","cf1:name"* @param tableName 表名* @param rowKey 行键* @param family 列簇* @param qualifier 限定符* @throws IOException*/public static Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {Result result = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));result = hTable.get(get);/*for (Cell cell : result.rawCells()) {System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");System.out.print("Timestamp:" + cell.getTimestamp());System.out.println();}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return result;}/*** 删除多行数据(DML)* @param tableName 表名* @param rows 行键(可扩展参数)* @throws IOException*/public static void deleteMultiRow(String tableName, String... rows) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {List<Delete> deleteList = new ArrayList<>();for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));deleteList.add(delete);}hTable.delete(deleteList);} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}}/*** 删除表(DDL)* @param tableName 表名* @throws IOException*/public static void dropTable(String tableName) throws IOException {if (!isTableExist(tableName)) {log.error("Table:" + tableName + " not exist!");return;}Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {if (isTableExist(tableName)) {admin.disableTable(tableName);admin.deleteTable(tableName);log.info("Table:" + tableName + " delete successfully!");}} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}}
3. 测试
■ 测试类(HBaseBaseUtilTests.java)
import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.ResultScanner;import org.apache.hadoop.hbase.util.Bytes;import org.junit.*;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class HBaseBaseUtilTests {private String tableName="t1";private String rowKey="1001";/** BeforeClass:会在所有方法被调用前被执行,* 而且该方法是静态的,所有当测试类被加载后接着就会运行它,* 而且在内存中它只会存在一份实例,它比较适合加载配置文件**/@BeforeClasspublic static void setUpBeforeClass() {System.out.println("this is @BeforeClass ...");HBaseUtil.getConnection();}/** AfterClass:通常用来对资源的清理,如关闭数据库的连接 **/@AfterClasspublic static void tearDownAfterClass() {System.out.println("this is @AfterClass ...");HBaseUtil.closeConnect(HBaseUtil.getConnection());}/** Before:每个测试方法调用前执行一次 **/@Beforepublic void setUp() {System.out.println("this is @Before ...");}/** Before:每个测试方法调用后执行一次 **/@Afterpublic void tearDown() {System.out.println("this is @After ...");}@Testpublic void createTable() throws IOException {HBaseUtil.createTable(tableName, "cf1", "cf2");assert HBaseUtil.isTableExist(tableName);}@Testpublic void isTableExist() throws IOException {assert HBaseUtil.isTableExist(tableName);}@Testpublic void addRowData() throws IOException {String value="tom";HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);Result result=HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void getAllRows() throws IOException {ResultScanner results =HBaseUtil.getAllRows(tableName);Assert.assertNotNull(results);for (Result result : results) {Assert.assertNotNull(result);}}@Testpublic void getRow() throws IOException {String value="tom";HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);Result result=HBaseUtil.getRow(tableName,rowKey);Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void getRows() throws IOException {String rowKey1="1003";String rowKey2="1004";String value1="tom1";String value2="tom2";HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);List<String> rows=new ArrayList<>();rows.add(rowKey1);rows.add(rowKey2);Result[] results=HBaseUtil.getRows(tableName,rows);Assert.assertNotNull(results);Assert.assertTrue(results.length==2);}@Testpublic void getRowQualifier() throws IOException {String value="tom";HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);Result result=HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void deleteMultiRow() throws IOException {String rowKey1="1003";String rowKey2="1004";String value1="tom1";String value2="tom2";HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);HBaseUtil.deleteMultiRow(tableName, rowKey1, rowKey2);List<String> rows=new ArrayList<>();rows.add(rowKey1);rows.add(rowKey2);Result[] results=HBaseUtil.getRows(tableName,rows);Assert.assertNotNull(results);for (Result result : results) {Cell[] cells=result.rawCells();Assert.assertTrue(cells.length==0);}}@Testpublic void dropTable() throws IOException {assert HBaseUtil.isTableExist(tableName);HBaseUtil.dropTable(tableName);assert !HBaseUtil.isTableExist(tableName);}}
4. 完整示例代码
源码:hbase-springboot-example01-src.zip
方式2:SpringBoot + HBaseService
1. 添加pom.xml依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.5</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.5</version></dependency>
2. 编写关键类
■ 常量类(Constant.java)
public class Constant {/** HBase配置 **/public static class HBaseConfig {public final static String ZK_HOST="192.168.0.101,192.168.0.102,192.168.0.103";public final static String ZK_PORT="2181";public final static String ZK_PATH="/hbase";}}
■ 配置类(HBaseConfig.java)
import com.lonton.bigdata.constant.Constant;import com.lonton.bigdata.service.HBaseService;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HConstants;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class HBaseConfig {@Beanpublic HBaseService getHBaseService() {// 设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到//System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master");// 执行此步时,会去resources目录下找相应的配置文件,例如hbase-site.xmlorg.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();// 使用HBaseConfiguration的单例方法实例化// HBase使用ZooKeeper的地址conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);// ZooKeeper客户端端口conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);// ZooKeeper存储HBase信息的路径conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);// 所有客户端的默认scan缓存时间, 默认100conf.set(HConstants.HBASE_CLIENT_SCANNER_CACHING, "500");// 失败重试时等待时间, 默认100conf.set(HConstants.HBASE_CLIENT_PAUSE, "50");// 一次RPC请求的超时时间, 默认60000conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "2000");// 客户端发起一次数据操作直至得到响应之间总的超时时间, 默认Integer.MAX_VALUE(一次阻塞操作)conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000");// 重试次数3次,默认31conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3");return new HBaseService(conf);}}
■ 服务类(HBaseService.java)
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class HBaseService {private static final Logger log = LoggerFactory.getLogger(HBaseService.class);public static Configuration conf;private static Connection conn;public HBaseService(Configuration conf){try {if (conn == null || conn.isClosed()) {conn = ConnectionFactory.createConnection(conf);}} catch (IOException e) {log.error("HBase connect failure", e);}}/*** 获得链接** @return*/public Connection getConnection() {try {if (conn == null || conn.isClosed()) {conn = ConnectionFactory.createConnection(conf);}} catch (IOException e) {log.error("HBase connect failure", e);}return conn;}/*** 关闭连接** @throws IOException*/public void closeConnect(Connection conn) {if (null != conn) {try {conn.close();} catch (Exception e) {log.error("closeConnect failure !", e);}}}/*** 判断表是否存在(DDL)* create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}** @param tableName 表名* @return 是否创建成功* @throws IOException*/public boolean isTableExist(String tableName) throws IOException {boolean result = false;Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {result = admin.tableExists(tableName);} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}return result;}/*** 创建表(DDL)* create 't1', 'cf1', 'cf2'** @param tableName 表名* @param columnFamily 列簇* @throws IOException*/public void createTable(String tableName, String... columnFamily) throws IOException {// 判断表是否存在if (isTableExist(tableName)) {log.error("Table:" + tableName + " already exists!");return;}Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {// 创建表属性对象,表名需要转字节HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));// 创建多个列族for (String cf : columnFamily) {descriptor.addFamily(new HColumnDescriptor(cf));}// 根据对表的配置,创建表admin.createTable(descriptor);log.info("Table:" + tableName + " create successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 新增记录(DML)* put 't1', '1001', 'cf1:name', 'zhangsan'* put 't1', '1001', 'cf1:age', '23'** @param tableName 表名* @param rowKey 行键* @param columnFamily 列簇* @param column 列* @param value 值* @throws IOException*/public void addRowData(String tableName, String rowKey, String columnFamily, Stringcolumn, String value) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {// 向表中插入数据Put put = new Put(Bytes.toBytes(rowKey));// 向Put对象中组装数据put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));hTable.put(put);log.info("insert successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}}/*** 全表扫描(Scan)* scan "t1"** @param tableName 表名* @return* @throws IOException*/public ResultScanner getAllRows(String tableName) throws IOException {ResultScanner results = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {// 得到用于扫描region的对象Scan scan = new Scan();// setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。scan.setCaching(1000);// 使用HTable得到resultcanner实现类的对象results = hTable.getScanner(scan);/*for (Result result : results) {Cell[] cells = result.rawCells();for (Cell cell : cells) {System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));System.out.println();}}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return results;}/*** 获取单行记录(get)* get "t1","1001"** @param tableName 表名* @param rowKey 行键* @return* @throws IOException*/public Result getRow(String tableName, String rowKey) throws IOException {Result result = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {Get get = new Get(Bytes.toBytes(rowKey));//get.setMaxVersions(); // 显示所有版本//get.setTimeStamp(); // 显示指定时间戳的版本result = hTable.get(get);/*for (Cell cell : result.rawCells()) {System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");System.out.print("Timestamp:" + cell.getTimestamp());System.out.println();}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return result;}/*** 获取多行记录(get)** @param tableName 表名* @param rows 行键(可扩展参数)* @param <T>* @return* @throws IOException*/public <T> Result[] getRows(String tableName, List<T> rows) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));List<Get> gets = null;Result[] results = null;try {if (hTable != null) {gets = new ArrayList<Get>();for (T row : rows) {if (row != null) {gets.add(new Get(Bytes.toBytes(String.valueOf(row))));} else {throw new RuntimeException("hbase have no data");}}}if (gets.size() > 0) {results = hTable.get(gets);}} catch (IOException e) {log.error("getRows failure !", e);} finally {try {hTable.close();} catch (IOException e) {log.error("table.close() failure !", e);}}return results;}/*** 根据限定符获取单行记录(get)* get "t1","1001","cf1:name"** @param tableName 表名* @param rowKey 行键* @param family 列簇* @param qualifier 限定符* @throws IOException*/public Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {Result result = null;Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));result = hTable.get(get);/*for (Cell cell : result.rawCells()) {System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");System.out.print("Timestamp:" + cell.getTimestamp());System.out.println();}*/} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}return result;}/*** 删除多行数据(DML)** @param tableName 表名* @param rows 行键(可扩展参数)* @throws IOException*/public void deleteMultiRow(String tableName, String... rows) throws IOException {Connection conn = getConnection();HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));try {List<Delete> deleteList = new ArrayList<>();for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));deleteList.add(delete);}hTable.delete(deleteList);} catch (Exception e) {log.error(e.getMessage());} finally {hTable.close();}}/*** 删除表(DDL)** @param tableName 表名* @throws IOException*/public void dropTable(String tableName) throws IOException {if (!isTableExist(tableName)) {log.error("Table:" + tableName + " not exist!");return;}Connection conn = getConnection();HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();try {if (isTableExist(tableName)) {admin.disableTable(tableName);admin.deleteTable(tableName);log.info("Table:" + tableName + " delete successfully!");}} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}}
3. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import com.lonton.bigdata.service.HBaseService;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.ResultScanner;import org.apache.hadoop.hbase.util.Bytes;import org.junit.Assert;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import java.io.IOException;import java.util.ArrayList;import java.util.List;@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestclass HBaseServiceApplicationTests {private String tableName="t1";private String rowKey="1001";@Resourceprivate HBaseService service;@Testpublic void isTableExist() throws IOException {assert service.isTableExist(tableName);}@Testpublic void addRowData() throws IOException {String value="tom";service.addRowData(tableName, rowKey, "cf1", "name", value);Result result=service.getRowQualifier(tableName, rowKey, "cf1", "name");Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void getAllRows() throws IOException {ResultScanner results =service.getAllRows(tableName);Assert.assertNotNull(results);for (Result result : results) {Cell[] cells = result.rawCells();for (Cell cell : cells) {System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));System.out.println();}Assert.assertNotNull(result);}}@Testpublic void getRow() throws IOException {String value="tom";service.addRowData(tableName, rowKey, "cf1", "name", value);Result result=service.getRow(tableName,rowKey);Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void getRows() throws IOException {String rowKey1="1003";String rowKey2="1004";String value1="tom1";String value2="tom2";service.addRowData(tableName, rowKey1, "cf1", "name", value1);service.addRowData(tableName, rowKey2, "cf1", "name", value2);List<String> rows=new ArrayList<>();rows.add(rowKey1);rows.add(rowKey2);Result[] results=service.getRows(tableName,rows);Assert.assertNotNull(results);Assert.assertTrue(results.length==2);}@Testpublic void getRowQualifier() throws IOException {String value="tom";service.addRowData(tableName, rowKey, "cf1", "name", value);Result result=service.getRowQualifier(tableName, rowKey, "cf1", "name");Cell[] cells=result.rawCells();Assert.assertNotNull(cells);Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));}@Testpublic void deleteMultiRow() throws IOException {String rowKey1="1003";String rowKey2="1004";String value1="tom1";String value2="tom2";service.addRowData(tableName, rowKey1, "cf1", "name", value1);service.addRowData(tableName, rowKey2, "cf1", "name", value2);service.deleteMultiRow(tableName, rowKey1, rowKey2);List<String> rows=new ArrayList<>();rows.add(rowKey1);rows.add(rowKey2);Result[] results=service.getRows(tableName,rows);Assert.assertNotNull(results);for (Result result : results) {Cell[] cells=result.rawCells();Assert.assertTrue(cells.length==0);}}@Testpublic void dropTable() throws IOException {assert service.isTableExist(tableName);service.dropTable(tableName);assert !service.isTableExist(tableName);}}
4. 完整示例代码
源码:hbase-springboot-example02-src.zip
方式3:SpringBoot + HbaseTemplate
1. 添加pom.xml依赖
<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop</artifactId><version>2.5.0.RELEASE</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.5</version></dependency>
2. 工程配置
■ SpringBoot配置(application.yml)
hbase:config:hbase.zookeeper.quorum: 192.168.0.101,192.168.0.102,192.168.0.103hbase.zookeeper.property.clientPort: 2181zookeeper.znode.parent: /hbasehbase.client.scanner.caching: 500hbase.client.pause: 50hbase.rpc.timeout: 2000hbase.client.operation.timeout: 3000hbase.client.retries.number: 3hbase.rootdir: hdfs://bigdata-node1:9000/hbase
3. 编写关键类
■ 配置映射类(HBaseProperties.java)
import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.Map;@ConfigurationProperties(prefix = "hbase")public class HBaseProperties {private Map<String, String> config;public Map<String, String> getConfig() {return config;}public void setConfig(Map<String, String> config) {this.config = config;}}
■ 配置类(HBaseConfig.java)
import org.apache.hadoop.hbase.HBaseConfiguration;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.hadoop.hbase.HbaseTemplate;import java.util.Map;import java.util.Set;@Configuration@EnableConfigurationProperties(HBaseProperties.class)public class HBaseConfig {private final HBaseProperties properties;public HBaseConfig(HBaseProperties properties) {this.properties = properties;}@Beanpublic HbaseTemplate hbaseTemplate() {HbaseTemplate hbaseTemplate = new HbaseTemplate();hbaseTemplate.setConfiguration(configuration());hbaseTemplate.setAutoFlush(true);return hbaseTemplate;}public org.apache.hadoop.conf.Configuration configuration() {org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();Map<String, String> config = properties.getConfig();Set<String> keySet = config.keySet();for (String key : keySet) {configuration.set(key, config.get(key));}return configuration;}}
其他方式:
@Configurationpublic class HBaseConfig {@Value("${hbase.zookeeper.quorum}")private String zookeeperQuorum;@Value("${hbase.zookeeper.property.clientPort}")private String clientPort;@Value("${zookeeper.znode.parent}")private String znodeParent;@Beanpublic HbaseTemplate hbaseTemplate() {org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("hbase.zookeeper.quorum", zookeeperQuorum);conf.set("hbase.zookeeper.property.clientPort", clientPort);conf.set("zookeeper.znode.parent", znodeParent);return new HbaseTemplate(conf);}}
■ 服务类(HBaseService.java)
import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.hadoop.hbase.HbaseTemplate;import org.springframework.data.hadoop.hbase.RowMapper;import org.springframework.data.hadoop.hbase.TableCallback;import org.springframework.stereotype.Service;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;@Servicepublic class HBaseService {private static final Logger log = LoggerFactory.getLogger(HBaseService.class);@Autowiredprivate HbaseTemplate hbaseTemplate;/*** 判断表是否存在(DDL)* create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}** @param tableName 表名* @return 是否创建成功* @throws IOException*/public boolean isTableExist(String tableName) throws IOException {boolean result = false;HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {result = admin.tableExists(tableName);} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}return result;}/*** 创建表(DDL)* create 't1', 'cf1', 'cf2'** @param tableName 表名* @param columnFamily 列簇* @throws IOException*/public void createTable(String tableName, String... columnFamily) throws IOException {// 判断表是否存在if (isTableExist(tableName)) {log.error("Table:" + tableName + " already exists!");return;}HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {// 创建表属性对象,表名需要转字节HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));// 创建多个列族for (String cf : columnFamily) {descriptor.addFamily(new HColumnDescriptor(cf));}// 根据对表的配置,创建表admin.createTable(descriptor);log.info("Table:" + tableName + " create successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 新增记录(DML)* put 't1', '1001', 'cf1:name', 'zhangsan'* put 't1', '1001', 'cf1:age', '23'** @param tableName 表名* @param rowKey 行键* @param columnFamily 列簇* @param column 列* @param value 值*/public void addRowData(String tableName, String rowKey, String columnFamily, Stringcolumn, String value) {hbaseTemplate.put(tableName, rowKey, columnFamily, column, Bytes.toBytes(value));}/*** 全表扫描(Scan)* scan 't1'* [* {cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}},* {cf2={name=lisi, age=24}}* ]** @param tableName 表名* @return*/public List<Map<String, Map<String, String>>> getAllRows(String tableName) {Scan scan = new Scan();List<Map<String, Map<String, String>>> list = hbaseTemplate.find(tableName, scan, (result, rowNum) -> {Cell[] cells = result.rawCells();Map<String, Map<String, String>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, String> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);data.put(columnFamily, obj);}return data;});return list;}/*** 根据RowKey获取单行记录(get)* get 't1','1001'* {cf1_name=tom, cf1_age=23}** @param tableName 表名* @param rowKey 行键* @return*/public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {Cell[] cells = result.rawCells();Map<String, Map<String, Object>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, Object> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);data.put(columnFamily, obj);}return data;}});}/*** 获取多行记录(get)** @param tableName 表名* @param rowKeys 行键(可扩展参数)* @return*/public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {List<Map<String, Map<String, Object>>> rows = new ArrayList<>();for (String rowKey : rowKeys) {Map<String, Map<String, Object>> row = hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {Cell[] cells = result.rawCells();Map<String, Map<String, Object>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, Object> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);obj.put("rowKey", rowKey);data.put(columnFamily, obj);}return data;}});rows.add(row);}return rows;}/*** 根据限定符获取指定数据(get)** @param tableName 表名* @param rowName 行键* @param familyName 列簇* @param qualifier 限定符* @return*/public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {List<Cell> ceList = result.listCells();String res = "";if (ceList != null && ceList.size() > 0) {for (Cell cell : ceList) {res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}return res;});}/*** 根据StartKey和EndKey获取数据(scan),前闭后开区间** @param tableName 表名* @param startRow 起始rowKey* @param stopRow 结束rowKey* @return*/public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {Scan scan = new Scan();if (startRow == null) {startRow = "";}if (stopRow == null) {stopRow = "";} else {stopRow += "|";}scan.setStartRow(Bytes.toBytes(startRow));scan.setStopRow(Bytes.toBytes(stopRow));/* PageFilter filter = new PageFilter(5);scan.setFilter(filter);*/return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {List<Cell> ceList = result.listCells();Map<String, Map<String, Object>> data = new HashMap<>(16);if (ceList != null && ceList.size() > 0) {for (Cell cell : ceList) {String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());Map<String, Object> obj = data.get(family);if (null == obj) {obj = new HashMap<>(16);}obj.put("rowKey", rowKey);obj.put(qualifier, value);data.put(family, obj);}}return data;}});}/*** 删除列簇(DML)** @param tableName 表名* @param rowKey 行键* @param familyName 列簇*/public void deleteRow(String tableName, String rowKey, String familyName) {hbaseTemplate.delete(tableName, rowKey, familyName);}/*** 删除限定符(DML)** @param tableName 表名* @param rowKey 行键* @param familyName 列簇* @param qualifier 限定符*/public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);}/*** 删除多行数据(DML)** @param tableName 表名* @param familyNames 列簇* @param rowKeys 行键(可扩展参数)*/public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {for (String rowKey : rowKeys) {for (String familyName : familyNames) {hbaseTemplate.delete(tableName, rowKey, familyName);}}}/*** 根据rowKey删除记录(DML)** @param tableName 表名* @param rowKey 行键*/public void deleteAll(String tableName, final String rowKey) {hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}});}/*** 删除表(DDL)** @param tableName 表名* @throws IOException*/public void dropTable(String tableName) throws IOException {if (!isTableExist(tableName)) {log.error("Table:" + tableName + " not exist!");return;}HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {if (isTableExist(tableName)) {admin.disableTable(tableName);admin.deleteTable(tableName);log.info("Table:" + tableName + " delete successfully!");}} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 通用方法(一般可用于更新操作,如:delete、put)示例:deleteAllTableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};示例:putTableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};* @param tableName* @param action*/public void execute(String tableName, TableCallback action) {hbaseTemplate.execute(tableName, action);}}
4. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;import org.junit.Assert;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.hadoop.hbase.TableCallback;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Map;@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestclass HBaseServiceApplicationTests {private String tableName = "t1";private String rowKey = "1001";private String rowKey2 = "1002";private String rowKey3 = "1003";private String CF_01 = "cf1";private String CF1_QUALIFIER_NAME = "name";private String CF1_QUALIFIER_AGE = "age";private String CF1_NAME_VALUE = "tom";private String CF1_AGE_VALUE = "23";private String CF_02 = "cf2";private String CF2_QUALIFIER_ADDR = "address";private String CF2_ADDR_VALUE = "beijing";@Autowiredprivate HBaseService service;@Testpublic void isTableExist() throws IOException {assert service.isTableExist(tableName);}@Testpublic void createTable() throws IOException {service.createTable(tableName, CF_01, CF_02);assert service.isTableExist(tableName);}@Testpublic void addRowData() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));}@Testpublic void getAllRows() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);Assert.assertTrue(rows.size() == 1); // 1个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));System.out.println(rows);}@Testpublic void getRowByRowKey() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);Assert.assertNotNull(row);Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));System.out.println(row);}@Testpublic void getRows() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));System.out.println(rows);}@Testpublic void getRowQualifier() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));}@Testpublic void getRowsByKey() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));System.out.println(rows);}@Testpublic void deleteQualifier() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertNull(row.get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertNotNull(row.get(CF_01).get(CF1_QUALIFIER_AGE));}@Testpublic void deleteRow() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);service.deleteRow(tableName, rowKey, CF_01);service.deleteRow(tableName, rowKey, CF_02);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void deleteMultiRow() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);System.out.println("----------------------- test 1 -------------------------");System.out.println(rows);String[] familyNames = {CF_01, CF_02};service.deleteRows(tableName, familyNames, rowKey2, rowKey3);rows = service.getRows(tableName, rowKey2, rowKey3);System.out.println("----------------------- test 2 -------------------------");System.out.println(rows); // [{}, {}]Assert.assertTrue(rows.get(0).isEmpty());}@Testpublic void deleteAll() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);service.deleteAll(tableName, rowKey);row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void dropTable() throws IOException {if (!service.isTableExist(tableName)) {service.createTable(tableName);}assert service.isTableExist(tableName);service.dropTable(tableName);assert !service.isTableExist(tableName);}@Testpublic void execute_deleteAll() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);TableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void execute_put() {TableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));}}
5. 完整示例代码
源码:hbase-springboot-example03-src.zip
方式4:SpringBoot + HbaseTemplate(Spring配置)
1. 添加pom.xml依赖
<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop</artifactId><version>2.5.0.RELEASE</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.5</version></dependency>
2. 工程配置
■ 参数配置(resources/config/hbase.properties)
hbase.zk.host=192.168.0.101,192.168.0.102,192.168.0.103hbase.zk.port=2181
■ Spring配置(resources/config/hbase-spring.xml)
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:hdp="http://www.springframework.org/schema/hadoop"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd"><context:property-placeholder location="classpath:/config/hbase.properties"/><hdp:configuration id="hadoopConfiguration">fs.default.name=hdfs://bigdata-node1:9000</hdp:configuration><!-- HA --><!--<hdp:configuration id="hadoopConfiguration">fs.defaultFS=hdfs://bigdata-node1:9000</hdp:configuration>--><hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}" delete-connection="false" /><bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"><property name="configuration" ref="hbaseConfiguration"/></bean></beans>
3. 编写关键类
■ 启动类(Application.java)
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.ImportResource;@SpringBootApplication@ImportResource(locations = {"classpath:/config/hbase-spring.xml"})public class Application {public static void main(String[] args) {// 设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到//System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master");SpringApplication.run(Application.class, args);}}
■ 服务类(HBaseService.java)
import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.hadoop.hbase.HbaseTemplate;import org.springframework.data.hadoop.hbase.RowMapper;import org.springframework.data.hadoop.hbase.TableCallback;import org.springframework.stereotype.Service;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;@Servicepublic class HBaseService {private static final Logger log = LoggerFactory.getLogger(HBaseService.class);@Autowiredprivate HbaseTemplate hbaseTemplate;/*** 判断表是否存在(DDL)* create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}** @param tableName 表名* @return 是否创建成功* @throws IOException*/public boolean isTableExist(String tableName) throws IOException {boolean result = false;HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {result = admin.tableExists(tableName);} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}return result;}/*** 创建表(DDL)* create 't1', 'cf1', 'cf2'** @param tableName 表名* @param columnFamily 列簇* @throws IOException*/public void createTable(String tableName, String... columnFamily) throws IOException {// 判断表是否存在if (isTableExist(tableName)) {log.error("Table:" + tableName + " already exists!");return;}HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {// 创建表属性对象,表名需要转字节HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));// 创建多个列族for (String cf : columnFamily) {descriptor.addFamily(new HColumnDescriptor(cf));}// 根据对表的配置,创建表admin.createTable(descriptor);log.info("Table:" + tableName + " create successfully!");} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 新增记录(DML)* put 't1', '1001', 'cf1:name', 'zhangsan'* put 't1', '1001', 'cf1:age', '23'** @param tableName 表名* @param rowKey 行键* @param columnFamily 列簇* @param column 列* @param value 值*/public void addRowData(String tableName, String rowKey, String columnFamily, Stringcolumn, String value) {hbaseTemplate.put(tableName, rowKey, columnFamily, column, Bytes.toBytes(value));}/*** 全表扫描(Scan)* scan 't1'* [* {cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}},* {cf2={name=lisi, age=24}}* ]** @param tableName 表名* @return*/public List<Map<String, Map<String, String>>> getAllRows(String tableName) {Scan scan = new Scan();List<Map<String, Map<String, String>>> list = hbaseTemplate.find(tableName, scan, (result, rowNum) -> {Cell[] cells = result.rawCells();Map<String, Map<String, String>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, String> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);data.put(columnFamily, obj);}return data;});return list;}/*** 根据RowKey获取单行记录(get)* get 't1','1001'* {cf1_name=tom, cf1_age=23}** @param tableName 表名* @param rowKey 行键* @return*/public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {Cell[] cells = result.rawCells();Map<String, Map<String, Object>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, Object> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);data.put(columnFamily, obj);}return data;}});}/*** 获取多行记录(get)** @param tableName 表名* @param rowKeys 行键(可扩展参数)* @return*/public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {List<Map<String, Map<String, Object>>> rows = new ArrayList<>();for (String rowKey : rowKeys) {Map<String, Map<String, Object>> row = hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {Cell[] cells = result.rawCells();Map<String, Map<String, Object>> data = new HashMap<>(16);for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));Map<String, Object> obj = data.get(columnFamily);if (null == obj) {obj = new HashMap<>(16);}obj.put(rowName, value);obj.put("rowKey", rowKey);data.put(columnFamily, obj);}return data;}});rows.add(row);}return rows;}/*** 根据限定符获取指定数据(get)** @param tableName 表名* @param rowName 行键* @param familyName 列簇* @param qualifier 限定符* @return*/public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {List<Cell> ceList = result.listCells();String res = "";if (ceList != null && ceList.size() > 0) {for (Cell cell : ceList) {res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}return res;});}/*** 根据StartKey和EndKey获取数据(scan),前闭后开区间** @param tableName 表名* @param startRow 起始rowKey* @param stopRow 结束rowKey* @return*/public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {Scan scan = new Scan();if (startRow == null) {startRow = "";}if (stopRow == null) {stopRow = "";} else {stopRow += "|";}scan.setStartRow(Bytes.toBytes(startRow));scan.setStopRow(Bytes.toBytes(stopRow));/* PageFilter filter = new PageFilter(5);scan.setFilter(filter);*/return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Map<String, Object>>>() {@Overridepublic Map<String, Map<String, Object>> mapRow(Result result, int i) {List<Cell> ceList = result.listCells();Map<String, Map<String, Object>> data = new HashMap<>(16);if (ceList != null && ceList.size() > 0) {for (Cell cell : ceList) {String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());Map<String, Object> obj = data.get(family);if (null == obj) {obj = new HashMap<>(16);}obj.put("rowKey", rowKey);obj.put(qualifier, value);data.put(family, obj);}}return data;}});}/*** 删除列簇(DML)** @param tableName 表名* @param rowKey 行键* @param familyName 列簇*/public void deleteRow(String tableName, String rowKey, String familyName) {hbaseTemplate.delete(tableName, rowKey, familyName);}/*** 删除限定符(DML)** @param tableName 表名* @param rowKey 行键* @param familyName 列簇* @param qualifier 限定符*/public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);}/*** 删除多行数据(DML)** @param tableName 表名* @param familyNames 列簇* @param rowKeys 行键(可扩展参数)*/public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {for (String rowKey : rowKeys) {for (String familyName : familyNames) {hbaseTemplate.delete(tableName, rowKey, familyName);}}}/*** 根据rowKey删除记录(DML)** @param tableName 表名* @param rowKey 行键*/public void deleteAll(String tableName, final String rowKey) {hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}});}/*** 删除表(DDL)** @param tableName 表名* @throws IOException*/public void dropTable(String tableName) throws IOException {if (!isTableExist(tableName)) {log.error("Table:" + tableName + " not exist!");return;}HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());try {if (isTableExist(tableName)) {admin.disableTable(tableName);admin.deleteTable(tableName);log.info("Table:" + tableName + " delete successfully!");}} catch (Exception e) {log.error(e.getMessage());} finally {admin.close();}}/*** 通用方法(一般可用于更新操作,如:delete、put)示例:deleteAllTableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};示例:putTableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};* @param tableName* @param action*/public void execute(String tableName, TableCallback action) {hbaseTemplate.execute(tableName, action);}}
4. 测试
■ 测试类(HBaseServiceApplicationTests.java)
import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;import org.junit.Assert;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.hadoop.hbase.TableCallback;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Map;@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestclass HBaseServiceApplicationTests {private String tableName = "t1";private String rowKey = "1001";private String rowKey2 = "1002";private String rowKey3 = "1003";private String CF_01 = "cf1";private String CF1_QUALIFIER_NAME = "name";private String CF1_QUALIFIER_AGE = "age";private String CF1_NAME_VALUE = "tom";private String CF1_AGE_VALUE = "23";private String CF_02 = "cf2";private String CF2_QUALIFIER_ADDR = "address";private String CF2_ADDR_VALUE = "beijing";@Autowiredprivate HBaseService service;@Testpublic void isTableExist() throws IOException {assert service.isTableExist(tableName);}@Testpublic void createTable() throws IOException {service.createTable(tableName, CF_01, CF_02);assert service.isTableExist(tableName);}@Testpublic void addRowData() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));}@Testpublic void getAllRows() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);Assert.assertTrue(rows.size() == 1); // 1个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));System.out.println(rows);}@Testpublic void getRowByRowKey() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);Assert.assertNotNull(row);Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));System.out.println(row);}@Testpublic void getRows() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));System.out.println(rows);}@Testpublic void getRowQualifier() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));}@Testpublic void getRowsByKey() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));System.out.println(rows);}@Testpublic void deleteQualifier() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertNull(row.get(CF_01).get(CF1_QUALIFIER_NAME));Assert.assertNotNull(row.get(CF_01).get(CF1_QUALIFIER_AGE));}@Testpublic void deleteRow() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);service.deleteRow(tableName, rowKey, CF_01);service.deleteRow(tableName, rowKey, CF_02);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void deleteMultiRow() {service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);System.out.println("----------------------- test 1 -------------------------");System.out.println(rows);String[] familyNames = {CF_01, CF_02};service.deleteRows(tableName, familyNames, rowKey2, rowKey3);rows = service.getRows(tableName, rowKey2, rowKey3);System.out.println("----------------------- test 2 -------------------------");System.out.println(rows); // [{}, {}]Assert.assertTrue(rows.get(0).isEmpty());}@Testpublic void deleteAll() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);service.deleteAll(tableName, rowKey);row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void dropTable() throws IOException {if (!service.isTableExist(tableName)) {service.createTable(tableName);}assert service.isTableExist(tableName);service.dropTable(tableName);assert !service.isTableExist(tableName);}@Testpublic void execute_deleteAll() {service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);TableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);row = service.getRowByRowKey(tableName, rowKey);System.out.println(row);Assert.assertTrue(row.isEmpty());}@Testpublic void execute_put() {TableCallback action = new TableCallback<Boolean>() {public Boolean doInTable(HTableInterface table) {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));}}
■ Runner测试类(HBaseServiceRunnerTest.java)
import com.lonton.bigdata.service.HBaseService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.stereotype.Component;@Componentpublic class HBaseServiceRunnerTest implements ApplicationRunner {private final static Logger log = LoggerFactory.getLogger(HBaseServiceRunnerTest.class);private String tableName = "t1";@Autowiredprivate HBaseService service;@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("------------------------- HBaseServiceTest ApplicationRunner --------------------------------");log.info(String.valueOf(service.isTableExist(tableName)));}}
5. 完整示例代码
源码:hbase-springboot-example04-src.zip
方式5:SpringBoot + hbase starter
spring-boot-starter-hbase是spring-boot自定义的starter,为hbase的query和更新等操作提供简易的api并集成spring-boot的auto configuration。如果HbaseTemplate操作不满足需求,完全可以使用hbaseTemplate的getConnection()方法,获取连接。进而类似HbaseTemplate实现的逻辑,实现更复杂的需求查询等功能**。**
源码:https://github.com/SpringForAll/spring-boot-starter-hbase
示例:https://github.com/JeffLi1993/springboot-learning-example
参考:Spring Boot 2.x:通过spring-boot-starter-hbase集成HBase
1. 添加pom.xml依赖
<!-- Spring Boot HBase 依赖 --><!-- https://mvnrepository.com/artifact/com.spring4all/spring-boot-starter-hbase --><dependency><groupId>com.spring4all</groupId><artifactId>spring-boot-starter-hbase</artifactId><version>1.0.0.RELEASE</version></dependency>
补充:安装spring-boot-starter-hbase组件依赖(因为不在公共仓库,只能自行安装。如果有maven私库,可以考虑安装到私库)
# 下载项目到本地git clone https://github.com/SpringForAll/spring-boot-starter-hbase.gitcd spring-boot-starter-hbase# 安装依赖mvn clean install
源码:spring-boot-starter-hbase-master.zip
2. 工程配置
■ SpringBoot配置(application.properties)
## HBase Configurationspring.data.hbase.quorum=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181spring.data.hbase.rootDir=hdfs://bigdata-node1:9000/hbasespring.data.hbase.nodeParent=/hbase
具体配置项信息如下:
- spring.data.hbase.quorum:指定HBase的zk地址
- spring.data.hbase.rootDir:指定HBase在HDFS上存储的路径
- spring.data.hbase.nodeParent:指定ZK中HBase的根ZNode
3. 编写关键类
■ Service接口(HBaseService.java) ```java import com.spring4all.spring.boot.starter.hbase.api.TableCallback; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import java.util.List;
/**
Description HBase服务接口 */ public interface HBaseService {
/**
- 根据rowKey获取单行记录,数据由mapper映射 *
- @param tableName 表名
- @param rowKey 行键
- @param
泛型 @return */
T get(String tableName, String rowKey); /**
- 根据rowKey和列簇取单行记录,数据由mapper映射 *
- @param tableName 表名
- @param rowKey 行键
- @param familyName 列簇
- @param
泛型 @return */
T get(String tableName, String rowKey, String familyName); /**
- 根据限定符获取单行记录,数据由mapper映射 *
- @param tableName 表名
- @param rowKey 行键
- @param familyName 列簇
- @param qualifier 限定符
- @param
泛型 @return */
T get(String tableName, String rowKey, String familyName, String qualifier); /**
- 新增或修改
- 注意:写入非字符串型的数据时,要先转为String,如:int,Bytes.toBytes(String.valueOf(值)) *
- @param tableName 表名
@param mutation 执行put update or delete */ void saveOrUpdate(String tableName, Mutation mutation);
/**
- 批量新增或修改
- 注意:写入非字符串型的数据时,要先转为String,如:int,Bytes.toBytes(String.valueOf(值)) *
- @param tableName 表名
@param mutations 批量执行put update or delete */ void saveOrUpdates(String tableName, List
mutations); /**
- 通用方法(一般可用于更新操作,如:delete、put)
// 执行操作(deleteAll) TableCallback action = new TableCallback() {
}; // 执行操作(put update) TableCallback action = new TableCallback@Overridepublic Boolean doInTable(Table table) throws Throwable {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
() {
};@Overridepublic Boolean doInTable(Table table) throws Throwable {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
- @param tableName 表名
- @param action 执行put update or delete
- @param
泛型 @return */
T execute(String tableName, TableCallback action); /**
- 扫描 *
- @param tableName 表名
- @param scan 扫描对象
- @param
泛型 @return */
List find(String tableName, Scan scan); /**
- 扫描(根据列簇)
- @param tableName
- @param family
- @param
@return */
List find(String tableName, String family); /**
- 扫描(根据限定符)
- @param tableName
- @param family
- @param qualifier
- @param
@return */
List find(String tableName, String family, String qualifier); /**
- 如果HbaseTemplate操作不满足需求,可以使用hbaseTemplate的getConnection()方法,
- 获取连接,进而类似HbaseTemplate实现的逻辑,实现更复杂的需求查询等功能
@return */ Connection getConnection();
/**
- 关闭连接
- @param connection */ void closeConnection(Connection connection);
}
<a name="Bi7lI"></a>## 4. 测试**■ DTO定义(Student.java)**```javaimport lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;/*** Description DTO-学生实体类* create 'student','base','other'*/@Data@NoArgsConstructor@AllArgsConstructorpublic class Student {private BaseInfo base;private OtherInfo other;@Data@NoArgsConstructor@AllArgsConstructorpublic static class BaseInfo{private String name;private Integer age;@Overridepublic String toString() {return "BaseInfo{" +"name='" + name + '\'' +", age=" + age +'}';}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class OtherInfo{private Integer fv;@Overridepublic String toString() {return "OtherInfo{" +"fv=" + fv +'}';}}@Overridepublic String toString() {return "Student{" +"base=" + base +", other=" + other +'}';}}
■ RowMapper定义(StudentRowMapper.java)
import com.lonton.bigdata.domain.Student;import com.spring4all.spring.boot.starter.hbase.api.RowMapper;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.util.Bytes;/*** Description HBase Mapper*/public class StudentRowMapper implements RowMapper<Student> {private static final byte[] BASE_FAMILY = "base".getBytes();private static final byte[] OTHER_FAMILY = "other".getBytes();private static final byte[] NAME = "name".getBytes();private static final byte[] AGE = "age".getBytes();private static final byte[] FV = "fv".getBytes();/*方法1:ok*//*@Overridepublic Student mapRow(Result result, int rowNum) throws Exception {Student.BaseInfo baseInfo = new Student.BaseInfo();Student.OtherInfo otherInfo = new Student.OtherInfo();Cell[] cells = result.rawCells();for (Cell c : cells) {String columnFamily = new String(CellUtil.cloneFamily(c));String rowName = new String(CellUtil.cloneQualifier(c));String value = new String(CellUtil.cloneValue(c));if(value!=null){if(Bytes.toString(BASE_FAMILY).equals(columnFamily)){if(Bytes.toString(NAME).equals(rowName)){baseInfo.setName(value);}else if(Bytes.toString(AGE).equals(rowName)){Integer age=Integer.parseInt(value);baseInfo.setAge(age);}} else if(Bytes.toString(OTHER_FAMILY).equals(columnFamily)){if(Bytes.toString(FV).equals(rowName)){Integer fv=Integer.parseInt(value);otherInfo.setFv(fv);}}}}Student dto = new Student(baseInfo, otherInfo);return dto;}*//*方法2:ok*/@Overridepublic Student mapRow(Result result, int rowNum) throws Exception {Student.BaseInfo baseInfo = new Student.BaseInfo();if (result.containsColumn(BASE_FAMILY, NAME)) {String name = Bytes.toString(result.getValue(BASE_FAMILY, NAME));baseInfo.setName(name);}if (result.containsColumn(BASE_FAMILY, AGE)) {Integer age = Integer.parseInt(Bytes.toString(result.getValue(BASE_FAMILY, AGE)));baseInfo.setAge(age);}Student.OtherInfo otherInfo = new Student.OtherInfo();if (result.containsColumn(OTHER_FAMILY, FV)) {Integer fv = Integer.parseInt(Bytes.toString(result.getValue(OTHER_FAMILY, FV)));otherInfo.setFv(fv);}Student dto = new Student(baseInfo, otherInfo);return dto;}}
■ Service实现类(StudentHBaseServiceImpl.java)
import com.lonton.bigdata.domain.Student;import com.lonton.bigdata.mapper.StudentRowMapper;import com.lonton.bigdata.service.HBaseService;import com.spring4all.spring.boot.starter.hbase.api.HbaseTemplate;import com.spring4all.spring.boot.starter.hbase.api.TableCallback;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.client.Scan;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;import java.util.List;/*** Description HBase服务接口实现类*/@Servicepublic class StudentHBaseServiceImpl implements HBaseService {@Autowiredprivate HbaseTemplate hbaseTemplate;@Overridepublic Student get(String tableName, String rowKey) {return hbaseTemplate.get(tableName,rowKey,new StudentRowMapper());}@Overridepublic Student get(String tableName, String rowKey, String familyName) {return hbaseTemplate.get(tableName,rowKey,familyName,new StudentRowMapper());}@Overridepublic Student get(String tableName, String rowKey, String familyName, String qualifier) {return hbaseTemplate.get(tableName,rowKey,familyName,qualifier,new StudentRowMapper());}@Overridepublic void saveOrUpdate(String tableName, Mutation mutation) {hbaseTemplate.saveOrUpdate(tableName,mutation);}@Overridepublic void saveOrUpdates(String tableName, List<Mutation> mutations) {hbaseTemplate.saveOrUpdates(tableName,mutations);}@Overridepublic <T> T execute(String tableName, TableCallback<T> action) {return hbaseTemplate.execute(tableName,action);}@Overridepublic List<Student> find(String tableName, Scan scan) {return hbaseTemplate.find(tableName,scan,new StudentRowMapper());}@Overridepublic List<Student> find(String tableName, String family) {return hbaseTemplate.find(tableName,family,new StudentRowMapper());}@Overridepublic List<Student> find(String tableName, String family, String qualifier) {return hbaseTemplate.find(tableName,family,qualifier,new StudentRowMapper());}@Overridepublic Connection getConnection() {return hbaseTemplate.getConnection();}@Overridepublic void closeConnection(Connection connection) {if(connection!=null){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
■ 测试类(HBaseServiceApplicationTests.java)
import com.lonton.bigdata.domain.Student;import com.spring4all.spring.boot.starter.hbase.api.TableCallback;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.junit.Assert;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import java.util.ArrayList;import java.util.List;import java.util.Map;/***** @Description HBase单元测试类* create 'student','base','other'* truncate 'student'* put 'student','1001','base:name','zhangsan'* put 'student','1001','base:age',23* put 'student','1001','other:fv',3* scan 'student'*/@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestclass HBaseServiceApplicationTests {private String tableName = "student";private String rowKey = "1001";private String CF_BASE = "base";private String CF_OTHER = "other";private String CF_BASE_QUALIFIER_NAME = "name";private String CF_BASE_QUALIFIER_AGE = "age";private String CF_OTHER_QUALIFIER_FV = "fv";private String CF_BASE_VALUE_NAME = "zhangsan";private Integer CF_BASE_VALUE_AGE = 23;private Integer CF_OTHER_VALUE_FV = 3;@Autowiredprivate HBaseService service;/** Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=3}} **/@Testpublic void get1() {// 新增测试数据Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 获取记录Student dto=service.get(tableName,rowKey);// 验证System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());}/** Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=null}} **/@Testpublic void get2() {// 新增测试数据Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 获取记录Student dto=service.get(tableName,rowKey,CF_BASE);// 验证System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertNull(dto.getOther().getFv());}/** Student{base=BaseInfo{name='zhangsan', age=null}, other=OtherInfo{fv=null}} **/@Testpublic void get3() {// 新增测试数据Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 获取记录Student dto=service.get(tableName,rowKey,CF_BASE,CF_BASE_QUALIFIER_NAME);// 验证System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertNull(dto.getBase().getAge());Assert.assertNull(dto.getOther().getFv());}@Testpublic void saveOrUpdate() {// 新增测试数据Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 验证Student dto=service.get(tableName,rowKey);System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());}@Testpublic void saveOrUpdates() {List<Mutation> mutations=new ArrayList<>();// 新增${CF_BASE_QUALIFIER_NAME}Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutations.add(mutation);// 新增${CF_BASE_QUALIFIER_AGE}mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutations.add(mutation);// 新增${CF_OTHER_QUALIFIER_FV}mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));mutations.add(mutation);// 批量新增测试数据service.saveOrUpdates(tableName, mutations);// 验证Student dto=service.get(tableName,rowKey);System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());}@Testpublic void execute_deleteAll() {// 新增测试数据Put mutation = new Put(Bytes.toBytes(rowKey));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 获取记录Student dto=service.get(tableName,rowKey,CF_BASE,CF_BASE_QUALIFIER_NAME);// 验证System.out.println(dto);// 执行操作(deleteAll)TableCallback action = new TableCallback<Boolean>() {@Overridepublic Boolean doInTable(Table table) throws Throwable {boolean flag = false;try {List<Delete> list = new ArrayList<>();Delete d1 = new Delete(rowKey.getBytes());list.add(d1);table.delete(list);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);// 验证dto=service.get(tableName,rowKey);System.out.println(dto);Assert.assertNull(dto.getBase().getName());Assert.assertNull(dto.getBase().getAge());Assert.assertNull(dto.getOther().getFv());}@Testpublic void execute_put() {// 执行操作(put update)TableCallback action = new TableCallback<Boolean>() {@Overridepublic Boolean doInTable(Table table) throws Throwable {boolean flag = false;try {Put put = new Put(rowKey.getBytes());put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));table.put(put);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}};service.execute(tableName, action);// 获取记录Student dto=service.get(tableName,rowKey);// 验证System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());}@Testpublic void find_scan() {// 新增测试数据String startRow=rowKey+"-01";String stopRow=rowKey+"-02";Put mutation = new Put(Bytes.toBytes(startRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);mutation = new Put(Bytes.toBytes(stopRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 范围扫描(scan为空则为全表扫描)Scan scan = new Scan();if (startRow == null) {startRow = "";}if (stopRow == null) {stopRow = "";} else {stopRow += "|";}scan.setStartRow(Bytes.toBytes(startRow));scan.setStopRow(Bytes.toBytes(stopRow));scan.setCaching(5000);List<Student> dtoList=service.find(tableName,scan);// 验证for (Student dto : dtoList) {System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());}}@Testpublic void find_family() {// 新增测试数据String startRow=rowKey+"-01";String stopRow=rowKey+"-02";Put mutation = new Put(Bytes.toBytes(startRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);mutation = new Put(Bytes.toBytes(stopRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 范围扫描List<Student> dtoList=service.find(tableName,CF_BASE);// 验证for (Student dto : dtoList) {System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());Assert.assertNull(dto.getOther().getFv());}}@Testpublic void find_qualifier() {// 新增测试数据String startRow=rowKey+"-01";String stopRow=rowKey+"-02";Put mutation = new Put(Bytes.toBytes(startRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);mutation = new Put(Bytes.toBytes(stopRow));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));mutation.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));mutation.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));service.saveOrUpdate(tableName, mutation);// 范围扫描List<Student> dtoList=service.find(tableName,CF_BASE,CF_BASE_QUALIFIER_NAME);// 验证for (Student dto : dtoList) {System.out.println(dto);Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());Assert.assertNull(dto.getBase().getAge());Assert.assertNull(dto.getOther().getFv());}}@Testpublic void getAndCloseConnection() {Connection connection=service.getConnection();System.out.println(connection);Assert.assertFalse(connection.isClosed());service.closeConnection(connection);Assert.assertTrue(connection.isClosed());}}
■ 控制类(StudentController.java)
import com.lonton.bigdata.domain.Student;import com.lonton.bigdata.service.HBaseService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.*;/*** Description 控制类<br/>* put 'student','1001','base:name','zhangsan'* put 'student','1001','base:age',23* put 'student','1001','other:fv',3* @RestController等价于@Controller + @ResponseBody*/@RestControllerpublic class StudentController {@Autowiredprivate HBaseService service;/** http://localhost:8080/hbaseservice/student/1001 **//*** {"base":{"name":"zhangsan","age":23},"other":{"fv":3}}**/@RequestMapping(value = "/hbaseservice/{tableName}/{rowKey}", method = RequestMethod.GET)public Student getStudent(@PathVariable String tableName, @PathVariable String rowKey) {return service.get(tableName, rowKey);}}
