1. 开启一个异步任务执行服务监听

  1. @Component
  2. @DependsOn("canalServerListener")
  3. @Slf4j
  4. public class AsyncCanalServerListenerTaskService implements DisposableBean, InitializingBean {
  5. /**
  6. * 线程数
  7. */
  8. private final static int THREAD_NUM = 6;
  9. /**
  10. * 任务线程池
  11. */
  12. private final static ExecutorService taskPool = Executors.newFixedThreadPool(THREAD_NUM);
  13. @Autowired
  14. private CanalServerListener canalServerListener;
  15. @Override
  16. public void afterPropertiesSet() throws Exception {
  17. taskPool.submit(canalServerListener);
  18. }
  19. @Override
  20. public void destroy() {
  21. if (taskPool != null) {
  22. taskPool.shutdown();
  23. }
  24. log.info("task pool shutdown . ");
  25. }
  26. }
  • @DependsOn 是让 canalServerListener 监听服务先进行初始化和加载
  • 实现了 **InitializingBean** 接口,我们需要重写 afterPropertiesSet 方法,同样,实现了 DisposableBean 接口,我们需要重写 destroy 方法(用来定义初始化和销毁逻辑)

    2. Canal 服务监听

    1. @Slf4j
    2. @Component
    3. public class CanalServerListener implements Runnable {
    4. /**
    5. * 空闲休眠时长
    6. */
    7. private final static int NO_DATA_TIME_OUT = 500;
    8. @Autowired
    9. private CanalClientConfig clientConfig;
    10. @Autowired
    11. private IReceiveBinlogMessage receiveBinlogMessage;
    12. /**
    13. * 启动服务监听,拉去binlog数据
    14. *
    15. * @param: []
    16. * @author: zhangxin
    17. * @date: 2019/2/14 21:04
    18. */
    19. public void start() {
    20. String hostName = InetAddressUtil.getHostName();
    21. String hostAddress = InetAddressUtil.getHostAddress();
    22. Long batchId = null;
    23. try {
    24. CanalConnector connector = CanalConnectorManager.getInstance().createConnector(clientConfig);
    25. log.info("数据同步工作节点已启动, hostName : {} , ip : {} .", hostName, hostAddress);
    26. for (; ; ) {
    27. try {
    28. Message message = connector.getWithoutAck(clientConfig.getBatchSize());
    29. batchId = message.getId();
    30. int size = message.getEntries().size();
    31. if (batchId == -1 || size == 0) {
    32. log.info("not pull data , sleep " + NO_DATA_TIME_OUT + " ms .");
    33. TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT);
    34. continue;
    35. }
    36. boolean flag = receiveBinlogMessage.receive(message);
    37. if (flag) {
    38. connector.ack(batchId);
    39. continue;
    40. }
    41. connector.rollback(batchId);
    42. } catch (Exception e) {
    43. if (connector.checkValid()) {
    44. if (batchId != null) {
    45. connector.rollback(batchId);
    46. }
    47. } else {
    48. log.info("数据同步工作节点已停止工作, hostName : {} , ip : {} .", hostName, hostAddress);
    49. }
    50. }
    51. }
    52. } catch (Exception e) {
    53. log.error("启动监听失败.", e);
    54. } finally {
    55. CanalConnectorManager.getInstance().destroy();
    56. }
    57. }
    58. @Override
    59. public void run() {
    60. start();
    61. }
    62. }
  • Message getWithoutAck(int batchSize) 不指定 position 获取事件,该方法返回的条件: 尝试拿batchSize条记录,有多少取多少,不会阻塞等待

  • 如果本次轮询没有获取到数据,为了降低 CPU 占用,线程会短暂休眠:TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT);
  • boolean flag = receiveBinlogMessage.receive(message) 返回处理结果,为 true 执行 connector.ack(batchId); 进行确认,否则 connector.rollback(batchId); 进行回滚。

    3. Canal 客户端管理

    ```java @Component public class CanalConnectorManager implements InitializingBean {

    /**

    • 持有实例对象 */ private static CanalConnectorManager install = null;

      /**

    • canal客户端连接器 */ private volatile CanalConnector canalConnector;

      /**

    • 客户端连接配置 */ private CanalClientConfig clientConfig;

      /**

    • 创建客户端连接器 *
    • @param clientConfig
    • @return */ public synchronized CanalConnector createConnector(CanalClientConfig clientConfig) { if (canalConnector != null) {

      1. return canalConnector;

      } this.clientConfig = clientConfig; CanalConnector canalConnector = CanalConnectors.newClusterConnector(

      1. clientConfig.getClusterServers(),
      2. clientConfig.getDestination(),
      3. clientConfig.getUsername(),
      4. clientConfig.getPassword()

      ); canalConnector.connect(); canalConnector.rollback(); canalConnector.subscribe(clientConfig.getSubscribe()); this.canalConnector = canalConnector; return canalConnector; }

      /**

    • 获取CanalConnector *
    • @return */ public CanalConnector getCanalConnector() { return this.canalConnector; }

      /**

    • 订阅消息 *
    • @return */ public void subscribe(String filter) { if (this.canalConnector.checkValid()) {

      1. this.canalConnector.subscribe(filter);

      } }

      /**

    • 注销客户端链接 */ public void destroy() { if (this.canalConnector != null) {

      1. this.canalConnector.disconnect();
      2. this.canalConnector = null;

      } }

      /**

    • 获取自身实例 *
    • @return */ public static CanalConnectorManager getInstance() { return install; }

      /**

    • 初始属性 *
    • @throws Exception */ @Override public void afterPropertiesSet() throws Exception { install = this; }

      /**

    • 监听apollo属性变更 *
    • @param changeEvent */ @ApolloConfigChangeListener(value = “prop”, interestedKeys = {“canal.subscribe”}) private void propChangeEvent(ConfigChangeEvent changeEvent) { String filter = changeEvent.getChange(“canal.subscribe”).getNewValue(); this.subscribe(filter); }

}

  <a name="1T4DY"></a>
  #### 4. 进行数据处理
  ```java
  4. @Slf4j
  5. @Service
  6. @DependsOn("springContextHolder")
  7. public class ReceiveBinlogMessageService implements IReceiveBinlogMessage, InitializingBean {
  8. private static Logger dataCollect = LoggerFactory.getLogger("table-data");
  9. private Map<String, IConsumerService> instances = null;
  10. private int awaitTimeout = 30;
  11. private int coreThreadNum = 20;
  12. private ExecutorService pool = Executors.newFixedThreadPool(coreThreadNum);
  13. @Autowired
  14. private PropResource propResource;
  15. @Override
  16. public boolean receive(Message message) throws Exception {
  17. try {
  18. List<CanalEntry.Entry> entries = message.getEntries();
  19. List<TableData> tableDatas = parseEntryFetchTableData(entries);
  20. if (ListUtils.isEmpty(tableDatas)) {
  21. return true;
  22. }
  23. final Collection<IConsumerService> consumerServices = instances.values();
  24. //线程执行进度
  25. final CountDownLatch multiTaskProgress = new CountDownLatch(consumerServices.size());
  26. //执行失败数量
  27. final AtomicInteger failTaskCount = new AtomicInteger();
  28. for (IConsumerService consumerService : consumerServices) {
  29. AsyncSendMessageTask sendMessageTask = new AsyncSendMessageTask(consumerService, tableDatas,
  30. multiTaskProgress, failTaskCount);
  31. pool.execute(sendMessageTask);
  32. }
  33. boolean progressFlag = multiTaskProgress.await(awaitTimeout, TimeUnit.SECONDS);
  34. boolean taskExecFlag = failTaskCount.get() == 0;
  35. log.info("任务执行完成状态 : {} , 任务正常结束状态 : {} .", progressFlag, taskExecFlag);
  36. return progressFlag && taskExecFlag;
  37. } catch (Exception e) {
  38. log.error("解析binlog数据异常.", e);
  39. throw new Exception(e);
  40. }
  41. }
  42. @Override
  43. public void afterPropertiesSet() throws Exception {
  44. this.instances = SpringContextHolder.getApplicationContext().getBeansOfType(IConsumerService.class);
  45. }
  • 实现 InitializingBean 接口,在 afterPropertiesSet 方法中利用 applicationContext.getBeansOfType 动态加载 IConsumerService 的所有实现类,并放入 Map instances 集合中;然后遍历该集合,用线程池执行不同的消费策略。

    1. for (IConsumerService consumerService : consumerServices) {
    2. AsyncSendMessageTask sendMessageTask = new AsyncSendMessageTask(consumerService, tableDatas,
    3. multiTaskProgress, failTaskCount);
    4. pool.execute(sendMessageTask);
    5. }
  • 用到了 CountDownLatch 来等待所有消费任务执行完成:每个任务结束时调用 countDown(),主线程通过 await(awaitTimeout, TimeUnit.SECONDS) 最多等待 awaitTimeout 秒

    5. 发送任务消息

    1. /**
    2. * 发送消息任务
    3. */
    4. @Slf4j
    5. public class AsyncSendMessageTask implements Runnable {
    6. private IConsumerService consumerService;
    7. private List<TableData> tableDatas;
    8. private CountDownLatch multiTaskProgress;
    9. private AtomicInteger failTaskCount;
    10. public AsyncSendMessageTask(IConsumerService consumerService, List<TableData> tableDatas,
    11. CountDownLatch multiTaskProgress, AtomicInteger failTaskCount) {
    12. this.consumerService = consumerService;
    13. this.tableDatas = tableDatas;
    14. this.multiTaskProgress = multiTaskProgress;
    15. this.failTaskCount = failTaskCount;
    16. }
    17. @Override
    18. public void run() {
    19. try {
    20. boolean flag = consumerService.sendMessageData(tableDatas);
    21. log.info("{}service任务执行状态:{}.", consumerService.getClass().getSimpleName(), flag);
    22. if (!flag) {
    23. failTaskCount.incrementAndGet();
    24. }
    25. } catch (Exception e) {
    26. log.error("任务执行时失败.", e);
    27. failTaskCount.incrementAndGet();
    28. } finally {
    29. multiTaskProgress.countDown();
    30. }
    31. }
    32. }
  • 此处 consumerService.sendMessageData 有不同的实现类,来完成不同的消费。