一、计数器

由于实际应用中往往会存在统计点击流和在线广告意见,这些应用需要被收集到日志文件中用于后续的分析,此时我们就可以采用计数器做
实时统计,而不需要较高延迟的批量处理操作。

用户不用初始化计数器,当用户第一次使用计数器时,计数器将被自动设为0,也就是说当用户创建一个新列时,计数器的值是0。第一次增
加操作会返回1或增加设定的值。用户也可以直接读写一个计数器。下面我们先使用单个计数器的方式。

  1. long cnt1 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
  2. long cnt2 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
  3. long current = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 0);
  4. long cnt3 = table.incrementColumnValue(Bytes.toBytes("row5"), Bytes.toBytes("mycf"), Bytes.toBytes("hits"), -1);
  5. System.out.printf("the cnt2: %d, current: %d, cnt3: %d", cnt2, current, cnt3);

如果我们需要使用多计数器,这里需要注意的是只能设置同一行的多个列的数据,需要使用 Increment 对象的 addColumn 函数进行
操作,具体的使用方式如下。

  1. Increment increment = new Increment(Bytes.toBytes("row6"));
  2. increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 1);
  3. increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("clicks"), 1);
  4. increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"), 10);
  5. Result result = table.increment(increment);
  6. for (Cell cell : result.rawCells()) {
  7. String str = Bytes.toString(cell.getValueArray());
  8. System.out.println(str);
  9. }

在进行批量计数时虽然可以限制其他写程序修改此行的值,但是用户无法限制读操作。事实上,这里并没有保证读操作的原子性。因为读
操作不需要获取锁,所以它可能读到一行中被修改到一半的数据。

二、 协处理器

加载协处理器存在多种方式,下述我们将介绍如何加载我们编写的协处理器。其加载方式主要分为静态方式加载与动态方式加载。
首先我们介绍的为静态加载方式。
读者需要通过添加以下配置中的一条或几条到hbase-site.xml文件中来添加协处理器。

  1. <property>
  2. <name>hbase.coprocessor.region.classes</name>
  3. <value>coprocessor.endpoint.SumEndPoint</value>
  4. </property>
  5. <property>
  6. <name>hbase.coprocessor.wal.classes</name>
  7. <value>coprocessor.WALObserverExample</value>
  8. </property>
  9. <property>
  10. <name>hbase.coprocessor.master.classes</name>
  11. <value>coprocessor.MasterObserverExample</value>
  12. </property>

其中 name 对应了不同协处理器的配置项,所以读者需要将对应的协处理器放置在正确的 value 中,如果存在多个则以逗号进行分割。
具体的对应关系如下。

  • hbase.coprocessor.region.classes:RegionObservers与Endpoints协处理器;
  • hbase.coprocessor.wal.classes:WALObservers协处理器;
  • hbase.coprocessor.master.classes:MasterObservers协处理器;

完成以上配置后还需要将对应jar放置到HBase中的lib文件夹下,最后重启HBase服务即可。对于卸载只要删除 property 配置项以及对应
lib文件夹下的jar文件后重启服务即可实现卸载。

动态加载方式可以通过HBase Shell或Java API方式进行加载,首先我们通过HBase Shell实现加载。

disable 'users'
alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|
arg1=1,arg2=2'

enable 'users'
describe 'users'

关于 Coprocessor 部分的说明如下。

  • 文件路径:必须为可读取到协处理器实现jar文件的路径,可以为HBase服务器本地路径,当然这里建议为HDFS路径,且可以记载多个文件,例如:

hdfs://:/user// 或 hdfs://:/user//*.jar;

  • 类名:协处理器的完整类名;
  • 优先级:一个可选的整数,用于决定协处理器的执行顺序;
  • 参数:可选参数,此字段将传递给协处理器实例;

此类协处理器的作用范围仅限该表,不同于通过hbase-site.xml设置的协处理器。如果读者需要卸载该协处理器,我们仅需要重新设置表即可,比如下述
方式。

