方式1:SpringBoot + HBaseUtil

1. 添加pom.xml依赖

  1. <dependency>
  2. <groupId>org.apache.hbase</groupId>
  3. <artifactId>hbase-client</artifactId>
  4. <version>1.2.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hbase</groupId>
  8. <artifactId>hbase-server</artifactId>
  9. <version>1.2.5</version>
  10. </dependency>

2. 编写关键类

■ 常量类(Constant.java)

  1. public class Constant {
  2. /** HBase配置 **/
  3. public static class HBaseConfig {
  4. public final static String ZK_HOST="192.168.0.101,192.168.0.102,192.168.0.103";
  5. public final static String ZK_PORT="2181";
  6. public final static String ZK_PATH="/hbase";
  7. }
  8. }

■ 工具类(HBaseUtil.java)

  1. import com.lonton.bigdata.constant.Constant;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.*;
  4. import org.apache.hadoop.hbase.client.*;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.io.IOException;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. public class HBaseUtil {
  12. private static final Logger log = LoggerFactory.getLogger(HBaseUtil.class);
  13. public static Configuration conf;
  14. private static Connection conn;
  15. static {
  16. // Build the shared client configuration with the HBaseConfiguration.create() factory
  17. conf = HBaseConfiguration.create();
  18. // ZooKeeper quorum hosts that HBase uses
  19. conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);
  20. // ZooKeeper client port
  21. conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);
  22. // ZooKeeper znode path under which HBase stores its information
  23. conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);
  24. // Default scanner caching for all clients: a row count per fetch, not a duration (author cites default 100 - verify for your HBase version)
  25. conf.set(HConstants.HBASE_CLIENT_SCANNER_CACHING, "500");
  26. // Wait time between failed attempts when retrying (author cites default 100)
  27. conf.set(HConstants.HBASE_CLIENT_PAUSE, "50");
  28. // Timeout of a single RPC request (author cites default 60000)
  29. conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "2000");
  30. // Total timeout of one client data operation until a response arrives (author cites default Integer.MAX_VALUE - verify for your HBase version)
  31. conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000");
  32. // Retry at most 3 times (author cites default 31)
  33. conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3");
  34. }
  35. /**
  36. * 获得链接
  37. * @return
  38. */
  39. public static synchronized Connection getConnection() {
  40. try {
  41. if (conn == null || conn.isClosed()) {
  42. conn = ConnectionFactory.createConnection(conf);
  43. }
  44. } catch (IOException e) {
  45. log.error("HBase 建立链接失败 ", e);
  46. }
  47. return conn;
  48. }
  49. /**
  50. * 关闭连接
  51. * @throws IOException
  52. */
  53. public static void closeConnect(Connection conn) {
  54. if (null != conn) {
  55. try {
  56. conn.close();
  57. } catch (Exception e) {
  58. log.error("closeConnect failure !", e);
  59. }
  60. }
  61. }
  62. /**
  63. * 判断表是否存在(DDL)
  64. * create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
  65. * @param tableName 表名
  66. * @return 是否创建成功
  67. * @throws IOException
  68. */
  69. public static boolean isTableExist(String tableName) throws IOException {
  70. boolean result = false;
  71. Connection conn = getConnection();
  72. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  73. try {
  74. result = admin.tableExists(tableName);
  75. } catch (Exception e) {
  76. log.error(e.getMessage());
  77. } finally {
  78. admin.close();
  79. }
  80. return result;
  81. }
  82. /**
  83. * 创建表(DDL)
  84. * create 't1', 'cf1', 'cf2'
  85. * @param tableName 表名
  86. * @param columnFamily 列簇
  87. * @throws IOException
  88. */
  89. public static void createTable(String tableName, String... columnFamily) throws IOException {
  90. // 判断表是否存在
  91. if (isTableExist(tableName)) {
  92. log.error("Table:" + tableName + " already exists!");
  93. return;
  94. }
  95. Connection conn = getConnection();
  96. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  97. try {
  98. // 创建表属性对象,表名需要转字节
  99. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  100. // 创建多个列族
  101. for (String cf : columnFamily) {
  102. descriptor.addFamily(new HColumnDescriptor(cf));
  103. }
  104. // 根据对表的配置,创建表
  105. admin.createTable(descriptor);
  106. log.info("Table:" + tableName + " create successfully!");
  107. } catch (Exception e) {
  108. log.error(e.getMessage());
  109. } finally {
  110. admin.close();
  111. }
  112. }
  113. /**
  114. * 新增记录(DML)
  115. * put 't1', '1001', 'cf1:name', 'zhangsan'
  116. * put 't1', '1001', 'cf1:age', '23'
  117. * @param tableName 表名
  118. * @param rowKey 行键
  119. * @param columnFamily 列簇
  120. * @param column 列
  121. * @param value 值
  122. * @throws IOException
  123. */
  124. public static void addRowData(String tableName, String rowKey, String columnFamily, String
  125. column, String value) throws IOException {
  126. Connection conn = getConnection();
  127. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  128. try {
  129. // 向表中插入数据
  130. Put put = new Put(Bytes.toBytes(rowKey));
  131. // 向Put对象中组装数据
  132. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
  133. hTable.put(put);
  134. log.info("insert successfully!");
  135. } catch (Exception e) {
  136. log.error(e.getMessage());
  137. } finally {
  138. hTable.close();
  139. }
  140. }
  141. /**
  142. * 全表扫描(Scan)
  143. * scan "t1"
  144. * @param tableName 表名
  145. * @return
  146. * @throws IOException
  147. */
  148. public static ResultScanner getAllRows(String tableName) throws IOException {
  149. ResultScanner results = null;
  150. Connection conn = getConnection();
  151. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  152. try {
  153. // 得到用于扫描region的对象
  154. Scan scan = new Scan();
  155. // setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。
  156. scan.setCaching(1000);
  157. // 使用HTable得到resultcanner实现类的对象
  158. results = hTable.getScanner(scan);
  159. /*for (Result result : results) {
  160. Cell[] cells = result.rawCells();
  161. for (Cell cell : cells) {
  162. System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
  163. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  164. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  165. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  166. System.out.println();
  167. }
  168. }*/
  169. } catch (Exception e) {
  170. log.error(e.getMessage());
  171. } finally {
  172. hTable.close();
  173. }
  174. return results;
  175. }
  176. /**
  177. * 获取单行记录(get)
  178. * get "t1","1001"
  179. * @param tableName 表名
  180. * @param rowKey 行键
  181. * @return
  182. * @throws IOException
  183. */
  184. public static Result getRow(String tableName, String rowKey) throws IOException {
  185. Result result = null;
  186. Connection conn = getConnection();
  187. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  188. try {
  189. Get get = new Get(Bytes.toBytes(rowKey));
  190. //get.setMaxVersions(); // 显示所有版本
  191. //get.setTimeStamp(); // 显示指定时间戳的版本
  192. result = hTable.get(get);
  193. /*for (Cell cell : result.rawCells()) {
  194. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  195. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  196. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  197. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  198. System.out.print("Timestamp:" + cell.getTimestamp());
  199. System.out.println();
  200. }*/
  201. } catch (Exception e) {
  202. log.error(e.getMessage());
  203. } finally {
  204. hTable.close();
  205. }
  206. return result;
  207. }
  208. /**
  209. * Fetches several rows in one batched get.
  210. * @param tableName name of the table to read
  211. * @param rows row keys; each one is converted with {@code String.valueOf}
  212. * @param <T> element type of the row keys
  213. * @return one Result per key, or {@code null} if rows is empty or the read fails
  214. * @throws IOException if the table handle cannot be obtained
  215. */
  216. public static <T> Result[] getRows(String tableName, List<T> rows) throws IOException {
  217. // Build the Get list before touching HBase so bad input fails fast.
  218. List<Get> gets = new ArrayList<Get>();
  219. for (T row : rows) {
  220. if (row == null) {
  221. // Still a RuntimeException for callers, but the message now names the real problem.
  222. throw new IllegalArgumentException("row key must not be null");
  223. }
  224. gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
  225. }
  226. if (gets.isEmpty()) {
  227. // Nothing to fetch: keep the original null result without opening a table.
  228. return null;
  229. }
  230. Result[] results = null;
  231. // Table interface instead of an HTable cast; the old null guard could not prevent an NPE in finally.
  232. Table table = getConnection().getTable(TableName.valueOf(tableName));
  233. try {
  234. results = table.get(gets);
  235. } catch (IOException e) {
  236. log.error("getRows failure !", e);
  237. } finally {
  238. try {
  239. table.close();
  240. } catch (IOException e) {
  241. log.error("table.close() failure !", e);
  242. }
  243. }
  244. return results;
  245. }
  246. /**
  247. * 根据限定符获取单行记录(get)
  248. * get "t1","1001","cf1:name"
  249. * @param tableName 表名
  250. * @param rowKey 行键
  251. * @param family 列簇
  252. * @param qualifier 限定符
  253. * @throws IOException
  254. */
  255. public static Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
  256. Result result = null;
  257. Connection conn = getConnection();
  258. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  259. try {
  260. Get get = new Get(Bytes.toBytes(rowKey));
  261. get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
  262. result = hTable.get(get);
  263. /*for (Cell cell : result.rawCells()) {
  264. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  265. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  266. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  267. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  268. System.out.print("Timestamp:" + cell.getTimestamp());
  269. System.out.println();
  270. }*/
  271. } catch (Exception e) {
  272. log.error(e.getMessage());
  273. } finally {
  274. hTable.close();
  275. }
  276. return result;
  277. }
  278. /**
  279. * 删除多行数据(DML)
  280. * @param tableName 表名
  281. * @param rows 行键(可扩展参数)
  282. * @throws IOException
  283. */
  284. public static void deleteMultiRow(String tableName, String... rows) throws IOException {
  285. Connection conn = getConnection();
  286. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  287. try {
  288. List<Delete> deleteList = new ArrayList<>();
  289. for (String row : rows) {
  290. Delete delete = new Delete(Bytes.toBytes(row));
  291. deleteList.add(delete);
  292. }
  293. hTable.delete(deleteList);
  294. } catch (Exception e) {
  295. log.error(e.getMessage());
  296. } finally {
  297. hTable.close();
  298. }
  299. }
  300. /**
  301. * 删除表(DDL)
  302. * @param tableName 表名
  303. * @throws IOException
  304. */
  305. public static void dropTable(String tableName) throws IOException {
  306. if (!isTableExist(tableName)) {
  307. log.error("Table:" + tableName + " not exist!");
  308. return;
  309. }
  310. Connection conn = getConnection();
  311. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  312. try {
  313. if (isTableExist(tableName)) {
  314. admin.disableTable(tableName);
  315. admin.deleteTable(tableName);
  316. log.info("Table:" + tableName + " delete successfully!");
  317. }
  318. } catch (Exception e) {
  319. log.error(e.getMessage());
  320. } finally {
  321. admin.close();
  322. }
  323. }
  324. }

3. 测试

