banner.webp

1.代码编写

导入依赖:

  <!-- Spring Boot core starter; no version is declared here. -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
  </dependency>

  <!-- Spring Boot test starter, test scope only, with the JUnit Vintage engine excluded. -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
          <exclusion>
              <groupId>org.junit.vintage</groupId>
              <artifactId>junit-vintage-engine</artifactId>
          </exclusion>
      </exclusions>
  </dependency>

  <!-- HBase client -->
  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.3.0</version>
  </dependency>

  <!-- Lombok; the listings below use its @Data, @Slf4j and @Log4j2 annotations. -->
  <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.4</version>
  </dependency>

配置文件

  # HBase configuration.
  # Everything below hbase.config is bound to HBaseProperties.config (a Map<String, String>)
  # and copied key-by-key into the Hadoop Configuration by HBaseConfig, so the nested keys
  # end up as dotted property names such as hbase.zookeeper.quorum.
  hbase:
    config:
      hbase:
        zookeeper:
          quorum: hadoop0
          # NOTE(review): HBase documents the client port as
          # hbase.zookeeper.property.clientPort - confirm this key is actually read.
          port: 2181
          # NOTE(review): HBase documents the root znode as zookeeper.znode.parent - confirm.
          znode: /hbase
        master: hadoop:16000
        client:
          keyvalue:
            maxsize: 1572864000

HBaseProperties.java

  1. @Data
  2. @ConfigurationProperties(prefix = "hbase")
  3. public class HBaseProperties {
  4. private Map<String, String> config;
  5. }

HBaseConfig.java

  1. @Configuration
  2. @EnableConfigurationProperties(HBaseProperties.class)
  3. public class HBaseConfig {
  4. private final HBaseProperties properties;
  5. public HBaseConfig(HBaseProperties properties) {
  6. this.properties = properties;
  7. }
  8. @Bean
  9. public org.apache.hadoop.conf.Configuration configuration() {
  10. org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
  11. Map<String, String> config = properties.getConfig();
  12. Set<String> keySet = config.keySet();
  13. for (String key : keySet) {
  14. configuration.set(key, config.get(key));
  15. }
  16. return configuration;
  17. }
  18. }

