1. 开启一个异步任务执行服务监听
@Component@DependsOn("canalServerListener")@Slf4jpublic class AsyncCanalServerListenerTaskService implements DisposableBean, InitializingBean {/*** 线程数*/private final static int THREAD_NUM = 6;/*** 任务线程池*/private final static ExecutorService taskPool = Executors.newFixedThreadPool(THREAD_NUM);@Autowiredprivate CanalServerListener canalServerListener;@Overridepublic void afterPropertiesSet() throws Exception {taskPool.submit(canalServerListener);}@Overridepublic void destroy() {if (taskPool != null) {taskPool.shutdown();}log.info("task pool shutdown . ");}}
@DependsOn是让canalServerListener监听服务先进行初始化和加载实现了
**InitializingBean**接口,我们需要重写afterPropertiesSet方法,同样,实现了 DisposableBean 接口,我们需要重写destroy方法(用来定义初始化和销毁逻辑)2. Canal 服务监听
@Slf4j@Componentpublic class CanalServerListener implements Runnable {/*** 空闲休眠时长*/private final static int NO_DATA_TIME_OUT = 500;@Autowiredprivate CanalClientConfig clientConfig;@Autowiredprivate IReceiveBinlogMessage receiveBinlogMessage;/*** 启动服务监听,拉去binlog数据** @param: []* @author: zhangxin* @date: 2019/2/14 21:04*/public void start() {String hostName = InetAddressUtil.getHostName();String hostAddress = InetAddressUtil.getHostAddress();Long batchId = null;try {CanalConnector connector = CanalConnectorManager.getInstance().createConnector(clientConfig);log.info("数据同步工作节点已启动, hostName : {} , ip : {} .", hostName, hostAddress);for (; ; ) {try {Message message = connector.getWithoutAck(clientConfig.getBatchSize());batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {log.info("not pull data , sleep " + NO_DATA_TIME_OUT + " ms .");TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT);continue;}boolean flag = receiveBinlogMessage.receive(message);if (flag) {connector.ack(batchId);continue;}connector.rollback(batchId);} catch (Exception e) {if (connector.checkValid()) {if (batchId != null) {connector.rollback(batchId);}} else {log.info("数据同步工作节点已停止工作, hostName : {} , ip : {} .", hostName, hostAddress);}}}} catch (Exception e) {log.error("启动监听失败.", e);} finally {CanalConnectorManager.getInstance().destroy();}}@Overridepublic void run() {start();}}
Message getWithoutAck(int batchSize)不指定 position 获取事件,该方法返回的条件: 尝试拿batchSize条记录,有多少取多少,不会阻塞等待- 如果轮询没有获取数据,为了降低 CPU,进行短暂休眠:
TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT); boolean flag = receiveBinlogMessage.receive(message)返回处理结果,为true执行connector.ack(batchId);进行确认,否则connector.rollback(batchId);进行回滚。3. Canal 客户端管理
```java @Component public class CanalConnectorManager implements InitializingBean {
/**
持有实例对象 */ private static CanalConnectorManager install = null;
/**
canal客户端连接器 */ private volatile CanalConnector canalConnector;
/**
客户端连接配置 */ private CanalClientConfig clientConfig;
/**
- 创建客户端连接器 *
- @param clientConfig
@return */ public synchronized CanalConnector createConnector(CanalClientConfig clientConfig) { if (canalConnector != null) {
return canalConnector;
} this.clientConfig = clientConfig; CanalConnector canalConnector = CanalConnectors.newClusterConnector(
clientConfig.getClusterServers(),clientConfig.getDestination(),clientConfig.getUsername(),clientConfig.getPassword()
); canalConnector.connect(); canalConnector.rollback(); canalConnector.subscribe(clientConfig.getSubscribe()); this.canalConnector = canalConnector; return canalConnector; }
/**
- 获取CanalConnector *
@return */ public CanalConnector getCanalConnector() { return this.canalConnector; }
/**
- 订阅消息 *
@return */ public void subscribe(String filter) { if (this.canalConnector.checkValid()) {
this.canalConnector.subscribe(filter);
} }
/**
注销客户端链接 */ public void destroy() { if (this.canalConnector != null) {
this.canalConnector.disconnect();this.canalConnector = null;
} }
/**
- 获取自身实例 *
@return */ public static CanalConnectorManager getInstance() { return install; }
/**
- 初始属性 *
@throws Exception */ @Override public void afterPropertiesSet() throws Exception { install = this; }
/**
- 监听apollo属性变更 *
- @param changeEvent */ @ApolloConfigChangeListener(value = “prop”, interestedKeys = {“canal.subscribe”}) private void propChangeEvent(ConfigChangeEvent changeEvent) { String filter = changeEvent.getChange(“canal.subscribe”).getNewValue(); this.subscribe(filter); }
}
<a name="1T4DY"></a>#### 4. 进行数据处理```java@Slf4j@Service@DependsOn("springContextHolder")public class ReceiveBinlogMessageService implements IReceiveBinlogMessage, InitializingBean {private static Logger dataCollect = LoggerFactory.getLogger("table-data");private Map<String, IConsumerService> instances = null;private int awaitTimeout = 30;private int coreThreadNum = 20;private ExecutorService pool = Executors.newFixedThreadPool(coreThreadNum);@Autowiredprivate PropResource propResource;@Overridepublic boolean receive(Message message) throws Exception {try {List<CanalEntry.Entry> entries = message.getEntries();List<TableData> tableDatas = parseEntryFetchTableData(entries);if (ListUtils.isEmpty(tableDatas)) {return true;}final Collection<IConsumerService> consumerServices = instances.values();//线程执行进度final CountDownLatch multiTaskProgress = new CountDownLatch(consumerServices.size());//执行失败数量final AtomicInteger failTaskCount = new AtomicInteger();for (IConsumerService consumerService : consumerServices) {AsyncSendMessageTask sendMessageTask = new AsyncSendMessageTask(consumerService, tableDatas,multiTaskProgress, failTaskCount);pool.execute(sendMessageTask);}boolean progressFlag = multiTaskProgress.await(awaitTimeout, TimeUnit.SECONDS);boolean taskExecFlag = failTaskCount.get() == 0;log.info("任务执行完成状态 : {} , 任务正常结束状态 : {} .", progressFlag, taskExecFlag);return progressFlag && taskExecFlag;} catch (Exception e) {log.error("解析binlog数据异常.", e);throw new Exception(e);}}@Overridepublic void afterPropertiesSet() throws Exception {this.instances = SpringContextHolder.getApplicationContext().getBeansOfType(IConsumerService.class);}
实现
InitializingBean接口在afterPropertiesSet方法中利用applicationContext.getBeansOfType动态加载类放入Map instances集合中。然后循环换集合来用线程池执行不同的消费策略。for (IConsumerService consumerService : consumerServices) {AsyncSendMessageTask sendMessageTask = new AsyncSendMessageTask(consumerService, tableDatas,multiTaskProgress, failTaskCount);pool.execute(sendMessageTask);}
-
5. 发送任务消息
/*** 发送消息任务*/@Slf4jpublic class AsyncSendMessageTask implements Runnable {private IConsumerService consumerService;private List<TableData> tableDatas;private CountDownLatch multiTaskProgress;private AtomicInteger failTaskCount;public AsyncSendMessageTask(IConsumerService consumerService, List<TableData> tableDatas,CountDownLatch multiTaskProgress, AtomicInteger failTaskCount) {this.consumerService = consumerService;this.tableDatas = tableDatas;this.multiTaskProgress = multiTaskProgress;this.failTaskCount = failTaskCount;}@Overridepublic void run() {try {boolean flag = consumerService.sendMessageData(tableDatas);log.info("{}service任务执行状态:{}.", consumerService.getClass().getSimpleName(), flag);if (!flag) {failTaskCount.incrementAndGet();}} catch (Exception e) {log.error("任务执行时失败.", e);failTaskCount.incrementAndGet();} finally {multiTaskProgress.countDown();}}}
此处
consumerService.sendMessageData有不同的实现类,来完成不同的消费。
