Case requirement
Write a coprocessor so that, whenever data is inserted into table A, HBase itself (via the coprocessor) inserts a corresponding row into table B.
Implementation steps
1) Create a Maven project and add the following dependencies.
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.0.5</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.0.5</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2) Define a FruitTableCoprocessor class that implements the RegionObserver and RegionCoprocessor interfaces (BaseRegionObserver was removed in HBase 2.x, so observers implement these interfaces directly)
package com.atguigu;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.wal.WALEdit;

import java.io.IOException;
import java.util.Optional;

public class FruitTableCoprocessor implements RegionObserver, RegionCoprocessor {

    // Required by the HBase 2.x coprocessor framework: expose this observer
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Get a connection (creating one per Put is expensive; kept simple for the demo)
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        // Get the target table (table B, here named "fruit")
        Table table = connection.getTable(TableName.valueOf("fruit"));
        // Write the same Put into table B
        table.put(put);
        // Close resources
        table.close();
        connection.close();
    }
}
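As a side note, creating a new Connection for every postPut call is costly. A hedged alternative sketch (not from the original) is to reuse the region server's shared connection, which RegionCoprocessorEnvironment exposes via getConnection() in HBase 2.x; this method body is a drop-in replacement for the postPut above, using the same imports:

@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    // Reuse the region server's shared cluster connection; it is owned by
    // the server, so do NOT close it here
    Connection connection = e.getEnvironment().getConnection();
    try (Table table = connection.getTable(TableName.valueOf("fruit"))) {
        table.put(put);
    }
}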
3) Package the project and place the jar in HBase's lib directory
4) Distribute the jar to all nodes and restart HBase
5) Register the coprocessor when creating the table
In the table-creation code, right after constructing the table descriptor builder, add the following statement:
tableDescriptorBuilder.setCoprocessor("com.atguigu.FruitTableCoprocessor");
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBase_DDL {

    //TODO Create a table
    public static void createTable(String tableName, String... cfs) throws IOException {
        //1. Make sure at least one column family was supplied
        if (cfs.length <= 0) {
            System.out.println("Please supply column family information!");
            return;
        }
        //2. Check whether the table already exists
        if (isTableExist(tableName)) {
            System.out.println("The table to be created already exists!");
            return;
        }
        //3. Create and populate the configuration
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        //4. Get a connection to HBase
        Connection connection = ConnectionFactory.createConnection(configuration);
        //5. Get the DDL operation object
        Admin admin = connection.getAdmin();
        //6. Create the table descriptor builder and register the coprocessor
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        tableDescriptorBuilder.setCoprocessor("com.atguigu.FruitTableCoprocessor");
        //7. Add the column families in a loop
        for (String cf : cfs) {
            ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
            tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
        }
        //8. Create the table
        admin.createTable(tableDescriptorBuilder.build());
        //9. Close resources
        admin.close();
        connection.close();
    }

    //TODO Check whether a table exists (referenced above; a minimal version so the class compiles)
    public static boolean isTableExist(String tableName) throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        admin.close();
        connection.close();
        return exists;
    }
}
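A minimal usage sketch, which could live as a main method inside HBase_DDL (the table name "A" and column family "info" are illustrative assumptions, not from the original):

public static void main(String[] args) throws IOException {
    // Create table "A" with column family "info"; createTable registers
    // FruitTableCoprocessor, so every Put into "A" is mirrored into "fruit"
    createTable("A", "info");
}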
Note: a coprocessor can also be loaded dynamically, without restarting HBase:
1) Add the following property to hbase-site.xml, so that an exception thrown by a coprocessor cannot abort the region servers and take the cluster down:
<property>
    <name>hbase.coprocessor.abortonerror</name>
    <value>false</value>
</property>
2) Package the coprocessor into a jar and upload it to HDFS, e.g. to hdfs://hadoop102:9820/c1.jar
3) Disable the table: disable 'A'
4) Load the coprocessor:
alter 'tableName', METHOD => 'table_att', 'Coprocessor' => 'HDFS path of the jar|fully qualified class name of the coprocessor|priority|arguments'
For example (the priority 1001 is an arbitrary user-level value, and the trailing arguments field is left empty here):
alter 'A', METHOD => 'table_att', 'Coprocessor' => 'hdfs://hadoop102:9820/c1.jar|com.atguigu.FruitTableCoprocessor|1001|'
5) Enable the table: enable 'A'
6) Insert data into table A and check whether the same row appears in table B; a verification sketch follows.
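A hedged client-side verification sketch for step 6 (the row key "1001", column family "info", and qualifier "name" are assumptions; it also assumes table "fruit" already exists with the same column family):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class CoprocessorVerify {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table tableA = connection.getTable(TableName.valueOf("A"));
             Table tableB = connection.getTable(TableName.valueOf("fruit"))) {
            // Write one row into table A; postPut should mirror it into table B
            Put put = new Put(Bytes.toBytes("1001"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("apple"));
            tableA.put(put);
            // Read the same row key back from table B ("fruit")
            Result result = tableB.get(new Get(Bytes.toBytes("1001")));
            System.out.println(result.isEmpty() ? "row not found in fruit" : "row mirrored into fruit");
        }
    }
}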