HBase-API-ForJava-简介
资料 https://hbase.apache.org/apidocs/index.html http://hbase.apache.org/book.html#external_apis
1、使用Java创建,修改和删除一个表-示例
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HConstants;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;public class Example {private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {if (admin.tableExists(table.getTableName())) {admin.disableTable(table.getTableName());admin.deleteTable(table.getTableName());}admin.createTable(table);}public static void createSchemaTables(Configuration config) throws IOException {try (Connection connection = ConnectionFactory.createConnection(config);Admin admin = connection.getAdmin()) {HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));System.out.print("Creating table. ");createOrOverwrite(admin, table);System.out.println(" Done.");}}public static void modifySchema (Configuration config) throws IOException {try (Connection connection = ConnectionFactory.createConnection(config);Admin admin = connection.getAdmin()) {TableName tableName = TableName.valueOf(TABLE_NAME);if (!admin.tableExists(tableName)) {System.out.println("Table does not exist.");System.exit(-1);}HTableDescriptor table = admin.getTableDescriptor(tableName);// Update existing tableHColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");newColumn.setCompactionCompressionType(Algorithm.GZ);newColumn.setMaxVersions(HConstants.ALL_VERSIONS);admin.addColumn(tableName, newColumn);// Update existing column familyHColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);existingColumn.setCompactionCompressionType(Algorithm.GZ);existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);table.modifyFamily(existingColumn);admin.modifyTable(tableName, table);// Disable an existing tableadmin.disableTable(tableName);// Delete an existing column familyadmin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));// Delete a table (Need to be disabled first)admin.deleteTable(tableName);}}public static void main(String... args) throws IOException {Configuration config = HBaseConfiguration.create();//Add any necessary configuration files (hbase-site.xml, core-site.xml)config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));createSchemaTables(config);modifySchema(config);}}
2、调用API,对hbase表进行保存|更新|删除操作
1、新增保存操作HTable hTable = new HTable(configInit(), Bytes.toBytes(tableName));//HTablePool hTablePool = new HTablePool(configInit(), 300);try {Put put = new Put(Bytes.toBytes(rowKey));put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(saveValue));//hTablePool.getTable(tableName).put(put);hTable.put(put);return true;} catch (Exception e) {log.error("save error:{}, columnQualifier:{}, saveValue:{} ", e.getMessage(), columnFamily, saveValue);} finally {//hTablePool.closeTablePool(tableName);Table.closeScanner(hTable, null);}2、更新操作HTable hTable = new HTable(configInit(), TABLE_NAME);try {Put put = new Put(Bytes.toBytes(rowKey));put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(updateValue));hTable.put(put);return true;} catch (Exception e) {log.error("delete " + tableName + " error. save=" + columnQualifier, e);} finally {Table.closeScanner(hTable, null);}3、删除操作HTable hTable = new HTable(configInit(), tableName);try {hTable.delete(delete);return true;} catch (Exception e) {log.error("delete " + tableName + " error. delete=" + delete, e);} finally {Table.closeScanner(hTable, null);}
3、使用hbaseTemplate, 对hbase表进行新增保存|更新|删除|查询操作
1、新增保存操作hbaseTemplate.execute(tableName, table -> {boolean flag = false;try{byte[] rowkey = rowKey.getBytes();Put put = new Put(rowkey);put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue));table.put(put);flag = true;}catch(Exception e){log.error("保存hbase表数据失败,异常:{}, 列名:{}, 列值:{}", e.getMessage(), tagCode, tagValue);}return flag;});2、更新操作:跟新增保存一样,会自动覆盖值hbaseTemplate.execute(tableName, table -> {boolean flag = false;try{byte[] rowkey = rowKey.getBytes();Put put = new Put(rowkey);put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue));table.put(put);flag = true;}catch(Exception e){log.error("更新hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode);}return flag;});3、删除操作hbaseTemplate.execute(tableName, table -> {boolean flag = false;try{byte[] rowkey = rowKey.getBytes();//指定删除的rowKeyDelete delete = new Delete(rowkey);//指定删除的列和列族delete.deleteColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(tagCode));table.delete(delete);flag = true;}catch(Exception e){log.error("删除hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode);}return flag;});4、查询操作4-1、点查查询hbaseTemplate.execute(tableName, table -> {Get get = new Get(Bytes.toBytes(rowName));//根据列族和列(接口参数传入的列)获取指定列的数据get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnParam));Result result = null;try {result = table.get(get);} catch (IOException e) {e.getStackTrace();}List<KeyValue> keyValueList = result.list();if (!CollectionUtils.isEmpty(keyValueList)) {for (KeyValue keyValue : keyValueList) {String keyColumn = Bytes.toString(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());}return dataMap;} else {return Collections.EMPTY_MAP;}});4-2、scan查询Scan scan = new Scan();//行键开始范围scan.setStartRow(Bytes.toBytes(ufeScanStart));//行键结束范围scan.setStopRow(Bytes.toBytes(ufeScanEnd));//最大结果数(客户端缓存的最大字节数)scan.setMaxResultSize(ufeScanLimit);//是否反向扫描(默认是正向扫描(false)[按照字典顺序从小到大的顺序读取的])scan.setReversed(ufeScanReversed);//......hbaseTemplate.find(tableName, scan, results -> {List<Map<String, Object>> resultList = new ArrayList<>();Iterator<Result> iterator = results.iterator();while (iterator.hasNext()) {Result result = iterator.next();List<KeyValue> keyValueScanList = result.list();//业务逻辑}}return resultList;});
