概念介绍
利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。
另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。
装载和卸载Coprocessor
以下文章来源:https://www.zhyea.com/2017/04/13/using-hbase-coprocessor.html
要使用Coprocessor,就需要先完成对其的装载。这可以静态实现(通过HBase配置文件),也可以动态完成(通过shell或Java API)。
静态
按以下如下步骤可以静态装载自定义的Coprocessor。需要注意的是,如果一个Coprocessor是静态装载的,要卸载它就需要重启HBase。
静态装载步骤如下:
**
- 在hbase-site.xml中使用标签定义一个Coprocessor。的子元素的值只能从下面三个中选一个:
- hbase.coprocessor.region.classes 对应 RegionObservers和Endpoints;
- hbase.coprocessor.wal.classes 对应 WALObservers;
hbase.coprocessor.master.classes 对应MasterObservers。
而标签的内容则是自定义Coprocessor的全限定类名。
下面演示了如何装载一个自定义Coprocessor(这里是在SumEndPoint.java中实现的),需要在每个RegionServer的hbase-site.xml中创建如下的记录:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>
如果要装载多个类,类名需要以逗号分隔。HBase会使用默认的类加载器加载配置中的这些类,因此需要将相应的jar文件上传到HBase服务端的类路径下。
使用这种方式加载的Coprocessor将会作用在HBase所有表的全部Region上,因此这样加载的Coprocessor又被称为系统Coprocessor。在Coprocessor列表中第一个Coprocessor的优先级值为Coprocessor.Priority.SYSTEM,其后的每个Coprocessor的值将会按序加一(这意味着优先级会减降低,因为优先级是按整数的自然顺序降序排列的)。
当调用配置的Observer Coprocessor时,HBase将会按照优先级顺序依次调用它们的回调方法。
2、将代码放到HBase的类路径下。一个简单的方法是将封装好的jar(包括代码和依赖)放到HBase安装路径下的/lib目录中。
3、重启HBase。
静态卸载的步骤如下:
1. 移除在hbase-site.xml中的配置。
2. 重启HBase。
3. 这一步是可选的,将上传到HBase类路径下的jar包移除。
动态
动态装载Coprocessor的一个优势就是不需要重启HBase。不过动态装载的Coprocessor只是针对某个表有效。因此,动态装载的Coprocessor又被称为表级Coprocessor。
此外,动态装载Coprocessor是对表的一次schema级别的调整,因此在动态装载Coprocessor时,目标表需要离线。
动态装载Coprocessor有两种方式:通过HBase Shell和通过Java API。
在下面介绍关于动态装载的部分,假设已经封装好了一个coprocessor.jar的包,里面包含实现代码及所有的依赖,并且已经将这个jar上传到了HDFS中。
HBase Shell
- 在HBase Shell中disable 掉目标表,
disable 'user'
- 使用类似如下的命令加载Coprocessor
alter 'bigdata:twx_telephone',METHOD=>'table_att','Coprocessor'=>'hdfs://hd03:8020/tangwx/libs|com.soyuan.hbase.telephone.observer.CalleeWriteObserver|1073741823|'
#移除
alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
简单解释下这个命令。这条命令在一个表的table_att中添加了一个新的属性“Coprocessor”。使用的时候Coprocessor会尝试从这个表的table_attr中读取这个属性的信息。这个属性的值用管道符“|”分成了四部分:
- 文件路径 :文件路径中需要包含Coprocessor的实现,并且对所有的RegionServer都是可达的。这个路径可以是每个RegionServer的本地磁盘路径,也可以是HDFS上的一个路径。通常建议是将Coprocessor实现存储到HDFS。HBASE-14548允许使用一个路径中包含的所有的jar,或者是在路径中使用通配符来指定某些jar,比如:hdfs://
: /user/ / 或者 hdfs:// : /user/ /*.jar。需要注意的是如果是用路径来指定要加载的Coprocessor,这个路径下的所有jar文件都会被加载,不过该路径下的子目录中的jar不会被加载。另外,如果要用路径指定Coprocessor时,就不要再使用通配符了。这些特性在Java API中也得到了支持。 - 类名 :Coprocessor的全限定类名。
- 优先级 :一个整数。HBase将会使用优先级来决定在同一个位置配置的所有Observer Coprocessor的执行顺序。这个位置可以留白,这样HBase将会分配一个默认的优先级。
- 参数 :这些值会被传递给要使用的Coprocessor实现。这个项是可选的。
- enable这个表:
enable 'user'
- 检验Coprocessor是否被加载:
describe 'user'
JAVA API
针对不同版本的HBase会有不同的JavaAPI。幸运的是有一个全版本的Java API。下面的代码演示了是如何使用Java API来装载Coprocessor的:
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
卸载方式如下:
卸载方式就是重新加载表定义信息。重新加载的时候就不需要再使用setValue()方法或者是addCoprocessor()方法设置表的Coprocessor信息了
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);