disable 'users'
alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'users'

介绍完上述方式,我们接着通过纯Java的方式设置协处理器,当然对应的范围也仅限设置的表。

TableName tableName = TableName.valueOf("mytable");
admin.disableTable(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of("mycf");
CoprocessorDescriptor coprocessorDescriptor = CoprocessorDescriptorBuilder.newBuilder("Coprocessor.RegionObserverExample")
    .setJarPath("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar")
    .setPriority(100)
    .build();
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
    .setColumnFamily(columnFamilyDescriptor)
    .setCoprocessor(coprocessorDescriptor)
    .build();
admin.modifyTable(tableDescriptor);
admin.enableTable(tableName);

对应关闭协处理器的方式就是重新设置表信息,仅只是不添加协处理器即可。

TableName tableName = TableName.valueOf("mytable");
admin.disableTable(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of("mycf");
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
    .setColumnFamily(columnFamilyDescriptor)
    .build();
admin.modifyTable(tableDescriptor);
admin.enableTable(tableName);

1. RegionObserver

在region级别中介绍的Coprocessor第一个抽象接口是 RegionObserver 。从名字可知当region界别的操作发生时,他们的钩子函数会被触发。其中可供我们重写进行
触发的函数如下。

  • preOpen/postOpen:Region被打开前与打开后调用;
  • preWALRestore/postWALRestore:从WAL重做前后重做后调用;
  • preFlush/postFlush:刷入前与刷入后;
  • preCompact/postCompact:执行合并时;
  • preClose/postClose:Region关闭前与关闭后调用;
  • preGetOp/postGetOp:客户端Table.get请求执行前和执行后调用;
  • prePut/postPut:客户端Table.put请求执行之前和执行后调用;
  • preDelete/postDelete:客户端Table.delete请求执行之前和执行后调用;
  • preCheckAndPut/postCheckAndPut:客户端Table.checkAndPut请求执行之前和执行后调用;
  • preCheckAndDelete/postCheckAndDelete:客户端Table.checkAndDelete请求执行之前和执行后调用;
  • preExists/postExists:客户端调用Table.exits之前和之后调用;
  • preIncrement/postIncrement:客户端调用Table.increment之前和之后调用;
  • preScannerOpen/postScannerOpen:客户端调用Table.getScanner之前和之后调用;
  • preScannerNext/postScannerOpen:客户端调用ResultScanner.next之前和之后调用;
  • preScannerClose/postScannerClose:客户端调用ResultScanner.close之前和之后调用;

上述仅仅列举了常用的函数,实际还有其他更高级的函数尚未列举出现,如果读者想了解可以阅读官方的API文档。下面我们通过一个实例代码来介绍
其中更深入的内容。

public class RegionObserverExample implements RegionObserver, RegionCoprocessor {
    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
            throws IOException {
        if (Bytes.equals(get.getRow(),ADMIN)) {
            Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
            System.currentTimeMillis(), (byte)4, VALUE);
            results.add(c);
            e.bypass();
        }
    }

    @Override
    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) throws IOException {
        Filter filter = new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(ADMIN));
        scan.setFilter(filter);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s,
            List<Result> results, int limit, boolean hasNext) throws IOException {
        Result result = null;
        Iterator<Result> iterator = results.iterator();
        while (iterator.hasNext()) {
        result = iterator.next();
            if (Bytes.equals(result.getRow(), ADMIN)) {
                iterator.remove();
                break;
            }
        }
        return hasNext;
    }
}

其中我们可以看到每个重写的方式都有统一的上下文对象,其中均为 ObserverContext 且根据类型参数决定其中 getEnvironment 函数的
具体返回对象。可以看到我们在 preGetOp 方法中使用了 bypass 函数,其函数在调用后,框架将使用用户提供的值,而不使用框架通常使
用的值。

2. MasterObserver