■ 测试类(HBaseBaseUtilTests.java)

  1. import org.apache.hadoop.hbase.Cell;
  2. import org.apache.hadoop.hbase.CellUtil;
  3. import org.apache.hadoop.hbase.client.Result;
  4. import org.apache.hadoop.hbase.client.ResultScanner;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import org.junit.*;
  7. import java.io.IOException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class HBaseBaseUtilTests {
  11. private String tableName="t1";
  12. private String rowKey="1001";
  13. /** BeforeClass:会在所有方法被调用前被执行,
  14. * 而且该方法是静态的,所有当测试类被加载后接着就会运行它,
  15. * 而且在内存中它只会存在一份实例,它比较适合加载配置文件
  16. **/
  17. @BeforeClass
  18. public static void setUpBeforeClass() {
  19. System.out.println("this is @BeforeClass ...");
  20. HBaseUtil.getConnection();
  21. }
  22. /** AfterClass:通常用来对资源的清理,如关闭数据库的连接 **/
  23. @AfterClass
  24. public static void tearDownAfterClass() {
  25. System.out.println("this is @AfterClass ...");
  26. HBaseUtil.closeConnect(HBaseUtil.getConnection());
  27. }
  28. /** Before:每个测试方法调用前执行一次 **/
  29. @Before
  30. public void setUp() {
  31. System.out.println("this is @Before ...");
  32. }
  33. /** After:每个测试方法调用后执行一次 **/
  34. @After
  35. public void tearDown() {
  36. System.out.println("this is @After ...");
  37. }
  38. @Test
  39. public void createTable() throws IOException {
  40. HBaseUtil.createTable(tableName, "cf1", "cf2");
  41. assert HBaseUtil.isTableExist(tableName);
  42. }
  43. @Test
  44. public void isTableExist() throws IOException {
  45. assert HBaseUtil.isTableExist(tableName);
  46. }
  47. @Test
  48. public void addRowData() throws IOException {
  49. String value="tom";
  50. HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  51. Result result=HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");
  52. Cell[] cells=result.rawCells();
  53. Assert.assertNotNull(cells);
  54. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  55. }
  56. @Test
  57. public void getAllRows() throws IOException {
  58. ResultScanner results =HBaseUtil.getAllRows(tableName);
  59. Assert.assertNotNull(results);
  60. for (Result result : results) {
  61. Assert.assertNotNull(result);
  62. }
  63. }
  64. @Test
  65. public void getRow() throws IOException {
  66. String value="tom";
  67. HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  68. Result result=HBaseUtil.getRow(tableName,rowKey);
  69. Cell[] cells=result.rawCells();
  70. Assert.assertNotNull(cells);
  71. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  72. }
  73. @Test
  74. public void getRows() throws IOException {
  75. String rowKey1="1003";
  76. String rowKey2="1004";
  77. String value1="tom1";
  78. String value2="tom2";
  79. HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);
  80. HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);
  81. List<String> rows=new ArrayList<>();
  82. rows.add(rowKey1);
  83. rows.add(rowKey2);
  84. Result[] results=HBaseUtil.getRows(tableName,rows);
  85. Assert.assertNotNull(results);
  86. Assert.assertTrue(results.length==2);
  87. }
  88. @Test
  89. public void getRowQualifier() throws IOException {
  90. String value="tom";
  91. HBaseUtil.addRowData(tableName, rowKey, "cf1", "name", value);
  92. Result result=HBaseUtil.getRowQualifier(tableName, rowKey, "cf1", "name");
  93. Cell[] cells=result.rawCells();
  94. Assert.assertNotNull(cells);
  95. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  96. }
  97. @Test
  98. public void deleteMultiRow() throws IOException {
  99. String rowKey1="1003";
  100. String rowKey2="1004";
  101. String value1="tom1";
  102. String value2="tom2";
  103. HBaseUtil.addRowData(tableName, rowKey1, "cf1", "name", value1);
  104. HBaseUtil.addRowData(tableName, rowKey2, "cf1", "name", value2);
  105. HBaseUtil.deleteMultiRow(tableName, rowKey1, rowKey2);
  106. List<String> rows=new ArrayList<>();
  107. rows.add(rowKey1);
  108. rows.add(rowKey2);
  109. Result[] results=HBaseUtil.getRows(tableName,rows);
  110. Assert.assertNotNull(results);
  111. for (Result result : results) {
  112. Cell[] cells=result.rawCells();
  113. Assert.assertTrue(cells.length==0);
  114. }
  115. }
  116. @Test
  117. public void dropTable() throws IOException {
  118. assert HBaseUtil.isTableExist(tableName);
  119. HBaseUtil.dropTable(tableName);
  120. assert !HBaseUtil.isTableExist(tableName);
  121. }
  122. }

4. 完整示例代码

源码:hbase-springboot-example01-src.zip

方式2:SpringBoot + HBaseService

1. 添加pom.xml依赖

  1. <dependency>
  2. <groupId>org.apache.hbase</groupId>
  3. <artifactId>hbase-client</artifactId>
  4. <version>1.2.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hbase</groupId>
  8. <artifactId>hbase-server</artifactId>
  9. <version>1.2.5</version>
  10. </dependency>

2. 编写关键类

■ 常量类(Constant.java)

  1. public class Constant {
  2. /** HBase配置 **/
  3. public static class HBaseConfig {
  4. public final static String ZK_HOST="192.168.0.101,192.168.0.102,192.168.0.103";
  5. public final static String ZK_PORT="2181";
  6. public final static String ZK_PATH="/hbase";
  7. }
  8. }

■ 配置类(HBaseConfig.java)

  1. import com.lonton.bigdata.constant.Constant;
  2. import com.lonton.bigdata.service.HBaseService;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.hadoop.hbase.HConstants;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class HBaseConfig {
  9. @Bean
  10. public HBaseService getHBaseService() {
  11. // On Windows a temporary hadoop.home.dir can be set so that winutils.exe is found under its bin directory (used when connecting to Hadoop from Windows)
  12. //System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master");
  13. // Per the author, this step also picks up configuration files such as hbase-site.xml from the resources directory
  14. org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
  15. // The settings below override whatever create() loaded
  16. // ZooKeeper quorum hosts that HBase uses
  17. conf.set(HConstants.ZOOKEEPER_QUORUM, Constant.HBaseConfig.ZK_HOST);
  18. // ZooKeeper client port
  19. conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Constant.HBaseConfig.ZK_PORT);
  20. // ZooKeeper znode path under which HBase stores its information
  21. conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Constant.HBaseConfig.ZK_PATH);
  22. // Default scanner caching for all clients: a row count per fetch, not a duration (author cites default 100 - verify for your HBase version)
  23. conf.set(HConstants.HBASE_CLIENT_SCANNER_CACHING, "500");
  24. // Wait time between failed attempts when retrying (author cites default 100)
  25. conf.set(HConstants.HBASE_CLIENT_PAUSE, "50");
  26. // Timeout of a single RPC request (author cites default 60000)
  27. conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "2000");
  28. // Total timeout of one client data operation until a response arrives (author cites default Integer.MAX_VALUE - verify for your HBase version)
  29. conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "3000");
  30. // Retry at most 3 times (author cites default 31)
  31. conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "3");
  32. return new HBaseService(conf);
  33. }
  34. }

■ 服务类(HBaseService.java)

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.hbase.*;
  3. import org.apache.hadoop.hbase.client.*;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.io.IOException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class HBaseService {
  11. private static final Logger log = LoggerFactory.getLogger(HBaseService.class);
  12. public static Configuration conf;
  13. private static Connection conn;
  14. public HBaseService(Configuration conf) {
  15. // Keep conf for getConnection(): the static field was never assigned, so any reconnect used null.
  16. HBaseService.conf = conf;
  17. try {
  18. if (conn == null || conn.isClosed()) {
  19. conn = ConnectionFactory.createConnection(conf);
  20. }
  21. } catch (IOException e) {
  22. log.error("HBase connect failure", e);
  23. }
  24. }
  25. /**
  26. * Returns the shared connection, reopening it if missing or closed; may be {@code null} if opening failed.
  27. */
  28. public Connection getConnection() {
  29. try {
  30. if (conn == null || conn.isClosed()) {
  31. conn = ConnectionFactory.createConnection(conf);
  32. }
  33. } catch (IOException e) {
  34. log.error("HBase connect failure", e);
  35. }
  36. return conn;
  37. }
  38. /**
  39. * 关闭连接
  40. *
  41. * @throws IOException
  42. */
  43. public void closeConnect(Connection conn) {
  44. if (null != conn) {
  45. try {
  46. conn.close();
  47. } catch (Exception e) {
  48. log.error("closeConnect failure !", e);
  49. }
  50. }
  51. }
  52. /**
  53. * 判断表是否存在(DDL)
  54. * create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
  55. *
  56. * @param tableName 表名
  57. * @return 是否创建成功
  58. * @throws IOException
  59. */
  60. public boolean isTableExist(String tableName) throws IOException {
  61. boolean result = false;
  62. Connection conn = getConnection();
  63. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  64. try {
  65. result = admin.tableExists(tableName);
  66. } catch (Exception e) {
  67. log.error(e.getMessage());
  68. } finally {
  69. admin.close();
  70. }
  71. return result;
  72. }
  73. /**
  74. * 创建表(DDL)
  75. * create 't1', 'cf1', 'cf2'
  76. *
  77. * @param tableName 表名
  78. * @param columnFamily 列簇
  79. * @throws IOException
  80. */
  81. public void createTable(String tableName, String... columnFamily) throws IOException {
  82. // 判断表是否存在
  83. if (isTableExist(tableName)) {
  84. log.error("Table:" + tableName + " already exists!");
  85. return;
  86. }
  87. Connection conn = getConnection();
  88. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  89. try {
  90. // 创建表属性对象,表名需要转字节
  91. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  92. // 创建多个列族
  93. for (String cf : columnFamily) {
  94. descriptor.addFamily(new HColumnDescriptor(cf));
  95. }
  96. // 根据对表的配置,创建表
  97. admin.createTable(descriptor);
  98. log.info("Table:" + tableName + " create successfully!");
  99. } catch (Exception e) {
  100. log.error(e.getMessage());
  101. } finally {
  102. admin.close();
  103. }
  104. }
  105. /**
  106. * 新增记录(DML)
  107. * put 't1', '1001', 'cf1:name', 'zhangsan'
  108. * put 't1', '1001', 'cf1:age', '23'
  109. *
  110. * @param tableName 表名
  111. * @param rowKey 行键
  112. * @param columnFamily 列簇
  113. * @param column 列
  114. * @param value 值
  115. * @throws IOException
  116. */
  117. public void addRowData(String tableName, String rowKey, String columnFamily, String
  118. column, String value) throws IOException {
  119. Connection conn = getConnection();
  120. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  121. try {
  122. // 向表中插入数据
  123. Put put = new Put(Bytes.toBytes(rowKey));
  124. // 向Put对象中组装数据
  125. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
  126. hTable.put(put);
  127. log.info("insert successfully!");
  128. } catch (Exception e) {
  129. log.error(e.getMessage());
  130. } finally {
  131. hTable.close();
  132. }
  133. }
  134. /**
  135. * 全表扫描(Scan)
  136. * scan "t1"
  137. *
  138. * @param tableName 表名
  139. * @return
  140. * @throws IOException
  141. */
  142. public ResultScanner getAllRows(String tableName) throws IOException {
  143. ResultScanner results = null;
  144. Connection conn = getConnection();
  145. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  146. try {
  147. // 得到用于扫描region的对象
  148. Scan scan = new Scan();
  149. // setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。
  150. scan.setCaching(1000);
  151. // 使用HTable得到resultcanner实现类的对象
  152. results = hTable.getScanner(scan);
  153. /*for (Result result : results) {
  154. Cell[] cells = result.rawCells();
  155. for (Cell cell : cells) {
  156. System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
  157. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  158. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  159. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  160. System.out.println();
  161. }
  162. }*/
  163. } catch (Exception e) {
  164. log.error(e.getMessage());
  165. } finally {
  166. hTable.close();
  167. }
  168. return results;
  169. }
  170. /**
  171. * 获取单行记录(get)
  172. * get "t1","1001"
  173. *
  174. * @param tableName 表名
  175. * @param rowKey 行键
  176. * @return
  177. * @throws IOException
  178. */
  179. public Result getRow(String tableName, String rowKey) throws IOException {
  180. Result result = null;
  181. Connection conn = getConnection();
  182. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  183. try {
  184. Get get = new Get(Bytes.toBytes(rowKey));
  185. //get.setMaxVersions(); // 显示所有版本
  186. //get.setTimeStamp(); // 显示指定时间戳的版本
  187. result = hTable.get(get);
  188. /*for (Cell cell : result.rawCells()) {
  189. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  190. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  191. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  192. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  193. System.out.print("Timestamp:" + cell.getTimestamp());
  194. System.out.println();
  195. }*/
  196. } catch (Exception e) {
  197. log.error(e.getMessage());
  198. } finally {
  199. hTable.close();
  200. }
  201. return result;
  202. }
  203. /**
  204. * Fetches several rows in one batched get.
  205. *
  206. * @param tableName name of the table to read
  207. * @param rows row keys; each one is converted with {@code String.valueOf}
  208. * @param <T> element type of the row keys
  209. * @return one Result per key, or {@code null} if rows is empty or the read fails
  210. * @throws IOException if the table handle cannot be obtained
  211. */
  212. public <T> Result[] getRows(String tableName, List<T> rows) throws IOException {
  213. // Build the Get list before touching HBase so bad input fails fast.
  214. List<Get> gets = new ArrayList<Get>();
  215. for (T row : rows) {
  216. if (row == null) {
  217. // Still a RuntimeException for callers, but the message now names the real problem.
  218. throw new IllegalArgumentException("row key must not be null");
  219. }
  220. gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
  221. }
  222. if (gets.isEmpty()) {
  223. // Nothing to fetch: keep the original null result without opening a table.
  224. return null;
  225. }
  226. Result[] results = null;
  227. // Table interface instead of an HTable cast; the old null guard could not prevent an NPE in finally.
  228. Table table = getConnection().getTable(TableName.valueOf(tableName));
  229. try {
  230. results = table.get(gets);
  231. } catch (IOException e) {
  232. log.error("getRows failure !", e);
  233. } finally {
  234. try {
  235. table.close();
  236. } catch (IOException e) {
  237. log.error("table.close() failure !", e);
  238. }
  239. }
  240. return results;
  241. }
  242. /**
  243. * 根据限定符获取单行记录(get)
  244. * get "t1","1001","cf1:name"
  245. *
  246. * @param tableName 表名
  247. * @param rowKey 行键
  248. * @param family 列簇
  249. * @param qualifier 限定符
  250. * @throws IOException
  251. */
  252. public Result getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
  253. Result result = null;
  254. Connection conn = getConnection();
  255. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  256. try {
  257. Get get = new Get(Bytes.toBytes(rowKey));
  258. get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
  259. result = hTable.get(get);
  260. /*for (Cell cell : result.rawCells()) {
  261. System.out.print("RowKey:" + Bytes.toString(result.getRow()) + "\t");
  262. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  263. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  264. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
  265. System.out.print("Timestamp:" + cell.getTimestamp());
  266. System.out.println();
  267. }*/
  268. } catch (Exception e) {
  269. log.error(e.getMessage());
  270. } finally {
  271. hTable.close();
  272. }
  273. return result;
  274. }
  275. /**
  276. * 删除多行数据(DML)
  277. *
  278. * @param tableName 表名
  279. * @param rows 行键(可扩展参数)
  280. * @throws IOException
  281. */
  282. public void deleteMultiRow(String tableName, String... rows) throws IOException {
  283. Connection conn = getConnection();
  284. HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
  285. try {
  286. List<Delete> deleteList = new ArrayList<>();
  287. for (String row : rows) {
  288. Delete delete = new Delete(Bytes.toBytes(row));
  289. deleteList.add(delete);
  290. }
  291. hTable.delete(deleteList);
  292. } catch (Exception e) {
  293. log.error(e.getMessage());
  294. } finally {
  295. hTable.close();
  296. }
  297. }
  298. /**
  299. * 删除表(DDL)
  300. *
  301. * @param tableName 表名
  302. * @throws IOException
  303. */
  304. public void dropTable(String tableName) throws IOException {
  305. if (!isTableExist(tableName)) {
  306. log.error("Table:" + tableName + " not exist!");
  307. return;
  308. }
  309. Connection conn = getConnection();
  310. HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
  311. try {
  312. if (isTableExist(tableName)) {
  313. admin.disableTable(tableName);
  314. admin.deleteTable(tableName);
  315. log.info("Table:" + tableName + " delete successfully!");
  316. }
  317. } catch (Exception e) {
  318. log.error(e.getMessage());
  319. } finally {
  320. admin.close();
  321. }
  322. }
  323. }

