案例需求
编写协处理器,实现在往A表插入数据的同时让HBase自身(协处理器)向B表中插入一条数据。
实现步骤
1)创建一个maven项目,并引入以下依赖。
<dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.0.5</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.0.5</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
2)定义FruitTableCoprocessor类并继承BaseRegionObserver类
package com.atguigu;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;import org.apache.hadoop.hbase.coprocessor.ObserverContext;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.regionserver.wal.WALEdit;import java.io.IOException;public class FruitTableCoprocessor implements RegionObserver, RegionCoprocessor {@Overridepublic Optional<RegionObserver> getRegionObserver() {return Optional.of(this);}@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {//获取连接Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());//获取表对象Table table = connection.getTable(TableName.valueOf("fruit"));//插入数据table.put(put);//关闭资源table.close();connection.close();}}
3)打包放入HBase的lib目录下
4)分发jar包并重启HBase
5)建表时指定注册协处理器
创建表描述构造器下添加如下语句
tableDescriptorBuilder.setCoprocessor(“com.atguigu.FruitTableCoprocessor”);、
public class HBase_DDL {//TODO 创建表public static void createTable(String tableName, String... cfs) throws IOException {//1.判断是否存在列族信息if (cfs.length <= 0) {System.out.println("请设置列族信息!");return;}//2.判断表是否存在if (isTableExist(tableName)) {System.out.println("需要创建的表已存在!");return;}//3.创建配置信息并配置Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");//4.获取与HBase的连接Connection connection = ConnectionFactory.createConnection(configuration);//5.获取DDL操作对象Admin admin = connection.getAdmin();//6.创建表描述器构造器TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));!!!!tableDescriptorBuilder.setCoprocessor("com.atguigu.FruitTableCoprocessor");//7.循环添加列族信息for (String cf : cfs) {ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());}//8.执行创建表的操作admin.createTable(tableDescriptorBuilder.build());//9.关闭资源admin.close();connection.close();}}
注意: 还可以通过不重启的方式动态加载协处理器
1) 给hbase-site.xml中添加配置,防止协处理器异常导致集群停机
/
<property><name>hbase.coprocessor.abortonerror</name><value>false</value></property>
2) 打成jar包上传hdfs,比如存放路径为 hdfs://hadoop102:9820/c1.jar
3) 禁用表: disable ‘A’
4) 加载协处理器:
alter ‘表名’, METHOD => ‘table_att’, ‘Coprocessor’=>’jar所处HDFS路径| 协处理器所在的全类名|优先级|参数’
5) 启动表: enable ‘A’
6) 向A表插入数据,观察B表是否同步有数据插入