其主要处理master服务器的所有回调函数,这些操作与数据库中的DDL类似,它们可以归类到数据处理操作中,所以其中主要的钩子函数如下
列举所示。

  • preCreateTable/postCreateTable:在表创建前后被调用
  • preDeleteTable/postDeleteTable:在表删除前后被调用
  • preModifyTable/postModifyTable:在表修改前后被调用
  • preEnableTable/postEnableTable:在表启用前后被调用
  • preDisableTable/postDisableTable:在表近用前后被调用
  • preMove/postMove:在region被移动前后被调用
  • preAssign/postAssign:在region分配前后被调用
  • preUnassign/postUnassign:在region未分配前后被调用
  • preBalance/postBalance:在region负载均衡操作前后被调用
  • preBalanceSwitch/postBalanceSwitch:在修改自动负载均衡标志位前后被调用
  • preShutdown:在集群关闭工作开始前被调用。没有post钩子函数,因为集群关闭之后没有进程可以执行post函数
  • preStopMaster:在master进程停止工作开始前被调用。没有post钩子函数,因为master停止工作之后没有进程可

以执行post函数

介绍完上述这些常用的钩子函数后,我们编写一个协处理器。

public class MasterObserverExample implements MasterObserver, Coprocessor {
    @Override
    public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableDescriptor desc,
            RegionInfo[] regions) throws IOException {
        String tableName = regions[0].getTable().getNameAsString();

        System.out.println("create table:" + tableName);
    }
}

3. EndPoint

由于HBase本身的设计,故行键将决定在哪个region处理这个请求。对于需要多个region的进行聚合计算的操作无能为力。为此
HBase提供了EndPoint帮助我们能够编写在region上进行计算的服务,并通过ProtoBuf公开服务以便客户端进行调用。为了演示
这一功能特性,下面我们将带领读者一步一步进行开发。

首先我们需要通过Protobuf开发对应的服务接口以及数据模型

option java_package = "cn.orvillex.observerdemo";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
    required string family = 1;
    required string column = 2;
}

message SumResponse {
  required int64 sum = 1 [default = 0];
}

service SumService {
  rpc getSum(SumRequest)
    returns (SumResponse);
}

完成以上服务协议编写后我们需要利用Protobuf工具将该文件编译出具体的Java文件,具体操作方法读者可以自行百度进行
查询,生成服务后我们就需要实现该服务。

public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
    private RegionCoprocessorEnvironment env;

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment)env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {

    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));

        Sum.SumResponse response = null;
        InternalScanner scanner = null;

        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<>();
            boolean hasMore = false;
            long sum = 0L;

            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum = sum + Bytes.toLong(cell.getValueArray());
                }
                results.clear();
            } while (hasMore);

            response = Sum.SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {}
            }
        }
        done.run(response);
    }
}

通过上述代码我们可以看出,实际接口中我们仅实现了getSum方法,而其他方法则是对应所需实现接口的,从而能够满足
HBase对其进行调用,完成该服务的编写后需要生成为jar并放置在region服务器的lib中并对服务进行重启。接着我们在客户
端通过HBase提供的 coprocessorService 进行调用即可。

private static void endpointDemo() throws ServiceException, Throwable {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("users");
    Table table = connection.getTable(tableName);

    final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();

    Map<byte[], Long> results = table.coprocessorService(
        Sum.SumService.class,
        null,  /* start key */
        null,  /* end   key */
        new Batch.Call<Sum.SumService, Long>() {
            @Override
            public Long call(Sum.SumService aggregate) throws IOException {
                BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
                aggregate.getSum(null, request, rpcCallback);
                Sum.SumResponse response = rpcCallback.get();
                return response.hasSum() ? response.getSum() : 0L;
            }
        }
    );

    for (Long sum : results.values()) {
        System.out.println("Sum = " + sum);
    }
}

上述我们可以看到因为其服务需要调用多个region,故结果将存在多个,所以需要通过Map进行接收,以便在客户端
进行其他计算。当然后续我们将要介绍的Phoenix就利用了HBase这一特性扩展的其自身的服务。