3. 测试

■ 测试类(HBaseServiceApplicationTests.java)

  1. import com.lonton.bigdata.service.HBaseService;
  2. import org.apache.hadoop.hbase.Cell;
  3. import org.apache.hadoop.hbase.CellUtil;
  4. import org.apache.hadoop.hbase.client.Result;
  5. import org.apache.hadoop.hbase.client.ResultScanner;
  6. import org.apache.hadoop.hbase.util.Bytes;
  7. import org.junit.Assert;
  8. import org.junit.jupiter.api.Test;
  9. import org.junit.runner.RunWith;
  10. import org.springframework.boot.test.context.SpringBootTest;
  11. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  12. import javax.annotation.Resource;
  13. import java.io.IOException;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. @RunWith(SpringJUnit4ClassRunner.class)
  17. @SpringBootTest
  18. class HBaseServiceApplicationTests {
  19. private String tableName="t1";
  20. private String rowKey="1001";
  21. @Resource
  22. private HBaseService service;
  23. @Test
  24. public void isTableExist() throws IOException {
  25. assert service.isTableExist(tableName);
  26. }
  27. @Test
  28. public void addRowData() throws IOException {
  29. String value="tom";
  30. service.addRowData(tableName, rowKey, "cf1", "name", value);
  31. Result result=service.getRowQualifier(tableName, rowKey, "cf1", "name");
  32. Cell[] cells=result.rawCells();
  33. Assert.assertNotNull(cells);
  34. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  35. }
  36. @Test
  37. public void getAllRows() throws IOException {
  38. ResultScanner results =service.getAllRows(tableName);
  39. Assert.assertNotNull(results);
  40. for (Result result : results) {
  41. Cell[] cells = result.rawCells();
  42. for (Cell cell : cells) {
  43. System.out.print("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
  44. System.out.print("ColumnFamily:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
  45. System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
  46. System.out.print("Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  47. System.out.println();
  48. }
  49. Assert.assertNotNull(result);
  50. }
  51. }
  52. @Test
  53. public void getRow() throws IOException {
  54. String value="tom";
  55. service.addRowData(tableName, rowKey, "cf1", "name", value);
  56. Result result=service.getRow(tableName,rowKey);
  57. Cell[] cells=result.rawCells();
  58. Assert.assertNotNull(cells);
  59. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  60. }
  61. @Test
  62. public void getRows() throws IOException {
  63. String rowKey1="1003";
  64. String rowKey2="1004";
  65. String value1="tom1";
  66. String value2="tom2";
  67. service.addRowData(tableName, rowKey1, "cf1", "name", value1);
  68. service.addRowData(tableName, rowKey2, "cf1", "name", value2);
  69. List<String> rows=new ArrayList<>();
  70. rows.add(rowKey1);
  71. rows.add(rowKey2);
  72. Result[] results=service.getRows(tableName,rows);
  73. Assert.assertNotNull(results);
  74. Assert.assertTrue(results.length==2);
  75. }
  76. @Test
  77. public void getRowQualifier() throws IOException {
  78. String value="tom";
  79. service.addRowData(tableName, rowKey, "cf1", "name", value);
  80. Result result=service.getRowQualifier(tableName, rowKey, "cf1", "name");
  81. Cell[] cells=result.rawCells();
  82. Assert.assertNotNull(cells);
  83. Assert.assertEquals(value,Bytes.toString(CellUtil.cloneValue(cells[0])));
  84. }
  85. @Test
  86. public void deleteMultiRow() throws IOException {
  87. String rowKey1="1003";
  88. String rowKey2="1004";
  89. String value1="tom1";
  90. String value2="tom2";
  91. service.addRowData(tableName, rowKey1, "cf1", "name", value1);
  92. service.addRowData(tableName, rowKey2, "cf1", "name", value2);
  93. service.deleteMultiRow(tableName, rowKey1, rowKey2);
  94. List<String> rows=new ArrayList<>();
  95. rows.add(rowKey1);
  96. rows.add(rowKey2);
  97. Result[] results=service.getRows(tableName,rows);
  98. Assert.assertNotNull(results);
  99. for (Result result : results) {
  100. Cell[] cells=result.rawCells();
  101. Assert.assertTrue(cells.length==0);
  102. }
  103. }
  104. @Test
  105. public void dropTable() throws IOException {
  106. assert service.isTableExist(tableName);
  107. service.dropTable(tableName);
  108. assert !service.isTableExist(tableName);
  109. }
  110. }

4. 完整示例代码

源码:hbase-springboot-example02-src.zip

方式3:SpringBoot + HbaseTemplate

1. 添加pom.xml依赖

  1. <dependency>
  2. <groupId>org.springframework.data</groupId>
  3. <artifactId>spring-data-hadoop</artifactId>
  4. <version>2.5.0.RELEASE</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hbase</groupId>
  8. <artifactId>hbase-client</artifactId>
  9. <version>1.2.5</version>
  10. </dependency>

2. 工程配置

■ SpringBoot配置(application.yml)

  1. hbase:
  2. config:
  3. hbase.zookeeper.quorum: 192.168.0.101,192.168.0.102,192.168.0.103
  4. hbase.zookeeper.property.clientPort: 2181
  5. zookeeper.znode.parent: /hbase
  6. hbase.client.scanner.caching: 500
  7. hbase.client.pause: 50
  8. hbase.rpc.timeout: 2000
  9. hbase.client.operation.timeout: 3000
  10. hbase.client.retries.number: 3
  11. hbase.rootdir: hdfs://bigdata-node1:9000/hbase

3. 编写关键类

■ 配置映射类(HBaseProperties.java)

  1. import org.springframework.boot.context.properties.ConfigurationProperties;
  2. import java.util.Map;
  3. @ConfigurationProperties(prefix = "hbase")
  4. public class HBaseProperties {
  5. private Map<String, String> config;
  6. public Map<String, String> getConfig() {
  7. return config;
  8. }
  9. public void setConfig(Map<String, String> config) {
  10. this.config = config;
  11. }
  12. }

■ 配置类(HBaseConfig.java)

  1. import org.apache.hadoop.hbase.HBaseConfiguration;
  2. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.data.hadoop.hbase.HbaseTemplate;
  6. import java.util.Map;
  7. import java.util.Set;
  8. @Configuration
  9. @EnableConfigurationProperties(HBaseProperties.class)
  10. public class HBaseConfig {
  11. private final HBaseProperties properties;
  12. public HBaseConfig(HBaseProperties properties) {
  13. this.properties = properties;
  14. }
  15. @Bean
  16. public HbaseTemplate hbaseTemplate() {
  17. HbaseTemplate hbaseTemplate = new HbaseTemplate();
  18. hbaseTemplate.setConfiguration(configuration());
  19. hbaseTemplate.setAutoFlush(true);
  20. return hbaseTemplate;
  21. }
  22. public org.apache.hadoop.conf.Configuration configuration() {
  23. org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
  24. Map<String, String> config = properties.getConfig();
  25. Set<String> keySet = config.keySet();
  26. for (String key : keySet) {
  27. configuration.set(key, config.get(key));
  28. }
  29. return configuration;
  30. }
  31. }

其他方式:

  1. @Configuration
  2. public class HBaseConfig {
  3. @Value("${hbase.zookeeper.quorum}")
  4. private String zookeeperQuorum;
  5. @Value("${hbase.zookeeper.property.clientPort}")
  6. private String clientPort;
  7. @Value("${zookeeper.znode.parent}")
  8. private String znodeParent;
  9. @Bean
  10. public HbaseTemplate hbaseTemplate() {
  11. org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
  12. conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
  13. conf.set("hbase.zookeeper.property.clientPort", clientPort);
  14. conf.set("zookeeper.znode.parent", znodeParent);
  15. return new HbaseTemplate(conf);
  16. }
  17. }

■ 服务类(HBaseService.java)

  1. import org.apache.hadoop.hbase.*;
  2. import org.apache.hadoop.hbase.client.*;
  3. import org.apache.hadoop.hbase.util.Bytes;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.data.hadoop.hbase.HbaseTemplate;
  8. import org.springframework.data.hadoop.hbase.RowMapper;
  9. import org.springframework.data.hadoop.hbase.TableCallback;
  10. import org.springframework.stereotype.Service;
  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. import java.util.Map;
  16. @Service
  17. public class HBaseService {
  18. private static final Logger log = LoggerFactory.getLogger(HBaseService.class);
  19. @Autowired
  20. private HbaseTemplate hbaseTemplate;
  21. /**
  22. * 判断表是否存在(DDL)
  23. * create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
  24. *
  25. * @param tableName 表名
  26. * @return 是否创建成功
  27. * @throws IOException
  28. */
  29. public boolean isTableExist(String tableName) throws IOException {
  30. boolean result = false;
  31. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  32. try {
  33. result = admin.tableExists(tableName);
  34. } catch (Exception e) {
  35. log.error(e.getMessage());
  36. } finally {
  37. admin.close();
  38. }
  39. return result;
  40. }
  41. /**
  42. * 创建表(DDL)
  43. * create 't1', 'cf1', 'cf2'
  44. *
  45. * @param tableName 表名
  46. * @param columnFamily 列簇
  47. * @throws IOException
  48. */
  49. public void createTable(String tableName, String... columnFamily) throws IOException {
  50. // 判断表是否存在
  51. if (isTableExist(tableName)) {
  52. log.error("Table:" + tableName + " already exists!");
  53. return;
  54. }
  55. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  56. try {
  57. // 创建表属性对象,表名需要转字节
  58. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  59. // 创建多个列族
  60. for (String cf : columnFamily) {
  61. descriptor.addFamily(new HColumnDescriptor(cf));
  62. }
  63. // 根据对表的配置,创建表
  64. admin.createTable(descriptor);
  65. log.info("Table:" + tableName + " create successfully!");
  66. } catch (Exception e) {
  67. log.error(e.getMessage());
  68. } finally {
  69. admin.close();
  70. }
  71. }
  72. /**
  73. * 新增记录(DML)
  74. * put 't1', '1001', 'cf1:name', 'zhangsan'
  75. * put 't1', '1001', 'cf1:age', '23'
  76. *
  77. * @param tableName 表名
  78. * @param rowKey 行键
  79. * @param columnFamily 列簇
  80. * @param column 列
  81. * @param value 值
  82. */
  83. public void addRowData(String tableName, String rowKey, String columnFamily, String
  84. column, String value) {
  85. hbaseTemplate.put(tableName, rowKey, columnFamily, column, Bytes.toBytes(value));
  86. }
  87. /**
  88. * 全表扫描(Scan)
  89. * scan 't1'
  90. * [
  91. * {cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}},
  92. * {cf2={name=lisi, age=24}}
  93. * ]
  94. *
  95. * @param tableName 表名
  96. * @return
  97. */
  98. public List<Map<String, Map<String, String>>> getAllRows(String tableName) {
  99. Scan scan = new Scan();
  100. List<Map<String, Map<String, String>>> list = hbaseTemplate.find(tableName, scan, (result, rowNum) -> {
  101. Cell[] cells = result.rawCells();
  102. Map<String, Map<String, String>> data = new HashMap<>(16);
  103. for (Cell c : cells) {
  104. String columnFamily = new String(CellUtil.cloneFamily(c));
  105. String rowName = new String(CellUtil.cloneQualifier(c));
  106. String value = new String(CellUtil.cloneValue(c));
  107. Map<String, String> obj = data.get(columnFamily);
  108. if (null == obj) {
  109. obj = new HashMap<>(16);
  110. }
  111. obj.put(rowName, value);
  112. data.put(columnFamily, obj);
  113. }
  114. return data;
  115. });
  116. return list;
  117. }
  118. /**
  119. * 根据RowKey获取单行记录(get)
  120. * get 't1','1001'
  121. * {cf1_name=tom, cf1_age=23}
  122. *
  123. * @param tableName 表名
  124. * @param rowKey 行键
  125. * @return
  126. */
  127. public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {
  128. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
  129. @Override
  130. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  131. Cell[] cells = result.rawCells();
  132. Map<String, Map<String, Object>> data = new HashMap<>(16);
  133. for (Cell c : cells) {
  134. String columnFamily = new String(CellUtil.cloneFamily(c));
  135. String rowName = new String(CellUtil.cloneQualifier(c));
  136. String value = new String(CellUtil.cloneValue(c));
  137. Map<String, Object> obj = data.get(columnFamily);
  138. if (null == obj) {
  139. obj = new HashMap<>(16);
  140. }
  141. obj.put(rowName, value);
  142. data.put(columnFamily, obj);
  143. }
  144. return data;
  145. }
  146. });
  147. }
  148. /**
  149. * 获取多行记录(get)
  150. *
  151. * @param tableName 表名
  152. * @param rowKeys 行键(可扩展参数)
  153. * @return
  154. */
  155. public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {
  156. List<Map<String, Map<String, Object>>> rows = new ArrayList<>();
  157. for (String rowKey : rowKeys) {
  158. Map<String, Map<String, Object>> row = hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
  159. @Override
  160. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  161. Cell[] cells = result.rawCells();
  162. Map<String, Map<String, Object>> data = new HashMap<>(16);
  163. for (Cell c : cells) {
  164. String columnFamily = new String(CellUtil.cloneFamily(c));
  165. String rowName = new String(CellUtil.cloneQualifier(c));
  166. String value = new String(CellUtil.cloneValue(c));
  167. Map<String, Object> obj = data.get(columnFamily);
  168. if (null == obj) {
  169. obj = new HashMap<>(16);
  170. }
  171. obj.put(rowName, value);
  172. obj.put("rowKey", rowKey);
  173. data.put(columnFamily, obj);
  174. }
  175. return data;
  176. }
  177. });
  178. rows.add(row);
  179. }
  180. return rows;
  181. }
  182. /**
  183. * 根据限定符获取指定数据(get)
  184. *
  185. * @param tableName 表名
  186. * @param rowName 行键
  187. * @param familyName 列簇
  188. * @param qualifier 限定符
  189. * @return
  190. */
  191. public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {
  192. return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {
  193. List<Cell> ceList = result.listCells();
  194. String res = "";
  195. if (ceList != null && ceList.size() > 0) {
  196. for (Cell cell : ceList) {
  197. res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  198. }
  199. }
  200. return res;
  201. });
  202. }
  203. /**
  204. * 根据StartKey和EndKey获取数据(scan),前闭后开区间
  205. *
  206. * @param tableName 表名
  207. * @param startRow 起始rowKey
  208. * @param stopRow 结束rowKey
  209. * @return
  210. */
  211. public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {
  212. Scan scan = new Scan();
  213. if (startRow == null) {
  214. startRow = "";
  215. }
  216. if (stopRow == null) {
  217. stopRow = "";
  218. } else {
  219. stopRow += "|";
  220. }
  221. scan.setStartRow(Bytes.toBytes(startRow));
  222. scan.setStopRow(Bytes.toBytes(stopRow));
  223. /* PageFilter filter = new PageFilter(5);
  224. scan.setFilter(filter);*/
  225. return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Map<String, Object>>>() {
  226. @Override
  227. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  228. List<Cell> ceList = result.listCells();
  229. Map<String, Map<String, Object>> data = new HashMap<>(16);
  230. if (ceList != null && ceList.size() > 0) {
  231. for (Cell cell : ceList) {
  232. String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  233. String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  234. String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
  235. String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
  236. Map<String, Object> obj = data.get(family);
  237. if (null == obj) {
  238. obj = new HashMap<>(16);
  239. }
  240. obj.put("rowKey", rowKey);
  241. obj.put(qualifier, value);
  242. data.put(family, obj);
  243. }
  244. }
  245. return data;
  246. }
  247. });
  248. }
  249. /**
  250. * 删除列簇(DML)
  251. *
  252. * @param tableName 表名
  253. * @param rowKey 行键
  254. * @param familyName 列簇
  255. */
  256. public void deleteRow(String tableName, String rowKey, String familyName) {
  257. hbaseTemplate.delete(tableName, rowKey, familyName);
  258. }
  259. /**
  260. * 删除限定符(DML)
  261. *
  262. * @param tableName 表名
  263. * @param rowKey 行键
  264. * @param familyName 列簇
  265. * @param qualifier 限定符
  266. */
  267. public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {
  268. hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);
  269. }
  270. /**
  271. * 删除多行数据(DML)
  272. *
  273. * @param tableName 表名
  274. * @param familyNames 列簇
  275. * @param rowKeys 行键(可扩展参数)
  276. */
  277. public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {
  278. for (String rowKey : rowKeys) {
  279. for (String familyName : familyNames) {
  280. hbaseTemplate.delete(tableName, rowKey, familyName);
  281. }
  282. }
  283. }
  284. /**
  285. * 根据rowKey删除记录(DML)
  286. *
  287. * @param tableName 表名
  288. * @param rowKey 行键
  289. */
  290. public void deleteAll(String tableName, final String rowKey) {
  291. hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
  292. public Boolean doInTable(HTableInterface table) {
  293. boolean flag = false;
  294. try {
  295. List<Delete> list = new ArrayList<>();
  296. Delete d1 = new Delete(rowKey.getBytes());
  297. list.add(d1);
  298. table.delete(list);
  299. flag = true;
  300. } catch (Exception e) {
  301. e.printStackTrace();
  302. }
  303. return flag;
  304. }
  305. });
  306. }
  307. /**
  308. * 删除表(DDL)
  309. *
  310. * @param tableName 表名
  311. * @throws IOException
  312. */
  313. public void dropTable(String tableName) throws IOException {
  314. if (!isTableExist(tableName)) {
  315. log.error("Table:" + tableName + " not exist!");
  316. return;
  317. }
  318. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  319. try {
  320. if (isTableExist(tableName)) {
  321. admin.disableTable(tableName);
  322. admin.deleteTable(tableName);
  323. log.info("Table:" + tableName + " delete successfully!");
  324. }
  325. } catch (Exception e) {
  326. log.error(e.getMessage());
  327. } finally {
  328. admin.close();
  329. }
  330. }
  331. /**
  332. * 通用方法(一般可用于更新操作,如:delete、put)
  333. 示例:deleteAll
  334. TableCallback action = new TableCallback<Boolean>() {
  335. public Boolean doInTable(HTableInterface table) {
  336. boolean flag = false;
  337. try {
  338. List<Delete> list = new ArrayList<>();
  339. Delete d1 = new Delete(rowKey.getBytes());
  340. list.add(d1);
  341. table.delete(list);
  342. flag = true;
  343. } catch (Exception e) {
  344. e.printStackTrace();
  345. }
  346. return flag;
  347. }
  348. };
  349. 示例:put
  350. TableCallback action = new TableCallback<Boolean>() {
  351. public Boolean doInTable(HTableInterface table) {
  352. boolean flag = false;
  353. try {
  354. Put put = new Put(rowKey.getBytes());
  355. put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
  356. put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
  357. table.put(put);
  358. flag = true;
  359. } catch (Exception e) {
  360. e.printStackTrace();
  361. }
  362. return flag;
  363. }
  364. };
  365. * @param tableName
  366. * @param action
  367. */
  368. public void execute(String tableName, TableCallback action) {
  369. hbaseTemplate.execute(tableName, action);
  370. }
  371. }

