Case requirement
    Write a coprocessor so that whenever data is inserted into table A, HBase itself (via the coprocessor) inserts the same row into table B.
    Implementation steps
    1) Create a Maven project and add the following dependencies.

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.5</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    2) Define a FruitTableCoprocessor class that implements the RegionObserver and RegionCoprocessor interfaces (BaseRegionObserver was removed in HBase 2.x, so the observer interfaces are implemented directly).

    package com.atguigu;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    import org.apache.hadoop.hbase.wal.WALEdit;

    import java.io.IOException;
    import java.util.Optional;

    public class FruitTableCoprocessor implements RegionObserver, RegionCoprocessor {
        @Override
        public Optional<RegionObserver> getRegionObserver() {
            // Expose this class as the RegionObserver so its hooks are invoked
            return Optional.of(this);
        }

        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            // Open a connection
            Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
            // Get the target table (table B, here named "fruit"); it must be a
            // different table from the one this coprocessor is registered on,
            // otherwise each put would re-trigger postPut in an endless loop
            Table table = connection.getTable(TableName.valueOf("fruit"));
            // Replay the same Put into the target table
            table.put(put);
            // Release resources
            table.close();
            connection.close();
        }
    }

    3) Package the project and place the resulting jar in HBase's lib directory.
    4) Distribute the jar to every node and restart HBase.
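For steps 3) and 4), the command sequence might look like the following. The jar name, the /opt/module install path, and the xsync distribution script are assumptions about this cluster's layout, not fixed requirements; adjust them to your environment.

```shell
# Build the jar-with-dependencies produced by the assembly plugin
mvn clean package
# Copy it into HBase's lib directory (install path is an assumption)
cp target/hbase-coprocessor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/hbase/lib/
# Distribute the lib directory to the other nodes, then restart HBase
xsync /opt/module/hbase/lib/
stop-hbase.sh
start-hbase.sh
```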
    5) Register the coprocessor when creating the table.
    Add the following line after building the table descriptor builder:
    tableDescriptorBuilder.setCoprocessor("com.atguigu.FruitTableCoprocessor");

    public class HBase_DDL {
        //TODO Create a table
        public static void createTable(String tableName, String... cfs) throws IOException {
            //1. Check that column family info was supplied
            if (cfs.length <= 0) {
                System.out.println("Please supply column family info!");
                return;
            }
            //2. Check whether the table already exists
            if (isTableExist(tableName)) {
                System.out.println("The table to be created already exists!");
                return;
            }
            //3. Create and populate the configuration
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
            //4. Connect to HBase
            Connection connection = ConnectionFactory.createConnection(configuration);
            //5. Get the DDL operation object
            Admin admin = connection.getAdmin();
            //6. Create the table descriptor builder
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            // Register the coprocessor by its fully qualified class name
            tableDescriptorBuilder.setCoprocessor("com.atguigu.FruitTableCoprocessor");
            //7. Add each column family
            for (String cf : cfs) {
                ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
                tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
            }
            //8. Create the table
            admin.createTable(tableDescriptorBuilder.build());
            //9. Release resources
            admin.close();
            connection.close();
        }
    }

    Note: the coprocessor can also be loaded dynamically, without restarting HBase.
    1) Add the following property to hbase-site.xml so that a coprocessor error does not bring the cluster down.


    <property>
        <name>hbase.coprocessor.abortonerror</name>
        <value>false</value>
    </property>

    2) Package the project into a jar and upload it to HDFS, e.g. at hdfs://hadoop102:9820/c1.jar
    3) Disable the table: disable 'A'
    4) Load the coprocessor:
    alter '<table name>', METHOD => 'table_att', 'Coprocessor' => '<HDFS path of the jar>|<fully qualified class name of the coprocessor>|<priority>|<arguments>'
    5) Enable the table: enable 'A'
    6) Insert data into table A and check whether the same data appears in table B.
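Putting the dynamic-load steps together, a session in the HBase shell might look like the sketch below. The jar path is the one from step 2); the class name matches the example coprocessor above; 1073741823 is HBase's PRIORITY_USER constant; the empty trailing field means no arguments. The 'info' column family and the row data are illustrative assumptions, and table B is named "fruit" in the example coprocessor code.

```
disable 'A'
alter 'A', METHOD => 'table_att', 'Coprocessor' => 'hdfs://hadoop102:9820/c1.jar|com.atguigu.FruitTableCoprocessor|1073741823|'
enable 'A'
put 'A', '1001', 'info:name', 'apple'   # insert into A ...
scan 'fruit'                            # ... then table B should show the same cell
```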