HBase-API-ForJava-简介
资料 https://hbase.apache.org/apidocs/index.html http://hbase.apache.org/book.html#external_apis
1、使用Java创建,修改和删除一个表-示例
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
public class Example {
private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";
public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
if (admin.tableExists(table.getTableName())) {
admin.disableTable(table.getTableName());
admin.deleteTable(table.getTableName());
}
admin.createTable(table);
}
public static void createSchemaTables(Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));
System.out.print("Creating table. ");
createOrOverwrite(admin, table);
System.out.println(" Done.");
}
}
public static void modifySchema (Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
TableName tableName = TableName.valueOf(TABLE_NAME);
if (!admin.tableExists(tableName)) {
System.out.println("Table does not exist.");
System.exit(-1);
}
HTableDescriptor table = admin.getTableDescriptor(tableName);
// Update existing table
HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
newColumn.setCompactionCompressionType(Algorithm.GZ);
newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
admin.addColumn(tableName, newColumn);
// Update existing column family
HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
existingColumn.setCompactionCompressionType(Algorithm.GZ);
existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
table.modifyFamily(existingColumn);
admin.modifyTable(tableName, table);
// Disable an existing table
admin.disableTable(tableName);
// Delete an existing column family
admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));
// Delete a table (Need to be disabled first)
admin.deleteTable(tableName);
}
}
public static void main(String... args) throws IOException {
Configuration config = HBaseConfiguration.create();
//Add any necessary configuration files (hbase-site.xml, core-site.xml)
config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
createSchemaTables(config);
modifySchema(config);
}
}
2、调用API,对hbase表进行保存|更新|删除操作
1、新增保存操作
HTable hTable = new HTable(configInit(), Bytes.toBytes(tableName));
//HTablePool hTablePool = new HTablePool(configInit(), 300);
try {
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(saveValue));
//hTablePool.getTable(tableName).put(put);
hTable.put(put);
return true;
} catch (Exception e) {
log.error("save error:{}, columnQualifier:{}, saveValue:{} ", e.getMessage(), columnFamily, saveValue);
} finally {
//hTablePool.closeTablePool(tableName);
Table.closeScanner(hTable, null);
}
2、更新操作
HTable hTable = new HTable(configInit(), TABLE_NAME);
try {
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), Bytes.toBytes(updateValue));
hTable.put(put);
return true;
} catch (Exception e) {
log.error("delete " + tableName + " error. save=" + columnQualifier, e);
} finally {
Table.closeScanner(hTable, null);
}
3、删除操作
HTable hTable = new HTable(configInit(), tableName);
try {
hTable.delete(delete);
return true;
} catch (Exception e) {
log.error("delete " + tableName + " error. delete=" + delete, e);
} finally {
Table.closeScanner(hTable, null);
}
3、使用hbaseTemplate, 对hbase表进行新增保存|更新|删除|查询操作
1、新增保存操作
hbaseTemplate.execute(tableName, table -> {
boolean flag = false;
try{
byte[] rowkey = rowKey.getBytes();
Put put = new Put(rowkey);
put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue));
table.put(put);
flag = true;
}catch(Exception e){
log.error("保存hbase表数据失败,异常:{}, 列名:{}, 列值:{}", e.getMessage(), tagCode, tagValue);
}
return flag;
});
2、更新操作:跟新增保存一样,会自动覆盖值
hbaseTemplate.execute(tableName, table -> {
boolean flag = false;
try{
byte[] rowkey = rowKey.getBytes();
Put put = new Put(rowkey);
put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(tagCode), Bytes.toBytes(tagValue));
table.put(put);
flag = true;
}catch(Exception e){
log.error("更新hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode);
}
return flag;
});
3、删除操作
hbaseTemplate.execute(tableName, table -> {
boolean flag = false;
try{
byte[] rowkey = rowKey.getBytes();
//指定删除的rowKey
Delete delete = new Delete(rowkey);
//指定删除的列和列族
delete.deleteColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(tagCode));
table.delete(delete);
flag = true;
}catch(Exception e){
log.error("删除hbase表数据失败,异常:{}, 列名:{}", e.getMessage(), tagCode);
}
return flag;
});
4、查询操作
4-1、点查查询
hbaseTemplate.execute(tableName, table -> {
Get get = new Get(Bytes.toBytes(rowName));
//根据列族和列(接口参数传入的列)获取指定列的数据
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnParam));
Result result = null;
try {
result = table.get(get);
} catch (IOException e) {
e.getStackTrace();
}
List<KeyValue> keyValueList = result.list();
if (!CollectionUtils.isEmpty(keyValueList)) {
for (KeyValue keyValue : keyValueList) {
String keyColumn = Bytes.toString(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
}
return dataMap;
} else {
return Collections.EMPTY_MAP;
}
});
4-2、scan查询
Scan scan = new Scan();
//行键开始范围
scan.setStartRow(Bytes.toBytes(ufeScanStart));
//行键结束范围
scan.setStopRow(Bytes.toBytes(ufeScanEnd));
//最大结果数(客户端缓存的最大字节数)
scan.setMaxResultSize(ufeScanLimit);
//是否反向扫描(默认是正向扫描(false)[按照字典顺序从小到大的顺序读取的])
scan.setReversed(ufeScanReversed);
//......
hbaseTemplate.find(tableName, scan, results -> {
List<Map<String, Object>> resultList = new ArrayList<>();
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
Result result = iterator.next();
List<KeyValue> keyValueScanList = result.list();
//业务逻辑
}
}
return resultList;
});