4. 测试

■ 测试类(HBaseServiceApplicationTests.java)

  1. import org.apache.hadoop.hbase.client.Delete;
  2. import org.apache.hadoop.hbase.client.HTableInterface;
  3. import org.apache.hadoop.hbase.client.Put;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. import org.junit.Assert;
  6. import org.junit.jupiter.api.Test;
  7. import org.junit.runner.RunWith;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import org.springframework.data.hadoop.hbase.TableCallback;
  11. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  12. import java.io.IOException;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. @RunWith(SpringJUnit4ClassRunner.class)
  17. @SpringBootTest
  18. class HBaseServiceApplicationTests {
  19. private String tableName = "t1";
  20. private String rowKey = "1001";
  21. private String rowKey2 = "1002";
  22. private String rowKey3 = "1003";
  23. private String CF_01 = "cf1";
  24. private String CF1_QUALIFIER_NAME = "name";
  25. private String CF1_QUALIFIER_AGE = "age";
  26. private String CF1_NAME_VALUE = "tom";
  27. private String CF1_AGE_VALUE = "23";
  28. private String CF_02 = "cf2";
  29. private String CF2_QUALIFIER_ADDR = "address";
  30. private String CF2_ADDR_VALUE = "beijing";
  31. @Autowired
  32. private HBaseService service;
  33. @Test
  34. public void isTableExist() throws IOException {
  35. assert service.isTableExist(tableName);
  36. }
  37. @Test
  38. public void createTable() throws IOException {
  39. service.createTable(tableName, CF_01, CF_02);
  40. assert service.isTableExist(tableName);
  41. }
  42. @Test
  43. public void addRowData() {
  44. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  45. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  46. }
  47. @Test
  48. public void getAllRows() {
  49. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  50. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  51. List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);
  52. Assert.assertTrue(rows.size() == 1); // 1个RowKey,对应1行数据
  53. Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
  54. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  55. Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));
  56. System.out.println(rows);
  57. }
  58. @Test
  59. public void getRowByRowKey() {
  60. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  61. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  62. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  63. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  64. Assert.assertNotNull(row);
  65. Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));
  66. Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));
  67. Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));
  68. System.out.println(row);
  69. }
  70. @Test
  71. public void getRows() {
  72. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  73. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  74. List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
  75. Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据
  76. Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
  77. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  78. System.out.println(rows);
  79. }
  80. @Test
  81. public void getRowQualifier() {
  82. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  83. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  84. }
  85. @Test
  86. public void getRowsByKey() {
  87. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  88. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  89. List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);
  90. Assert.assertTrue(rows.size() == 2); // 2个RowKey,对应1行数据
  91. Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
  92. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  93. System.out.println(rows);
  94. }
  95. @Test
  96. public void deleteQualifier() {
  97. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  98. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  99. service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);
  100. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  101. System.out.println(row);
  102. Assert.assertNull(row.get(CF_01).get(CF1_QUALIFIER_NAME));
  103. Assert.assertNotNull(row.get(CF_01).get(CF1_QUALIFIER_AGE));
  104. }
  105. @Test
  106. public void deleteRow() {
  107. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  108. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  109. service.deleteRow(tableName, rowKey, CF_01);
  110. service.deleteRow(tableName, rowKey, CF_02);
  111. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  112. System.out.println(row);
  113. Assert.assertTrue(row.isEmpty());
  114. }
  115. @Test
  116. public void deleteMultiRow() {
  117. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  118. service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  119. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  120. service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  121. List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
  122. System.out.println("----------------------- test 1 -------------------------");
  123. System.out.println(rows);
  124. String[] familyNames = {CF_01, CF_02};
  125. service.deleteRows(tableName, familyNames, rowKey2, rowKey3);
  126. rows = service.getRows(tableName, rowKey2, rowKey3);
  127. System.out.println("----------------------- test 2 -------------------------");
  128. System.out.println(rows); // [{}, {}]
  129. Assert.assertTrue(rows.get(0).isEmpty());
  130. }
  131. @Test
  132. public void deleteAll() {
  133. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  134. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  135. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  136. System.out.println(row);
  137. service.deleteAll(tableName, rowKey);
  138. row = service.getRowByRowKey(tableName, rowKey);
  139. System.out.println(row);
  140. Assert.assertTrue(row.isEmpty());
  141. }
  142. @Test
  143. public void dropTable() throws IOException {
  144. if (!service.isTableExist(tableName)) {
  145. service.createTable(tableName);
  146. }
  147. assert service.isTableExist(tableName);
  148. service.dropTable(tableName);
  149. assert !service.isTableExist(tableName);
  150. }
  151. @Test
  152. public void execute_deleteAll() {
  153. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  154. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  155. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  156. System.out.println(row);
  157. TableCallback action = new TableCallback<Boolean>() {
  158. public Boolean doInTable(HTableInterface table) {
  159. boolean flag = false;
  160. try {
  161. List<Delete> list = new ArrayList<>();
  162. Delete d1 = new Delete(rowKey.getBytes());
  163. list.add(d1);
  164. table.delete(list);
  165. flag = true;
  166. } catch (Exception e) {
  167. e.printStackTrace();
  168. }
  169. return flag;
  170. }
  171. };
  172. service.execute(tableName, action);
  173. row = service.getRowByRowKey(tableName, rowKey);
  174. System.out.println(row);
  175. Assert.assertTrue(row.isEmpty());
  176. }
  177. @Test
  178. public void execute_put() {
  179. TableCallback action = new TableCallback<Boolean>() {
  180. public Boolean doInTable(HTableInterface table) {
  181. boolean flag = false;
  182. try {
  183. Put put = new Put(rowKey.getBytes());
  184. put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
  185. put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
  186. table.put(put);
  187. flag = true;
  188. } catch (Exception e) {
  189. e.printStackTrace();
  190. }
  191. return flag;
  192. }
  193. };
  194. service.execute(tableName, action);
  195. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  196. Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));
  197. }
  198. }

