一、计数器
由于实际应用中往往会存在统计点击流和在线广告意见,这些应用需要被收集到日志文件中用于后续的分析,此时我们就可以采用计数器做
实时统计,而不需要较高延迟的批量处理操作。
用户不用初始化计数器,当用户第一次使用计数器时,计数器将被自动设为0,也就是说当用户创建一个新列时,计数器的值是0。第一次增
加操作会返回1或增加设定的值。用户也可以直接读写一个计数器。下面我们先使用单个计数器的方式。
long cnt1 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
long cnt2 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
long current = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 0);
long cnt3 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), -1);
System.out.printf("the cnt2: %d, current: %d, cnt3: %d", cnt2, current, cnt3);
如果我们需要使用多计数器,这里需要注意的是只能设置同一行的多个列的数据,需要使用 Increment
对象的 addColumn
函数进行
操作,具体的使用方式如下。
Increment increment = new Increment(Bytes.toBytes("row6"));
increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("clicks"), 1);
increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 10);
Result result = table.increment(increment);
for (Cell cell : result.rawCells()) {
String str = Bytes.toString(cell.getValueArray());
System.out.println(str);
}
在进行批量计数时虽然可以限制其他写程序修改此行的值,但是用户无法限制读操作。事实上,这里并没有保证读操作的原子性。因为读
操作不需要获取锁,所以它可能读到一行中被修改到一半的数据。
二、 协处理器
加载协处理器存在多种方式,下述我们将介绍如何加载我们编写的协处理器。其加载方式主要分为静态方式加载与动态方式加载。
首先我们介绍的为静态加载方式。
读者需要通过添加以下配置中的一条或几条到hbase-site.xml文件中来添加协处理器。
<property>
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.endpoint.SumEndPoint</value>
</property>
<property>
<name>hbase.coprocessor.wal.classes</name>
<value>coprocessor.WALObserverExample</value>
</property>
<property>
<name>hbase.coprocessor.master.classes</name>
<value>coprocessor.MasterObserverExample</value>
</property>
其中 name
对应了不同协处理器的配置项,所以读者需要将对应的协处理器放置在正确的 value
中,如果存在多个则以逗号进行分割。
具体的对应关系如下。
- hbase.coprocessor.region.classes:RegionObservers与Endpoints协处理器;
- hbase.coprocessor.wal.classes:WALObservers协处理器;
- hbase.coprocessor.master.classes:MasterObservers协处理器;
完成以上配置后还需要将对应jar放置到HBase中的lib文件夹下,最后重启HBase服务即可。对于卸载只要删除 property
配置项以及对应
lib文件夹下的jar文件后重启服务即可实现卸载。
动态加载方式可以通过HBase Shell或Java API方式进行加载,首先我们通过HBase Shell实现加载。
disable 'users'
alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|
arg1=1,arg2=2'
enable 'users'
describe 'users'
关于 Coprocessor
部分的说明如下。
- 文件路径:必须为可读取到协处理器实现jar文件的路径,可以为HBase服务器本地路径,当然这里建议为HDFS路径,且可以记载多个文件,例如:
hdfs://:/user// 或 hdfs://:/user//*.jar;
- 类名:协处理器的完整类名;
- 优先级:一个可选的整数,用于决定协处理器的执行顺序;
- 参数:可选参数,此字段将传递给协处理器实例;
此类协处理器的作用范围仅限该表,不同于通过hbase-site.xml设置的协处理器。如果读者需要卸载该协处理器,我们仅需要重新设置表即可,比如下述
方式。
disable 'users'
alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'users'
介绍完上述方式,我们接着通过纯Java的方式设置协处理器,当然对应的范围也仅限设置的表。
TableName tableName = TableName.valueOf("mytable");
admin.disableTable(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of("mycf");
CoprocessorDescriptor coprocessorDescriptor = CoprocessorDescriptorBuilder.newBuilder("Coprocessor.RegionObserverExample")
.setJarPath("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar")
.setPriority(100)
.build();
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(columnFamilyDescriptor)
.setCoprocessor(coprocessorDescriptor)
.build();
admin.modifyTable(tableDescriptor);
admin.enableTable(tableName);
对应关闭协处理器的方式就是重新设置表信息,仅只是不添加协处理器即可。
TableName tableName = TableName.valueOf("mytable");
admin.disableTable(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of("mycf");
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(columnFamilyDescriptor)
.build();
admin.modifyTable(tableDescriptor);
admin.enableTable(tableName);
1. RegionObserver
在region级别中介绍的Coprocessor第一个抽象接口是 RegionObserver
。从名字可知当region界别的操作发生时,他们的钩子函数会被触发。其中可供我们重写进行
触发的函数如下。
- preOpen/postOpen:Region被打开前与打开后调用;
- preWALRestore/postWALRestore:从WAL重做前后重做后调用;
- preFlush/postFlush:刷入前与刷入后;
- preCompact/postCompact:执行合并时;
- preClose/postClose:Region关闭前与关闭后调用;
- preGetOp/postGetOp:客户端Table.get请求执行前和执行后调用;
- prePut/postPut:客户端Table.put请求执行之前和执行后调用;
- preDelete/postDelete:客户端Table.delete请求执行之前和执行后调用;
- preCheckAndPut/postCheckAndPut:客户端Table.checkAndPut请求执行之前和执行后调用;
- preCheckAndDelete/postCheckAndDelete:客户端Table.checkAndDelete请求执行之前和执行后调用;
- preExists/postExists:客户端调用Table.exits之前和之后调用;
- preIncrement/postIncrement:客户端调用Table.increment之前和之后调用;
- preScannerOpen/postScannerOpen:客户端调用Table.getScanner之前和之后调用;
- preScannerNext/postScannerOpen:客户端调用ResultScanner.next之前和之后调用;
- preScannerClose/postScannerClose:客户端调用ResultScanner.close之前和之后调用;
上述仅仅列举了常用的函数,实际还有其他更高级的函数尚未列举出现,如果读者想了解可以阅读官方的API文档。下面我们通过一个实例代码来介绍
其中更深入的内容。
public class RegionObserverExample implements RegionObserver, RegionCoprocessor {
private static final byte[] ADMIN = Bytes.toBytes("admin");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
throws IOException {
if (Bytes.equals(get.getRow(),ADMIN)) {
Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
System.currentTimeMillis(), (byte)4, VALUE);
results.add(c);
e.bypass();
}
}
@Override
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) throws IOException {
Filter filter = new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(ADMIN));
scan.setFilter(filter);
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s,
List<Result> results, int limit, boolean hasNext) throws IOException {
Result result = null;
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
result = iterator.next();
if (Bytes.equals(result.getRow(), ADMIN)) {
iterator.remove();
break;
}
}
return hasNext;
}
}
其中我们可以看到每个重写的方式都有统一的上下文对象,其中均为 ObserverContext
且根据类型参数决定其中 getEnvironment
函数的
具体返回对象。可以看到我们在 preGetOp
方法中使用了 bypass
函数,其函数在调用后,框架将使用用户提供的值,而不使用框架通常使
用的值。
2. MasterObserver
其主要处理master服务器的所有回调函数,这些操作与数据库中的DDL类似,它们可以归类到数据处理操作中,所以其中主要的钩子函数如下
列举所示。
- preCreateTable/postCreateTable:在表创建前后被调用
- preDeleteTable/postDeleteTable:在表删除前后被调用
- preModifyTable/postModifyTable:在表修改前后被调用
- preEnableTable/postEnableTable:在表启用前后被调用
- preDisableTable/postDisableTable:在表近用前后被调用
- preMove/postMove:在region被移动前后被调用
- preAssign/postAssign:在region分配前后被调用
- preUnassign/postUnassign:在region未分配前后被调用
- preBalance/postBalance:在region负载均衡操作前后被调用
- preBalanceSwitch/postBalanceSwitch:在修改自动负载均衡标志位前后被调用
- preShutdown:在集群关闭工作开始前被调用。没有post钩子函数,因为集群关闭之后没有进程可以执行post函数
- preStopMaster:在master进程停止工作开始前被调用。没有post钩子函数,因为master停止工作之后没有进程可
以执行post函数
介绍完上述这些常用的钩子函数后,我们编写一个协处理器。
public class MasterObserverExample implements MasterObserver, Coprocessor {
@Override
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableDescriptor desc,
RegionInfo[] regions) throws IOException {
String tableName = regions[0].getTable().getNameAsString();
System.out.println("create table:" + tableName);
}
}
3. EndPoint
由于HBase本身的设计,故行键将决定在哪个region处理这个请求。对于需要多个region的进行聚合计算的操作无能为力。为此
HBase提供了EndPoint帮助我们能够编写在region上进行计算的服务,并通过ProtoBuf公开服务以便客户端进行调用。为了演示
这一功能特性,下面我们将带领读者一步一步进行开发。
首先我们需要通过Protobuf开发对应的服务接口以及数据模型
option java_package = "cn.orvillex.observerdemo";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
required string family = 1;
required string column = 2;
}
message SumResponse {
required int64 sum = 1 [default = 0];
}
service SumService {
rpc getSum(SumRequest)
returns (SumResponse);
}
完成以上服务协议编写后我们需要利用Protobuf工具将该文件编译出具体的Java文件,具体操作方法读者可以自行百度进行
查询,生成服务后我们就需要实现该服务。
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
}
@Override
public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
Sum.SumResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean hasMore = false;
long sum = 0L;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
sum = sum + Bytes.toLong(cell.getValueArray());
}
results.clear();
} while (hasMore);
response = Sum.SumResponse.newBuilder().setSum(sum).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
}
通过上述代码我们可以看出,实际接口中我们仅实现了getSum
方法,而其他方法则是对应所需实现接口的,从而能够满足
HBase对其进行调用,完成该服务的编写后需要生成为jar并放置在region服务器的lib中并对服务进行重启。接着我们在客户
端通过HBase提供的 coprocessorService
进行调用即可。
private static void endpointDemo() throws ServiceException, Throwable {
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
Map<byte[], Long> results = table.coprocessorService(
Sum.SumService.class,
null, /* start key */
null, /* end key */
new Batch.Call<Sum.SumService, Long>() {
@Override
public Long call(Sum.SumService aggregate) throws IOException {
BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
aggregate.getSum(null, request, rpcCallback);
Sum.SumResponse response = rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
}
);
for (Long sum : results.values()) {
System.out.println("Sum = " + sum);
}
}
上述我们可以看到因为其服务需要调用多个region,故结果将存在多个,所以需要通过Map进行接收,以便在客户端
进行其他计算。当然后续我们将要介绍的Phoenix就利用了HBase这一特性扩展的其自身的服务。