HBaseService.java

  1. @Slf4j
  2. @Component
  3. @DependsOn("HBaseConfig")
  4. public class HBaseService {
  5. @Resource
  6. private HBaseConfig config;
  7. private static Admin admin = null;
  8. public static Configuration conf=null;
  9. private static Connection connection = null;
  10. private final ThreadLocal<List<Put>> threadLocal = new ThreadLocal<>();
  11. private static final int CACHE_LIST_SIZE = 1000;
  12. @PostConstruct
  13. private void init() {
  14. if (connection != null) {
  15. return;
  16. }
  17. try {
  18. connection = ConnectionFactory.createConnection(config.configuration());
  19. admin = connection.getAdmin();
  20. } catch (IOException e) {
  21. log.error("HBase create connection failed: {}","异常");
  22. }
  23. }
  24. /**
  25. * create 'tableName','[Column Family 1]','[Column Family 2]'
  26. * @param tableName
  27. * @param columnFamilies 列族名
  28. * @throws IOException
  29. */
  30. public void createTable(String tableName, String... columnFamilies) throws IOException {
  31. TableName name = TableName.valueOf(tableName);
  32. boolean isExists = this.tableExists(tableName);
  33. if (isExists) {
  34. throw new TableExistsException(tableName + "is exists!");
  35. }
  36. TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(name);
  37. List<ColumnFamilyDescriptor> columnFamilyList = new ArrayList<>();
  38. for (String columnFamily : columnFamilies) {
  39. ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
  40. .newBuilder(columnFamily.getBytes()).build();
  41. columnFamilyList.add(columnFamilyDescriptor);
  42. }
  43. descriptorBuilder.setColumnFamilies(columnFamilyList);
  44. TableDescriptor tableDescriptor = descriptorBuilder.build();
  45. admin.createTable(tableDescriptor);
  46. }
  47. /**
  48. * put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
  49. * @param tableName
  50. * @param rowKey
  51. * @param columnFamily
  52. * @param column
  53. * @param value
  54. * @throws IOException
  55. */
  56. public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
  57. throws IOException {
  58. this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
  59. }
  60. /**
  61. * put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
  62. * @param tableName
  63. * @param rowKey
  64. * @param columnFamily
  65. * @param columns
  66. * @param values
  67. * @throws IOException
  68. */
  69. public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
  70. throws IOException {
  71. Table table = connection.getTable(TableName.valueOf(tableName));
  72. Put put = new Put(Bytes.toBytes(rowKey));
  73. for (int i = 0; i < columns.length; i++) {
  74. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
  75. table.put(put);
  76. }
  77. }
  78. /**
  79. * @param tableName
  80. * @param rowKey
  81. * @throws IOException
  82. */
  83. public void deleteRow(String tableName, String rowKey) throws IOException {
  84. Table table = connection.getTable(TableName.valueOf(tableName));
  85. Delete delete = new Delete(rowKey.getBytes());
  86. table.delete(delete);
  87. }
  88. /**
  89. * @param tableName
  90. * @param rowKey
  91. * @param columnFamily
  92. * @throws IOException
  93. */
  94. public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
  95. Table table = connection.getTable(TableName.valueOf(tableName));
  96. Delete delete = new Delete(rowKey.getBytes());
  97. delete.addFamily(Bytes.toBytes(columnFamily));
  98. table.delete(delete);
  99. }
  100. /**
  101. * delete 'tableName','rowKey','columnFamily:column'
  102. * @param tableName
  103. * @param rowKey
  104. * @param columnFamily
  105. * @param column
  106. * @throws IOException
  107. */
  108. public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
  109. Table table = connection.getTable(TableName.valueOf(tableName));
  110. Delete delete = new Delete(rowKey.getBytes());
  111. delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
  112. table.delete(delete);
  113. }
  114. /**
  115. * disable 'tableName' 之后 drop 'tableName'
  116. * @param tableName
  117. * @throws IOException
  118. */
  119. public void deleteTable(String tableName) throws IOException {
  120. boolean isExists = this.tableExists(tableName);
  121. if (!isExists) {
  122. return;
  123. }
  124. TableName name = TableName.valueOf(tableName);
  125. admin.disableTable(name);
  126. admin.deleteTable(name);
  127. }
  128. /**
  129. * get 'tableName','rowkey','family:column'
  130. * @param tableName
  131. * @param rowkey
  132. * @param family
  133. * @param column
  134. * @return
  135. */
  136. public String getValue(String tableName, String rowkey, String family, String column) {
  137. Table table = null;
  138. String value = "";
  139. if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey) || StringUtils
  140. .isBlank(column)) {
  141. return null;
  142. }
  143. try {
  144. table = connection.getTable(TableName.valueOf(tableName));
  145. Get g = new Get(rowkey.getBytes());
  146. g.addColumn(family.getBytes(), column.getBytes());
  147. Result result = table.get(g);
  148. List<Cell> ceList = result.listCells();
  149. if (ceList != null && ceList.size() > 0) {
  150. for (Cell cell : ceList) {
  151. value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  152. }
  153. }
  154. } catch (IOException e) {
  155. e.printStackTrace();
  156. } finally {
  157. try {
  158. table.close();
  159. connection.close();
  160. } catch (IOException e) {
  161. e.printStackTrace();
  162. }
  163. }
  164. return value;
  165. }
  166. /**
  167. * get 'tableName','rowKey'
  168. * @param tableName
  169. * @param rowKey
  170. * @return
  171. * @throws IOException
  172. */
  173. public String selectOneRow(String tableName, String rowKey) throws IOException {
  174. Table table = connection.getTable(TableName.valueOf(tableName));
  175. Get get = new Get(rowKey.getBytes());
  176. Result result = table.get(get);
  177. NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
  178. for (Cell cell : result.rawCells()) {
  179. String row = Bytes.toString(cell.getRowArray());
  180. String columnFamily = Bytes.toString(cell.getFamilyArray());
  181. String column = Bytes.toString(cell.getQualifierArray());
  182. String value = Bytes.toString(cell.getValueArray());
  183. // 可以通过反射封装成对象(列名和Java属性保持一致)
  184. System.out.println(row);
  185. System.out.println(columnFamily);
  186. System.out.println(column);
  187. System.out.println(value);
  188. }
  189. return null;
  190. }
  191. /**
  192. * scan 't1',{FILTER=>"PrefixFilter('2015')"}
  193. * @param tableName
  194. * @param rowKeyFilter
  195. * @return
  196. * @throws IOException
  197. */
  198. public String scanTable(String tableName, String rowKeyFilter) throws IOException {
  199. Table table = connection.getTable(TableName.valueOf(tableName));
  200. Scan scan = new Scan();
  201. if (!StringUtils.isEmpty(rowKeyFilter)) {
  202. RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter));
  203. scan.setFilter(rowFilter);
  204. }
  205. ResultScanner scanner = table.getScanner(scan);
  206. try {
  207. for (Result result : scanner) {
  208. System.out.println(Bytes.toString(result.getRow()));
  209. for (Cell cell : result.rawCells()) {
  210. System.out.println(cell);
  211. }
  212. }
  213. } finally {
  214. if (scanner != null) {
  215. scanner.close();
  216. }
  217. }
  218. return null;
  219. }
  220. /**
  221. * 判断表是否已经存在,这里使用间接的方式来实现
  222. *
  223. * admin.tableExists() 会报NoSuchColumnFamilyException, 有人说是hbase-client版本问题
  224. * @param tableName
  225. * @return
  226. * @throws IOException
  227. */
  228. public boolean tableExists(String tableName) throws IOException {
  229. TableName[] tableNames = admin.listTableNames();
  230. if (tableNames != null && tableNames.length > 0) {
  231. for (int i = 0; i < tableNames.length; i++) {
  232. if (tableName.equals(tableNames[i].getNameAsString())) {
  233. return true;
  234. }
  235. }
  236. }
  237. return false;
  238. }
  239. }

2.单元测试

  1. @SpringBootTest
  2. @Log4j2
  3. class HBaseApplicationTests {
  4. @Resource
  5. HBaseService hBaseService;
  6. @Test
  7. void createTable() throws IOException {
  8. hBaseService.createTable("scores","name","grad","course");
  9. hBaseService.insertOrUpdate("scores","xiapi","grad","grad","1");
  10. hBaseService.insertOrUpdate("scores","xiapi","course","Chinese","97");
  11. hBaseService.insertOrUpdate("scores","xiapi","course","Math","128");
  12. hBaseService.insertOrUpdate("scores","xiapi","course","English","85");
  13. }
  14. @Test
  15. public void selectOneRow() throws IOException {
  16. hBaseService.selectOneRow("scores", "xiapi");
  17. }
  18. @Test
  19. public void getValue() {
  20. String result = hBaseService.getValue("scores", "xiapi", "grad", "grad");
  21. System.out.println(result);
  22. }
  23. @Test
  24. public void deleteTable() throws IOException {
  25. hBaseService.deleteTable("scores");
  26. }
  27. }

这里只简单测试了其中几个方法,其余方法可以根据业务需求自行改造。

参考链接:https://blog.csdn.net/weixin_42685328/article/details/119040622