5. 完整示例代码

源码:hbase-springboot-example03-src.zip

方式4:SpringBoot + HbaseTemplate(Spring配置)

1. 添加pom.xml依赖

  1. <dependency>
  2. <groupId>org.springframework.data</groupId>
  3. <artifactId>spring-data-hadoop</artifactId>
  4. <version>2.5.0.RELEASE</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hbase</groupId>
  8. <artifactId>hbase-client</artifactId>
  9. <version>1.2.5</version>
  10. </dependency>

2. 工程配置

■ 参数配置(resources/config/hbase.properties)

  1. hbase.zk.host=192.168.0.101,192.168.0.102,192.168.0.103
  2. hbase.zk.port=2181

■ Spring配置(resources/config/hbase-spring.xml)

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:hdp="http://www.springframework.org/schema/hadoop"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  8. http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
  9. <context:property-placeholder location="classpath:/config/hbase.properties"/>
  10. <hdp:configuration id="hadoopConfiguration">fs.default.name=hdfs://bigdata-node1:9000</hdp:configuration>
  11. <!-- HA -->
  12. <!--<hdp:configuration id="hadoopConfiguration">fs.defaultFS=hdfs://bigdata-node1:9000</hdp:configuration>-->
  13. <hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}" delete-connection="false" />
  14. <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
  15. <property name="configuration" ref="hbaseConfiguration"/>
  16. </bean>
  17. </beans>

3. 编写关键类

■ 启动类(Application.java)

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. import org.springframework.context.annotation.ImportResource;
  4. @SpringBootApplication
  5. @ImportResource(locations = {"classpath:/config/hbase-spring.xml"})
  6. public class Application {
  7. public static void main(String[] args) {
  8. // 设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到
  9. //System.setProperty("hadoop.home.dir", "E:\\bigdataenv\\hadoop-common-2.2.0-bin-master");
  10. SpringApplication.run(Application.class, args);
  11. }
  12. }

■ 服务类(HBaseService.java)

  1. import org.apache.hadoop.hbase.*;
  2. import org.apache.hadoop.hbase.client.*;
  3. import org.apache.hadoop.hbase.util.Bytes;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.data.hadoop.hbase.HbaseTemplate;
  8. import org.springframework.data.hadoop.hbase.RowMapper;
  9. import org.springframework.data.hadoop.hbase.TableCallback;
  10. import org.springframework.stereotype.Service;
  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. import java.util.Map;
  16. @Service
  17. public class HBaseService {
  18. private static final Logger log = LoggerFactory.getLogger(HBaseService.class);
  19. @Autowired
  20. private HbaseTemplate hbaseTemplate;
  21. /**
  22. * 判断表是否存在(DDL)
  23. * create 't_user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
  24. *
  25. * @param tableName 表名
  26. * @return 是否创建成功
  27. * @throws IOException
  28. */
  29. public boolean isTableExist(String tableName) throws IOException {
  30. boolean result = false;
  31. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  32. try {
  33. result = admin.tableExists(tableName);
  34. } catch (Exception e) {
  35. log.error(e.getMessage());
  36. } finally {
  37. admin.close();
  38. }
  39. return result;
  40. }
  41. /**
  42. * 创建表(DDL)
  43. * create 't1', 'cf1', 'cf2'
  44. *
  45. * @param tableName 表名
  46. * @param columnFamily 列簇
  47. * @throws IOException
  48. */
  49. public void createTable(String tableName, String... columnFamily) throws IOException {
  50. // 判断表是否存在
  51. if (isTableExist(tableName)) {
  52. log.error("Table:" + tableName + " already exists!");
  53. return;
  54. }
  55. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  56. try {
  57. // 创建表属性对象,表名需要转字节
  58. HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  59. // 创建多个列族
  60. for (String cf : columnFamily) {
  61. descriptor.addFamily(new HColumnDescriptor(cf));
  62. }
  63. // 根据对表的配置,创建表
  64. admin.createTable(descriptor);
  65. log.info("Table:" + tableName + " create successfully!");
  66. } catch (Exception e) {
  67. log.error(e.getMessage());
  68. } finally {
  69. admin.close();
  70. }
  71. }
  72. /**
  73. * Writes one cell through hbaseTemplate.put (DML).
  74. * HBase shell equivalent: put 't1', '1001', 'cf1:name', 'zhangsan'
  75. *
  76. * @param tableName table name
  77. * @param rowKey row key
  78. * @param columnFamily column family
  79. * @param column column qualifier
  80. * @param value cell value, converted with Bytes.toBytes before it is written
  81. */
  82. public void addRowData(String tableName, String rowKey, String columnFamily, String
  83. column, String value) {
  84. final byte[] cellBytes = Bytes.toBytes(value);
  85. hbaseTemplate.put(tableName, rowKey, columnFamily, column, cellBytes);
  86. }
  87. /**
  88. * Scans the whole table (Scan).
  89. * HBase shell equivalent: scan 't1'
  90. * [
  91. * {cf2={name=zhangsan, age=23}, cf1={name=tom1, age=23}},
  92. * {cf2={name=lisi, age=24}}
  93. * ]
  94. *
  95. * @param tableName table name
  96. * @return one map per row: column family -> (qualifier -> value)
  97. */
  98. public List<Map<String, Map<String, String>>> getAllRows(String tableName) {
  99. Scan scan = new Scan();
  100. return hbaseTemplate.find(tableName, scan, (result, rowNum) -> {
  101. Map<String, Map<String, String>> data = new HashMap<>(16);
  102. Cell[] cells = result.rawCells();
  103. if (cells == null) {
  104. return data; // rawCells() may be null for an empty result
  105. }
  106. for (Cell c : cells) {
  107. // Decode with Bytes.toString (UTF-8, the encoding Bytes.toBytes writes);
  108. // new String(byte[]) would use the platform default charset.
  109. String columnFamily = Bytes.toString(CellUtil.cloneFamily(c));
  110. String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
  111. String value = Bytes.toString(CellUtil.cloneValue(c));
  112. Map<String, String> columns = data.computeIfAbsent(columnFamily, k -> new HashMap<>(16));
  113. columns.put(qualifier, value);
  114. }
  115. return data;
  116. });
  117. }
  118. /**
  119. * Reads one row by its row key (get).
  120. * HBase shell equivalent: get 't1','1001'
  121. * Example result: {cf1={name=tom, age=23}}
  122. *
  123. * @param tableName table name
  124. * @param rowKey row key
  125. * @return column family -> (qualifier -> value); an empty map when the result holds no cells
  126. */
  127. public Map<String, Map<String, Object>> getRowByRowKey(String tableName, String rowKey) {
  128. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
  129. @Override
  130. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  131. Map<String, Map<String, Object>> data = new HashMap<>(16);
  132. Cell[] cells = result.rawCells();
  133. if (cells == null) {
  134. return data; // rawCells() may be null for an empty result
  135. }
  136. for (Cell c : cells) {
  137. // Decode with Bytes.toString (UTF-8, the encoding Bytes.toBytes writes);
  138. // new String(byte[]) would use the platform default charset.
  139. String columnFamily = Bytes.toString(CellUtil.cloneFamily(c));
  140. String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
  141. String value = Bytes.toString(CellUtil.cloneValue(c));
  142. data.computeIfAbsent(columnFamily, k -> new HashMap<>(16)).put(qualifier, value);
  143. }
  144. return data;
  145. }
  146. });
  147. }
  148. /**
  149. * Reads several rows, issuing one get per row key.
  150. *
  151. * @param tableName table name
  152. * @param rowKeys row keys (varargs)
  153. * @return one map per key, in argument order; every column-family map also carries a "rowKey" entry
  154. */
  155. public List<Map<String, Map<String, Object>>> getRows(String tableName, String... rowKeys) {
  156. List<Map<String, Map<String, Object>>> rows = new ArrayList<>();
  157. for (String rowKey : rowKeys) {
  158. Map<String, Map<String, Object>> row = hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Map<String, Object>>>() {
  159. @Override
  160. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  161. Map<String, Map<String, Object>> data = new HashMap<>(16);
  162. Cell[] cells = result.rawCells();
  163. if (cells == null) {
  164. return data; // rawCells() may be null for an empty result
  165. }
  166. for (Cell c : cells) {
  167. // Decode as UTF-8 (what Bytes.toBytes writes), not with the platform charset
  168. String columnFamily = Bytes.toString(CellUtil.cloneFamily(c));
  169. String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
  170. String value = Bytes.toString(CellUtil.cloneValue(c));
  171. Map<String, Object> columns = data.computeIfAbsent(columnFamily, k -> new HashMap<>(16));
  172. columns.put(qualifier, value);
  173. columns.put("rowKey", rowKey);
  174. }
  175. return data;
  176. }
  177. });
  178. rows.add(row);
  179. }
  180. return rows;
  181. }
  182. /**
  183. * Reads the value stored under one qualifier of a row (get).
  184. *
  185. * @param tableName table name
  186. * @param rowName row key
  187. * @param familyName column family
  188. * @param qualifier column qualifier
  189. * @return the cell value as a string, or "" when the result holds no cells
  190. */
  191. public String getRowQualifier(String tableName, String rowName, String familyName, String qualifier) {
  192. return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {
  193. List<Cell> cells = result.listCells();
  194. if (cells == null || cells.isEmpty()) {
  195. return "";
  196. }
  197. // With several cells, the last one in list order supplies the value
  198. Cell last = cells.get(cells.size() - 1);
  199. byte[] backing = last.getValueArray();
  200. return Bytes.toString(backing, last.getValueOffset(), last.getValueLength());
  201. });
  202. }
  203. /**
  204. * Scans the rows between startRow and stopRow (scan).
  205. * NOTE(review): "|" is appended to a non-null stopRow, so the row whose key equals stopRow is presumably included (not a strict half-open range) — confirm.
  206. * @param tableName table name
  207. * @param startRow first row key of the range; null is treated as ""
  208. * @param stopRow last row key of the range; null is treated as "", otherwise "|" is appended before use
  209. * @return one map per row: column family -> (qualifier -> value), with an extra "rowKey" entry in every family map
  210. */
  211. public List<Map<String, Map<String, Object>>> getRowsByKey(String tableName, String startRow, String stopRow) {
  212. Scan scan = new Scan();
  213. if (startRow == null) {
  214. startRow = "";
  215. }
  216. if (stopRow == null) {
  217. stopRow = "";
  218. } else {
  219. stopRow += "|";
  220. }
  221. scan.setStartRow(Bytes.toBytes(startRow));
  222. scan.setStopRow(Bytes.toBytes(stopRow));
  223. /* PageFilter filter = new PageFilter(5);
  224. scan.setFilter(filter);*/
  225. return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Map<String, Object>>>() {
  226. @Override
  227. public Map<String, Map<String, Object>> mapRow(Result result, int i) {
  228. List<Cell> ceList = result.listCells();
  229. Map<String, Map<String, Object>> data = new HashMap<>(16);
  230. if (ceList != null && ceList.size() > 0) {
  231. for (Cell cell : ceList) {
  232. String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  233. String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  234. String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
  235. String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
  236. Map<String, Object> obj = data.get(family);
  237. if (null == obj) {
  238. obj = new HashMap<>(16);
  239. }
  240. obj.put("rowKey", rowKey);
  241. obj.put(qualifier, value);
  242. data.put(family, obj);
  243. }
  244. }
  245. return data;
  246. }
  247. });
  248. }
  249. /**
  250. * Deletes one column family of a row (DML) by delegating to hbaseTemplate.delete.
  251. *
  252. * @param tableName table name
  253. * @param rowKey row key
  254. * @param familyName column family to delete from that row
  255. */
  256. public void deleteRow(String tableName, String rowKey, String familyName) {
  257. hbaseTemplate.delete(tableName, rowKey, familyName);
  258. }
  259. /**
  260. * Deletes one qualifier (column) of a row (DML) by delegating to hbaseTemplate.delete.
  261. *
  262. * @param tableName table name
  263. * @param rowKey row key
  264. * @param familyName column family
  265. * @param qualifier column qualifier to delete
  266. */
  267. public void deleteQualifier(String tableName, String rowKey, String familyName, String qualifier) {
  268. hbaseTemplate.delete(tableName, rowKey, familyName, qualifier);
  269. }
  270. /**
  271. * Deletes the given column families from each of the given rows (DML), one delete call per (row, family) pair.
  272. *
  273. * @param tableName table name
  274. * @param familyNames column families to delete from every listed row
  275. * @param rowKeys row keys (varargs)
  276. */
  277. public void deleteRows(String tableName, String[] familyNames, String... rowKeys) {
  278. for (String rowKey : rowKeys) {
  279. for (String familyName : familyNames) {
  280. hbaseTemplate.delete(tableName, rowKey, familyName);
  281. }
  282. }
  283. }
  284. /**
  285. * Deletes a whole row by its row key (DML); a failure is logged and not rethrown.
  286. *
  287. * @param tableName table name
  288. * @param rowKey row key
  289. */
  290. public void deleteAll(String tableName, final String rowKey) {
  291. hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
  292. @Override
  293. public Boolean doInTable(HTableInterface table) {
  294. boolean deleted = false;
  295. try {
  296. // Bytes.toBytes always encodes UTF-8; String.getBytes() uses the platform charset
  297. table.delete(new Delete(Bytes.toBytes(rowKey)));
  298. deleted = true;
  299. } catch (Exception e) {
  300. // Log through the class logger with the stack trace instead of printStackTrace()
  301. log.error("Failed to delete row " + rowKey + " from table " + tableName, e);
  302. }
  303. return deleted;
  304. }
  305. });
  306. }
  307. /**
  308. * 删除表(DDL)
  309. *
  310. * @param tableName 表名
  311. * @throws IOException
  312. */
  313. public void dropTable(String tableName) throws IOException {
  314. if (!isTableExist(tableName)) {
  315. log.error("Table:" + tableName + " not exist!");
  316. return;
  317. }
  318. HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
  319. try {
  320. if (isTableExist(tableName)) {
  321. admin.disableTable(tableName);
  322. admin.deleteTable(tableName);
  323. log.info("Table:" + tableName + " delete successfully!");
  324. }
  325. } catch (Exception e) {
  326. log.error(e.getMessage());
  327. } finally {
  328. admin.close();
  329. }
  330. }
  331. /**
  332. * Generic pass-through to hbaseTemplate.execute (typically used for updates such as delete or put).
  333. * Example: deleteAll
  334. * TableCallback action = new TableCallback<Boolean>() {
  335. * public Boolean doInTable(HTableInterface table) {
  336. * boolean flag = false;
  337. * try {
  338. * List<Delete> list = new ArrayList<>();
  339. * Delete d1 = new Delete(rowKey.getBytes());
  340. * list.add(d1);
  341. * table.delete(list);
  342. * flag = true;
  343. * } catch (Exception e) {
  344. * e.printStackTrace();
  345. * }
  346. * return flag;
  347. * }
  348. * };
  349. * Example: put
  350. * TableCallback action = new TableCallback<Boolean>() {
  351. * public Boolean doInTable(HTableInterface table) {
  352. * boolean flag = false;
  353. * try {
  354. * Put put = new Put(rowKey.getBytes());
  355. * put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
  356. * put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
  357. * table.put(put);
  358. * flag = true;
  359. * } catch (Exception e) {
  360. * e.printStackTrace();
  361. * }
  362. * return flag;
  363. * }
  364. * };
  365. * @param tableName table name
  366. * @param action callback handed to hbaseTemplate.execute together with the table name
  367. */
  368. public void execute(String tableName, TableCallback action) {
  369. hbaseTemplate.execute(tableName, action);
  370. }
  371. }

4. 测试

■ 测试类(HBaseServiceApplicationTests.java)

  1. import org.apache.hadoop.hbase.client.Delete;
  2. import org.apache.hadoop.hbase.client.HTableInterface;
  3. import org.apache.hadoop.hbase.client.Put;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. import org.junit.Assert;
  6. import org.junit.jupiter.api.Test;
  7. import org.junit.runner.RunWith;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import org.springframework.data.hadoop.hbase.TableCallback;
  11. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  12. import java.io.IOException;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. @RunWith(SpringJUnit4ClassRunner.class)
  17. @SpringBootTest
  18. class HBaseServiceApplicationTests {
  19. private String tableName = "t1";
  20. private String rowKey = "1001";
  21. private String rowKey2 = "1002";
  22. private String rowKey3 = "1003";
  23. private String CF_01 = "cf1";
  24. private String CF1_QUALIFIER_NAME = "name";
  25. private String CF1_QUALIFIER_AGE = "age";
  26. private String CF1_NAME_VALUE = "tom";
  27. private String CF1_AGE_VALUE = "23";
  28. private String CF_02 = "cf2";
  29. private String CF2_QUALIFIER_ADDR = "address";
  30. private String CF2_ADDR_VALUE = "beijing";
  31. @Autowired
  32. private HBaseService service;
  33. @Test
  34. public void isTableExist() throws IOException {
  35. assert service.isTableExist(tableName);
  36. }
  37. @Test
  38. public void createTable() throws IOException {
  39. service.createTable(tableName, CF_01, CF_02);
  40. assert service.isTableExist(tableName);
  41. }
  42. @Test
  43. public void addRowData() {
  44. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  45. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  46. }
  47. @Test
  48. public void getAllRows() {
  49. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  50. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  51. List<Map<String, Map<String, String>>> rows = service.getAllRows(tableName);
  52. Assert.assertTrue(rows.size() == 1); // 1个RowKey,对应1行数据
  53. Assert.assertTrue(rows.get(0).size() == 1); // 1个列簇
  54. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  55. Assert.assertEquals(CF1_AGE_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_AGE));
  56. System.out.println(rows);
  57. }
  58. @Test
  59. public void getRowByRowKey() {
  60. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  61. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  62. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  63. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  64. Assert.assertNotNull(row);
  65. Assert.assertEquals(CF1_NAME_VALUE, row.get(CF_01).get(CF1_QUALIFIER_NAME));
  66. Assert.assertEquals(CF1_AGE_VALUE, row.get(CF_01).get(CF1_QUALIFIER_AGE));
  67. Assert.assertEquals(CF2_ADDR_VALUE, row.get(CF_02).get(CF2_QUALIFIER_ADDR));
  68. System.out.println(row);
  69. }
  70. @Test
  71. public void getRows() {
  72. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  73. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  74. List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
  75. Assert.assertTrue(rows.size() == 2); // two row keys requested, so two rows are expected
  76. Assert.assertTrue(rows.get(0).size() == 1); // exactly one column family
  77. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  78. System.out.println(rows);
  79. }
  80. @Test
  81. public void getRowQualifier() {
  82. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  83. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  84. }
  85. @Test
  86. public void getRowsByKey() {
  87. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  88. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  89. List<Map<String, Map<String, Object>>> rows = service.getRowsByKey(tableName, rowKey2, rowKey3);
  90. Assert.assertTrue(rows.size() == 2); // the range covers two row keys, so two rows are expected
  91. Assert.assertTrue(rows.get(0).size() == 1); // exactly one column family
  92. Assert.assertEquals(CF1_NAME_VALUE, rows.get(0).get(CF_01).get(CF1_QUALIFIER_NAME));
  93. System.out.println(rows);
  94. }
  95. @Test
  96. public void deleteQualifier() {
  97. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  98. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_AGE, CF1_AGE_VALUE);
  99. service.deleteQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME);
  100. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  101. System.out.println(row);
  102. Assert.assertNull(row.get(CF_01).get(CF1_QUALIFIER_NAME));
  103. Assert.assertNotNull(row.get(CF_01).get(CF1_QUALIFIER_AGE));
  104. }
  105. @Test
  106. public void deleteRow() {
  107. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  108. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  109. service.deleteRow(tableName, rowKey, CF_01);
  110. service.deleteRow(tableName, rowKey, CF_02);
  111. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  112. System.out.println(row);
  113. Assert.assertTrue(row.isEmpty());
  114. }
  115. @Test
  116. public void deleteMultiRow() {
  117. service.addRowData(tableName, rowKey2, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  118. service.addRowData(tableName, rowKey2, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  119. service.addRowData(tableName, rowKey3, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  120. service.addRowData(tableName, rowKey3, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  121. List<Map<String, Map<String, Object>>> rows = service.getRows(tableName, rowKey2, rowKey3);
  122. System.out.println("----------------------- test 1 -------------------------");
  123. System.out.println(rows);
  124. String[] familyNames = {CF_01, CF_02};
  125. service.deleteRows(tableName, familyNames, rowKey2, rowKey3);
  126. rows = service.getRows(tableName, rowKey2, rowKey3);
  127. System.out.println("----------------------- test 2 -------------------------");
  128. System.out.println(rows); // [{}, {}]
  129. Assert.assertTrue(rows.get(0).isEmpty());
  130. }
  131. @Test
  132. public void deleteAll() {
  133. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  134. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  135. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  136. System.out.println(row);
  137. service.deleteAll(tableName, rowKey);
  138. row = service.getRowByRowKey(tableName, rowKey);
  139. System.out.println(row);
  140. Assert.assertTrue(row.isEmpty());
  141. }
  142. @Test
  143. public void dropTable() throws IOException {
  144. if (!service.isTableExist(tableName)) {
  145. service.createTable(tableName);
  146. }
  147. assert service.isTableExist(tableName);
  148. service.dropTable(tableName);
  149. assert !service.isTableExist(tableName);
  150. }
  151. @Test
  152. public void execute_deleteAll() {
  153. service.addRowData(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME, CF1_NAME_VALUE);
  154. service.addRowData(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR, CF2_ADDR_VALUE);
  155. Map<String, Map<String, Object>> row = service.getRowByRowKey(tableName, rowKey);
  156. System.out.println(row);
  157. TableCallback action = new TableCallback<Boolean>() {
  158. public Boolean doInTable(HTableInterface table) {
  159. boolean flag = false;
  160. try {
  161. List<Delete> list = new ArrayList<>();
  162. Delete d1 = new Delete(rowKey.getBytes());
  163. list.add(d1);
  164. table.delete(list);
  165. flag = true;
  166. } catch (Exception e) {
  167. e.printStackTrace();
  168. }
  169. return flag;
  170. }
  171. };
  172. service.execute(tableName, action);
  173. row = service.getRowByRowKey(tableName, rowKey);
  174. System.out.println(row);
  175. Assert.assertTrue(row.isEmpty());
  176. }
  177. @Test
  178. public void execute_put() {
  179. TableCallback action = new TableCallback<Boolean>() {
  180. public Boolean doInTable(HTableInterface table) {
  181. boolean flag = false;
  182. try {
  183. Put put = new Put(rowKey.getBytes());
  184. put.add(Bytes.toBytes(CF_01), Bytes.toBytes(CF1_QUALIFIER_NAME), Bytes.toBytes(CF1_NAME_VALUE));
  185. put.add(Bytes.toBytes(CF_02), Bytes.toBytes(CF2_QUALIFIER_ADDR), Bytes.toBytes(CF2_ADDR_VALUE));
  186. table.put(put);
  187. flag = true;
  188. } catch (Exception e) {
  189. e.printStackTrace();
  190. }
  191. return flag;
  192. }
  193. };
  194. service.execute(tableName, action);
  195. Assert.assertEquals(CF1_NAME_VALUE, service.getRowQualifier(tableName, rowKey, CF_01, CF1_QUALIFIER_NAME));
  196. Assert.assertEquals(CF2_ADDR_VALUE, service.getRowQualifier(tableName, rowKey, CF_02, CF2_QUALIFIER_ADDR));
  197. }
  198. }

■ Runner测试类(HBaseServiceRunnerTest.java)

  1. import com.lonton.bigdata.service.HBaseService;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.ApplicationArguments;
  6. import org.springframework.boot.ApplicationRunner;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class HBaseServiceRunnerTest implements ApplicationRunner {
  10. private final static Logger log = LoggerFactory.getLogger(HBaseServiceRunnerTest.class);
  11. private String tableName = "t1";
  12. @Autowired
  13. private HBaseService service;
  14. @Override
  15. public void run(ApplicationArguments args) throws Exception {
  16. log.info("------------------------- HBaseServiceTest ApplicationRunner --------------------------------");
  17. log.info(String.valueOf(service.isTableExist(tableName)));
  18. }
  19. }

5. 完整示例代码

源码:hbase-springboot-example04-src.zip

方式5:SpringBoot + hbase starter

spring-boot-starter-hbase是spring-boot自定义的starter,为hbase的query和更新等操作提供简易的api,并集成spring-boot的auto configuration。如果HbaseTemplate的操作不满足需求,可以使用HbaseTemplate的getConnection()方法获取连接,进而参照HbaseTemplate的实现逻辑,实现更复杂的查询等功能。
源码:https://github.com/SpringForAll/spring-boot-starter-hbase
示例:https://github.com/JeffLi1993/springboot-learning-example
参考:Spring Boot 2.x:通过spring-boot-starter-hbase集成HBase

1. 添加pom.xml依赖

  1. <!-- Spring Boot HBase 依赖 -->
  2. <!-- https://mvnrepository.com/artifact/com.spring4all/spring-boot-starter-hbase -->
  3. <dependency>
  4. <groupId>com.spring4all</groupId>
  5. <artifactId>spring-boot-starter-hbase</artifactId>
  6. <version>1.0.0.RELEASE</version>
  7. </dependency>

补充:安装spring-boot-starter-hbase组件依赖(因为不在公共仓库,只能自行安装。如果有maven私库,可以考虑安装到私库)

  1. # 下载项目到本地
  2. git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
  3. cd spring-boot-starter-hbase
  4. # 安装依赖
  5. mvn clean install

源码:spring-boot-starter-hbase-master.zip

2. 工程配置

■ SpringBoot配置(application.properties)

  1. ## HBase Configuration
  2. spring.data.hbase.quorum=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
  3. spring.data.hbase.rootDir=hdfs://bigdata-node1:9000/hbase
  4. spring.data.hbase.nodeParent=/hbase

具体配置项信息如下:

  • spring.data.hbase.quorum:指定HBase的zk地址
  • spring.data.hbase.rootDir:指定HBase在HDFS上存储的路径
  • spring.data.hbase.nodeParent:指定ZK中HBase的根ZNode

    3. 编写关键类

■ Service接口(HBaseService.java)

  1. import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
  2. import org.apache.hadoop.hbase.client.Connection;
  3. import org.apache.hadoop.hbase.client.Mutation;
  4. import org.apache.hadoop.hbase.client.Scan;
  5. import java.util.List;
  6. /**
  7. * Description HBase服务接口
  8. */
  9. public interface HBaseService {
  10. /**
  11. * 根据rowKey获取单行记录,数据由mapper映射
  12. *
  13. * @param tableName 表名
  14. * @param rowKey 行键
  15. * @param <T> 泛型
  16. * @return
  17. */
  18. <T> T get(String tableName, String rowKey);
  19. /**
  20. * 根据rowKey和列簇取单行记录,数据由mapper映射
  21. *
  22. * @param tableName 表名
  23. * @param rowKey 行键
  24. * @param familyName 列簇
  25. * @param <T> 泛型
  26. * @return
  27. */
  28. <T> T get(String tableName, String rowKey, String familyName);
  29. /**
  30. * 根据限定符获取单行记录,数据由mapper映射
  31. *
  32. * @param tableName 表名
  33. * @param rowKey 行键
  34. * @param familyName 列簇
  35. * @param qualifier 限定符
  36. * @param <T> 泛型
  37. * @return
  38. */
  39. <T> T get(String tableName, String rowKey, String familyName, String qualifier);
  40. /**
  41. * 新增或修改
  42. * 注意:写入非字符串型的数据时,要先转为String,如:int,Bytes.toBytes(String.valueOf(值))
  43. *
  44. * @param tableName 表名
  45. * @param mutation 执行put update or delete
  46. */
  47. void saveOrUpdate(String tableName, Mutation mutation);
  48. /**
  49. * 批量新增或修改
  50. * 注意:写入非字符串型的数据时,要先转为String,如:int,Bytes.toBytes(String.valueOf(值))
  51. *
  52. * @param tableName 表名
  53. * @param mutations 批量执行put update or delete
  54. */
  55. void saveOrUpdates(String tableName, List<Mutation> mutations);
  56. /**
  57. * 通用方法(一般可用于更新操作,如:delete、put)
  58. * // 执行操作(deleteAll)
  59. * TableCallback action = new TableCallback<Boolean>() {
  60. * @Override
  61. * public Boolean doInTable(Table table) throws Throwable {
  62. * boolean flag = false;
  63. * try {
  64. * List<Delete> list = new ArrayList<>();
  65. * Delete d1 = new Delete(rowKey.getBytes());
  66. * list.add(d1);
  67. * table.delete(list);
  68. * flag = true;
  69. * } catch (Exception e) {
  70. * e.printStackTrace();
  71. * }
  72. * return flag;
  73. * }
  74. * };
  75. * // 执行操作(put update)
  76. * TableCallback action = new TableCallback<Boolean>() {
  77. * @Override
  78. * public Boolean doInTable(Table table) throws Throwable {
  79. * boolean flag = false;
  80. * try {
  81. * Put put = new Put(rowKey.getBytes());
  82. * put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));
  83. * put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));
  84. * put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));
  85. * table.put(put);
  86. * flag = true;
  87. * } catch (Exception e) {
  88. * e.printStackTrace();
  89. * }
  90. * return flag;
  91. * }
  92. * };
  93. *
  94. * @param tableName 表名
  95. * @param action 执行put update or delete
  96. * @param <T> 泛型
  97. * @return
  98. */
  99. <T> T execute(String tableName, TableCallback<T> action);
  100. /**
  101. * 扫描
  102. *
  103. * @param tableName 表名
  104. * @param scan 扫描对象
  105. * @param <T> 泛型
  106. * @return
  107. */
  108. <T> List<T> find(String tableName, Scan scan);
  109. /**
  110. * 扫描(根据列簇)
  111. *
  112. * @param tableName
  113. * @param family
  114. * @param <T>
  115. * @return
  116. */
  117. <T> List<T> find(String tableName, String family);
  118. /**
  119. * 扫描(根据限定符)
  120. *
  121. * @param tableName
  122. * @param family
  123. * @param qualifier
  124. * @param <T>
  125. * @return
  126. */
  127. <T> List<T> find(String tableName, String family, String qualifier);
  128. /**
  129. * 如果HbaseTemplate操作不满足需求,可以使用hbaseTemplate的getConnection()方法,
  130. * 获取连接,进而类似HbaseTemplate实现的逻辑,实现更复杂的需求查询等功能
  131. *
  132. * @return
  133. */
  134. Connection getConnection();
  135. /**
  136. * 关闭连接
  137. *
  138. * @param connection
  139. */
  140. void closeConnection(Connection connection);
  141. }

4. 测试

■ DTO定义(Student.java)

  5. import lombok.AllArgsConstructor;
  6. import lombok.Data;
  7. import lombok.NoArgsConstructor;
  8. /**
  9. * Description DTO-学生实体类
  10. * create 'student','base','other'
  11. */
  12. @Data
  13. @NoArgsConstructor
  14. @AllArgsConstructor
  15. public class Student {
  16. private BaseInfo base;
  17. private OtherInfo other;
  18. @Data
  19. @NoArgsConstructor
  20. @AllArgsConstructor
  21. public static class BaseInfo{
  22. private String name;
  23. private Integer age;
  24. @Override
  25. public String toString() {
  26. return "BaseInfo{" +
  27. "name='" + name + '\'' +
  28. ", age=" + age +
  29. '}';
  30. }
  31. }
  32. @Data
  33. @NoArgsConstructor
  34. @AllArgsConstructor
  35. public static class OtherInfo{
  36. private Integer fv;
  37. @Override
  38. public String toString() {
  39. return "OtherInfo{" +
  40. "fv=" + fv +
  41. '}';
  42. }
  43. }
  44. @Override
  45. public String toString() {
  46. return "Student{" +
  47. "base=" + base +
  48. ", other=" + other +
  49. '}';
  50. }
  51. }

■ RowMapper定义(StudentRowMapper.java)

  1. import com.lonton.bigdata.domain.Student;
  2. import com.spring4all.spring.boot.starter.hbase.api.RowMapper;
  3. import org.apache.hadoop.hbase.client.Result;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. /**
  6. * Description HBase Mapper
  7. */
  8. public class StudentRowMapper implements RowMapper<Student> {
  9. private static final byte[] BASE_FAMILY = "base".getBytes();
  10. private static final byte[] OTHER_FAMILY = "other".getBytes();
  11. private static final byte[] NAME = "name".getBytes();
  12. private static final byte[] AGE = "age".getBytes();
  13. private static final byte[] FV = "fv".getBytes();
  14. /*方法1:ok*/
  15. /*
  16. @Override
  17. public Student mapRow(Result result, int rowNum) throws Exception {
  18. Student.BaseInfo baseInfo = new Student.BaseInfo();
  19. Student.OtherInfo otherInfo = new Student.OtherInfo();
  20. Cell[] cells = result.rawCells();
  21. for (Cell c : cells) {
  22. String columnFamily = new String(CellUtil.cloneFamily(c));
  23. String rowName = new String(CellUtil.cloneQualifier(c));
  24. String value = new String(CellUtil.cloneValue(c));
  25. if(value!=null){
  26. if(Bytes.toString(BASE_FAMILY).equals(columnFamily)){
  27. if(Bytes.toString(NAME).equals(rowName)){
  28. baseInfo.setName(value);
  29. }else if(Bytes.toString(AGE).equals(rowName)){
  30. Integer age=Integer.parseInt(value);
  31. baseInfo.setAge(age);
  32. }
  33. } else if(Bytes.toString(OTHER_FAMILY).equals(columnFamily)){
  34. if(Bytes.toString(FV).equals(rowName)){
  35. Integer fv=Integer.parseInt(value);
  36. otherInfo.setFv(fv);
  37. }
  38. }
  39. }
  40. }
  41. Student dto = new Student(baseInfo, otherInfo);
  42. return dto;
  43. }*/
  44. /*方法2:ok*/
  45. @Override
  46. public Student mapRow(Result result, int rowNum) throws Exception {
  47. Student.BaseInfo baseInfo = new Student.BaseInfo();
  48. if (result.containsColumn(BASE_FAMILY, NAME)) {
  49. String name = Bytes.toString(result.getValue(BASE_FAMILY, NAME));
  50. baseInfo.setName(name);
  51. }
  52. if (result.containsColumn(BASE_FAMILY, AGE)) {
  53. Integer age = Integer.parseInt(Bytes.toString(result.getValue(BASE_FAMILY, AGE)));
  54. baseInfo.setAge(age);
  55. }
  56. Student.OtherInfo otherInfo = new Student.OtherInfo();
  57. if (result.containsColumn(OTHER_FAMILY, FV)) {
  58. Integer fv = Integer.parseInt(Bytes.toString(result.getValue(OTHER_FAMILY, FV)));
  59. otherInfo.setFv(fv);
  60. }
  61. Student dto = new Student(baseInfo, otherInfo);
  62. return dto;
  63. }
  64. }

■ Service实现类(StudentHBaseServiceImpl.java)

  1. import com.lonton.bigdata.domain.Student;
  2. import com.lonton.bigdata.mapper.StudentRowMapper;
  3. import com.lonton.bigdata.service.HBaseService;
  4. import com.spring4all.spring.boot.starter.hbase.api.HbaseTemplate;
  5. import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
  6. import org.apache.hadoop.hbase.client.Connection;
  7. import org.apache.hadoop.hbase.client.Mutation;
  8. import org.apache.hadoop.hbase.client.Scan;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Service;
  11. import java.io.IOException;
  12. import java.util.List;
  13. /**
  14. * Description HBase服务接口实现类
  15. */
  16. @Service
  17. public class StudentHBaseServiceImpl implements HBaseService {
  18. @Autowired
  19. private HbaseTemplate hbaseTemplate;
  20. @Override
  21. public Student get(String tableName, String rowKey) {
  22. return hbaseTemplate.get(tableName,rowKey,new StudentRowMapper());
  23. }
  24. @Override
  25. public Student get(String tableName, String rowKey, String familyName) {
  26. return hbaseTemplate.get(tableName,rowKey,familyName,new StudentRowMapper());
  27. }
  28. @Override
  29. public Student get(String tableName, String rowKey, String familyName, String qualifier) {
  30. return hbaseTemplate.get(tableName,rowKey,familyName,qualifier,new StudentRowMapper());
  31. }
  32. @Override
  33. public void saveOrUpdate(String tableName, Mutation mutation) {
  34. hbaseTemplate.saveOrUpdate(tableName,mutation);
  35. }
  36. @Override
  37. public void saveOrUpdates(String tableName, List<Mutation> mutations) {
  38. hbaseTemplate.saveOrUpdates(tableName,mutations);
  39. }
  40. @Override
  41. public <T> T execute(String tableName, TableCallback<T> action) {
  42. return hbaseTemplate.execute(tableName,action);
  43. }
  44. @Override
  45. public List<Student> find(String tableName, Scan scan) {
  46. return hbaseTemplate.find(tableName,scan,new StudentRowMapper());
  47. }
  48. @Override
  49. public List<Student> find(String tableName, String family) {
  50. return hbaseTemplate.find(tableName,family,new StudentRowMapper());
  51. }
  52. @Override
  53. public List<Student> find(String tableName, String family, String qualifier) {
  54. return hbaseTemplate.find(tableName,family,qualifier,new StudentRowMapper());
  55. }
  56. @Override
  57. public Connection getConnection() {
  58. return hbaseTemplate.getConnection();
  59. }
  60. @Override
  61. public void closeConnection(Connection connection) {
  62. if(connection!=null){
  63. try {
  64. connection.close();
  65. } catch (IOException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

■ 测试类(HBaseServiceApplicationTests.java)

  1. import com.lonton.bigdata.domain.Student;
  2. import com.spring4all.spring.boot.starter.hbase.api.TableCallback;
  3. import org.apache.hadoop.hbase.client.*;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. import org.junit.Assert;
  6. import org.junit.jupiter.api.Test;
  7. import org.junit.runner.RunWith;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.boot.test.context.SpringBootTest;
  12. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. /**
  17. * HBase unit tests for HBaseService.
  18. * HBase shell commands to prepare the table and the sample data:
  19. * create 'student','base','other'
  20. * truncate 'student'
  21. * put 'student','1001','base:name','zhangsan'
  22. * put 'student','1001','base:age',23
  23. * put 'student','1001','other:fv',3
  24. * scan 'student'
  25. * Runs on JUnit 5: the imported {@code @Test} is the Jupiter annotation, which a JUnit 4
  26. * {@code @RunWith} runner never executes, so no runner is declared here. NOTE(review): relies on
  27. * {@code @SpringBootTest} registering the Spring extension itself (Spring Boot 2.1+) - confirm the version.
  28. */
  29. @SpringBootTest
  30. class HBaseServiceApplicationTests {
  31. private static final Logger log = LoggerFactory.getLogger(HBaseServiceApplicationTests.class);
  32. private static final String TABLE_NAME = "student";
  33. private static final String ROW_KEY = "1001";
  34. private static final String CF_BASE = "base";
  35. private static final String CF_OTHER = "other";
  36. private static final String CF_BASE_QUALIFIER_NAME = "name";
  37. private static final String CF_BASE_QUALIFIER_AGE = "age";
  38. private static final String CF_OTHER_QUALIFIER_FV = "fv";
  39. private static final String CF_BASE_VALUE_NAME = "zhangsan";
  40. private static final Integer CF_BASE_VALUE_AGE = 23;
  41. private static final Integer CF_OTHER_VALUE_FV = 3;
  42. @Autowired
  43. private HBaseService service;
  44. /** Builds a Put for the given row holding the sample name, age and fv cells (numbers are written as strings). */
  45. private static Put newStudentPut(String row) {
  46. Put put = new Put(Bytes.toBytes(row));
  47. put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));
  48. put.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));
  49. put.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));
  50. return put;
  51. }
  52. /** Asserts that name, age and fv all carry the sample values. */
  53. private static void assertFullStudent(Student dto) {
  54. Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
  55. Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());
  56. Assert.assertEquals(CF_OTHER_VALUE_FV, dto.getOther().getFv());
  57. }
  58. /** Asserts that name and age carry the sample values while fv is null. */
  59. private static void assertBaseOnly(Student dto) {
  60. Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
  61. Assert.assertEquals(CF_BASE_VALUE_AGE, dto.getBase().getAge());
  62. Assert.assertNull(dto.getOther().getFv());
  63. }
  64. /** Asserts that name carries the sample value while age and fv are null. */
  65. private static void assertNameOnly(Student dto) {
  66. Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
  67. Assert.assertNull(dto.getBase().getAge());
  68. Assert.assertNull(dto.getOther().getFv());
  69. }
  70. /** Expected: Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=3}} */
  71. @Test
  72. public void get1() {
  73. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY));
  74. Student dto = service.get(TABLE_NAME, ROW_KEY);
  75. log.info("{}", dto);
  76. assertFullStudent(dto);
  77. }
  78. /** Expected: Student{base=BaseInfo{name='zhangsan', age=23}, other=OtherInfo{fv=null}} */
  79. @Test
  80. public void get2() {
  81. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY));
  82. Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE);
  83. log.info("{}", dto);
  84. assertBaseOnly(dto);
  85. }
  86. /** Expected: Student{base=BaseInfo{name='zhangsan', age=null}, other=OtherInfo{fv=null}} */
  87. @Test
  88. public void get3() {
  89. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY));
  90. Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE, CF_BASE_QUALIFIER_NAME);
  91. log.info("{}", dto);
  92. assertNameOnly(dto);
  93. }
  94. /** Writes the whole sample row with one Put and reads it back. */
  95. @Test
  96. public void saveOrUpdate() {
  97. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY));
  98. Student dto = service.get(TABLE_NAME, ROW_KEY);
  99. log.info("{}", dto);
  100. assertFullStudent(dto);
  101. }
  102. /** Writes the three sample cells as separate Puts in one batch and reads the row back. */
  103. @Test
  104. public void saveOrUpdates() {
  105. List<Mutation> mutations = new ArrayList<>();
  106. Put namePut = new Put(Bytes.toBytes(ROW_KEY));
  107. namePut.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_NAME), Bytes.toBytes(CF_BASE_VALUE_NAME));
  108. mutations.add(namePut);
  109. Put agePut = new Put(Bytes.toBytes(ROW_KEY));
  110. agePut.addColumn(Bytes.toBytes(CF_BASE), Bytes.toBytes(CF_BASE_QUALIFIER_AGE), Bytes.toBytes(String.valueOf(CF_BASE_VALUE_AGE)));
  111. mutations.add(agePut);
  112. Put fvPut = new Put(Bytes.toBytes(ROW_KEY));
  113. fvPut.addColumn(Bytes.toBytes(CF_OTHER), Bytes.toBytes(CF_OTHER_QUALIFIER_FV), Bytes.toBytes(String.valueOf(CF_OTHER_VALUE_FV)));
  114. mutations.add(fvPut);
  115. service.saveOrUpdates(TABLE_NAME, mutations);
  116. Student dto = service.get(TABLE_NAME, ROW_KEY);
  117. log.info("{}", dto);
  118. assertFullStudent(dto);
  119. }
  120. /** Deletes the sample row through a TableCallback and checks that no cell is left. */
  121. @Test
  122. public void execute_deleteAll() {
  123. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY));
  124. Student dto = service.get(TABLE_NAME, ROW_KEY, CF_BASE, CF_BASE_QUALIFIER_NAME);
  125. log.info("{}", dto);
  126. Assert.assertEquals(CF_BASE_VALUE_NAME, dto.getBase().getName());
  127. TableCallback<Boolean> action = new TableCallback<Boolean>() {
  128. @Override
  129. public Boolean doInTable(Table table) throws Throwable {
  130. // Failures propagate instead of being printed, so a broken delete fails the test.
  131. List<Delete> deletes = new ArrayList<>();
  132. deletes.add(new Delete(Bytes.toBytes(ROW_KEY)));
  133. table.delete(deletes);
  134. return Boolean.TRUE;
  135. }
  136. };
  137. Assert.assertEquals(Boolean.TRUE, service.execute(TABLE_NAME, action));
  138. dto = service.get(TABLE_NAME, ROW_KEY);
  139. log.info("{}", dto);
  140. Assert.assertNull(dto.getBase().getName());
  141. Assert.assertNull(dto.getBase().getAge());
  142. Assert.assertNull(dto.getOther().getFv());
  143. }
  144. /** Writes the sample row through a TableCallback and reads it back. */
  145. @Test
  146. public void execute_put() {
  147. TableCallback<Boolean> action = new TableCallback<Boolean>() {
  148. @Override
  149. public Boolean doInTable(Table table) throws Throwable {
  150. table.put(newStudentPut(ROW_KEY));
  151. return Boolean.TRUE;
  152. }
  153. };
  154. Assert.assertEquals(Boolean.TRUE, service.execute(TABLE_NAME, action));
  155. Student dto = service.get(TABLE_NAME, ROW_KEY);
  156. log.info("{}", dto);
  157. assertFullStudent(dto);
  158. }
  159. /** Range scan over two freshly written rows. */
  160. @Test
  161. public void find_scan() {
  162. String startRow = ROW_KEY + "-01";
  163. String stopRow = ROW_KEY + "-02";
  164. service.saveOrUpdate(TABLE_NAME, newStudentPut(startRow));
  165. service.saveOrUpdate(TABLE_NAME, newStudentPut(stopRow));
  166. // Range scan (a Scan without bounds would be a full-table scan).
  167. Scan scan = new Scan();
  168. scan.setStartRow(Bytes.toBytes(startRow));
  169. // The stop row is exclusive, so "|" is appended to keep the stopRow record itself inside the range.
  170. scan.setStopRow(Bytes.toBytes(stopRow + "|"));
  171. scan.setCaching(5000);
  172. List<Student> dtoList = service.find(TABLE_NAME, scan);
  173. // Guard against a vacuous pass: the loop below checks nothing on an empty result.
  174. Assert.assertFalse(dtoList.isEmpty());
  175. for (Student dto : dtoList) {
  176. log.info("{}", dto);
  177. assertFullStudent(dto);
  178. }
  179. }
  180. /** Query restricted to the base family: fv must not be populated. */
  181. @Test
  182. public void find_family() {
  183. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY + "-01"));
  184. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY + "-02"));
  185. List<Student> dtoList = service.find(TABLE_NAME, CF_BASE);
  186. Assert.assertFalse(dtoList.isEmpty());
  187. for (Student dto : dtoList) {
  188. log.info("{}", dto);
  189. assertBaseOnly(dto);
  190. }
  191. }
  192. /** Query restricted to base:name: age and fv must not be populated. */
  193. @Test
  194. public void find_qualifier() {
  195. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY + "-01"));
  196. service.saveOrUpdate(TABLE_NAME, newStudentPut(ROW_KEY + "-02"));
  197. List<Student> dtoList = service.find(TABLE_NAME, CF_BASE, CF_BASE_QUALIFIER_NAME);
  198. Assert.assertFalse(dtoList.isEmpty());
  199. for (Student dto : dtoList) {
  200. log.info("{}", dto);
  201. assertNameOnly(dto);
  202. }
  203. }
  204. /** Obtains a connection from the service, closes it through the service and checks the closed flag. */
  205. @Test
  206. public void getAndCloseConnection() {
  207. Connection connection = service.getConnection();
  208. log.info("{}", connection);
  209. Assert.assertFalse(connection.isClosed());
  210. service.closeConnection(connection);
  211. Assert.assertTrue(connection.isClosed());
  212. }
  213. }

■ 控制类(StudentController.java)

  1. import com.lonton.bigdata.domain.Student;
  2. import com.lonton.bigdata.service.HBaseService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.*;
  6. /**
  7. * REST controller exposing rows stored in HBase as Student objects.<br/>
  8. * Sample data (HBase shell):
  9. * put 'student','1001','base:name','zhangsan'
  10. * put 'student','1001','base:age',23
  11. * put 'student','1001','other:fv',3
  12. * {@code @RestController} is equivalent to {@code @Controller} plus {@code @ResponseBody}.
  13. */
  14. @RestController
  15. public class StudentController {
  16. @Autowired
  17. private HBaseService service;
  18. /**
  19. * Returns what {@code service.get} yields for the table and row key taken from the request path.
  20. * Example request: http://localhost:8080/hbaseservice/student/1001
  21. * Example response: {"base":{"name":"zhangsan","age":23},"other":{"fv":3}}
  22. */
  23. @GetMapping("/hbaseservice/{tableName}/{rowKey}")
  24. public Student getStudent(@PathVariable String tableName, @PathVariable String rowKey) {
  25. Student student = service.get(tableName, rowKey);
  26. return student;
  27. }
  28. }

5. 完整示例代码

源码:hbase-springboot-example05-src.zip