1. 开启一个异步任务执行服务监听
/**
 * Runs the {@link CanalServerListener} on a background thread for the lifetime
 * of the Spring context.
 *
 * <p>{@code @DependsOn("canalServerListener")} makes Spring initialise the
 * listener bean before this one. The listener is submitted to the pool in
 * {@link #afterPropertiesSet()} and the pool is stopped in {@link #destroy()}.
 */
@Component
@DependsOn("canalServerListener")
@Slf4j
public class AsyncCanalServerListenerTaskService implements DisposableBean, InitializingBean {

    /** Number of threads in the task pool. */
    private final static int THREAD_NUM = 6;

    /** Pool that executes the listener task. */
    private final static ExecutorService taskPool = Executors.newFixedThreadPool(THREAD_NUM);

    @Autowired
    private CanalServerListener canalServerListener;

    /**
     * Submits the listener to the pool once all dependencies have been injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        taskPool.submit(canalServerListener);
    }

    /**
     * Stops the task pool when the bean is destroyed.
     *
     * <p>{@code shutdown()} only rejects new tasks and lets running ones finish;
     * the listener runs an endless polling loop, so it would never terminate.
     * {@code shutdownNow()} additionally interrupts the worker thread.
     */
    @Override
    public void destroy() {
        taskPool.shutdownNow();
        log.info("task pool shutdown . ");
    }
}
`@DependsOn` 是让 `canalServerListener` 监听服务先进行初始化和加载。实现了 `InitializingBean` 接口,我们需要重写 `afterPropertiesSet` 方法;同样,实现了 `DisposableBean` 接口,我们需要重写 `destroy` 方法(分别用来定义初始化和销毁逻辑)。

2. Canal 服务监听
@Slf4j
@Component
public class CanalServerListener implements Runnable {
/**
* 空闲休眠时长
*/
private final static int NO_DATA_TIME_OUT = 500;
@Autowired
private CanalClientConfig clientConfig;
@Autowired
private IReceiveBinlogMessage receiveBinlogMessage;
/**
* 启动服务监听,拉去binlog数据
*
* @param: []
* @author: zhangxin
* @date: 2019/2/14 21:04
*/
public void start() {
String hostName = InetAddressUtil.getHostName();
String hostAddress = InetAddressUtil.getHostAddress();
Long batchId = null;
try {
CanalConnector connector = CanalConnectorManager.getInstance().createConnector(clientConfig);
log.info("数据同步工作节点已启动, hostName : {} , ip : {} .", hostName, hostAddress);
for (; ; ) {
try {
Message message = connector.getWithoutAck(clientConfig.getBatchSize());
batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("not pull data , sleep " + NO_DATA_TIME_OUT + " ms .");
TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT);
continue;
}
boolean flag = receiveBinlogMessage.receive(message);
if (flag) {
connector.ack(batchId);
continue;
}
connector.rollback(batchId);
} catch (Exception e) {
if (connector.checkValid()) {
if (batchId != null) {
connector.rollback(batchId);
}
} else {
log.info("数据同步工作节点已停止工作, hostName : {} , ip : {} .", hostName, hostAddress);
}
}
}
} catch (Exception e) {
log.error("启动监听失败.", e);
} finally {
CanalConnectorManager.getInstance().destroy();
}
}
@Override
public void run() {
start();
}
}
- `Message getWithoutAck(int batchSize)`:不指定 position 获取事件。该方法返回的条件:尝试拿 batchSize 条记录,有多少取多少,不会阻塞等待。
- 如果轮询没有获取到数据,为了降低 CPU 占用,进行短暂休眠:`TimeUnit.MILLISECONDS.sleep(NO_DATA_TIME_OUT);`
- `boolean flag = receiveBinlogMessage.receive(message)` 返回处理结果:为 `true` 时执行 `connector.ack(batchId);` 进行确认,否则执行 `connector.rollback(batchId);` 进行回滚。

3. Canal 客户端管理
```java @Component public class CanalConnectorManager implements InitializingBean {
/**
持有实例对象 */ private static CanalConnectorManager install = null;
/**
canal客户端连接器 */ private volatile CanalConnector canalConnector;
/**
客户端连接配置 */ private CanalClientConfig clientConfig;
/**
- 创建客户端连接器 *
- @param clientConfig
@return */ public synchronized CanalConnector createConnector(CanalClientConfig clientConfig) { if (canalConnector != null) {
return canalConnector;
} this.clientConfig = clientConfig; CanalConnector canalConnector = CanalConnectors.newClusterConnector(
clientConfig.getClusterServers(),
clientConfig.getDestination(),
clientConfig.getUsername(),
clientConfig.getPassword()
); canalConnector.connect(); canalConnector.rollback(); canalConnector.subscribe(clientConfig.getSubscribe()); this.canalConnector = canalConnector; return canalConnector; }
/**
- 获取CanalConnector *
@return */ public CanalConnector getCanalConnector() { return this.canalConnector; }
/**
- 订阅消息 *
@return */ public void subscribe(String filter) { if (this.canalConnector.checkValid()) {
this.canalConnector.subscribe(filter);
} }
/**
注销客户端链接 */ public void destroy() { if (this.canalConnector != null) {
this.canalConnector.disconnect();
this.canalConnector = null;
} }
/**
- 获取自身实例 *
@return */ public static CanalConnectorManager getInstance() { return install; }
/**
- 初始属性 *
@throws Exception */ @Override public void afterPropertiesSet() throws Exception { install = this; }
/**
- 监听apollo属性变更 *
- @param changeEvent */ @ApolloConfigChangeListener(value = “prop”, interestedKeys = {“canal.subscribe”}) private void propChangeEvent(ConfigChangeEvent changeEvent) { String filter = changeEvent.getChange(“canal.subscribe”).getNewValue(); this.subscribe(filter); }
}
<a name="1T4DY"></a>
#### 4. 进行数据处理
```java
/**
 * Receives Canal binlog messages and dispatches the parsed table data to every
 * registered {@code IConsumerService} on a fixed-size thread pool.
 *
 * <p>{@code @DependsOn("springContextHolder")} makes Spring initialise the
 * context holder bean before this one.
 */
@Slf4j
@Service
@DependsOn("springContextHolder")
public class ReceiveBinlogMessageService implements IReceiveBinlogMessage, InitializingBean {
// Logger named "table-data"; its use is not visible in this excerpt.
private static Logger dataCollect = LoggerFactory.getLogger("table-data");
// Consumer beans keyed by bean name; populated in afterPropertiesSet().
private Map<String, IConsumerService> instances = null;
// Maximum time, in seconds, that receive() waits for all consumer tasks.
private int awaitTimeout = 30;
// Size of the fixed thread pool below.
private int coreThreadNum = 20;
// Pool on which the per-consumer send tasks are executed.
private ExecutorService pool = Executors.newFixedThreadPool(coreThreadNum);
@Autowired
private PropResource propResource;
/**
 * Parses a binlog message and sends the resulting table data to every
 * registered consumer in parallel.
 *
 * @param message the Canal message to process
 * @return {@code true} when there is nothing to send, or when every consumer
 *         task finished within {@code awaitTimeout} seconds and none failed;
 *         {@code false} otherwise
 * @throws Exception wrapping any failure raised while parsing or dispatching;
 *                   if the wait was interrupted, the thread's interrupt flag
 *                   is restored before the exception is thrown
 */
@Override
public boolean receive(Message message) throws Exception {
    try {
        List<CanalEntry.Entry> entries = message.getEntries();
        List<TableData> tableDatas = parseEntryFetchTableData(entries);
        if (ListUtils.isEmpty(tableDatas)) {
            return true;
        }
        final Collection<IConsumerService> consumerServices = instances.values();
        // Counts down once per finished consumer task.
        final CountDownLatch multiTaskProgress = new CountDownLatch(consumerServices.size());
        // Number of consumer tasks that failed.
        final AtomicInteger failTaskCount = new AtomicInteger();
        for (IConsumerService consumerService : consumerServices) {
            pool.execute(new AsyncSendMessageTask(consumerService, tableDatas,
                    multiTaskProgress, failTaskCount));
        }
        boolean progressFlag = multiTaskProgress.await(awaitTimeout, TimeUnit.SECONDS);
        boolean taskExecFlag = failTaskCount.get() == 0;
        log.info("任务执行完成状态 : {} , 任务正常结束状态 : {} .", progressFlag, taskExecFlag);
        return progressFlag && taskExecFlag;
    } catch (InterruptedException e) {
        // await() clears the interrupt flag when it throws; restore it so the
        // caller can still observe that the thread was asked to stop.
        Thread.currentThread().interrupt();
        log.error("解析binlog数据异常.", e);
        throw new Exception(e);
    } catch (Exception e) {
        log.error("解析binlog数据异常.", e);
        throw new Exception(e);
    }
}
/**
 * Collects every {@code IConsumerService} bean from the application context
 * and stores them, keyed by bean name, in {@code instances}.
 */
@Override
public void afterPropertiesSet() throws Exception {
    instances = SpringContextHolder
            .getApplicationContext()
            .getBeansOfType(IConsumerService.class);
}
```

实现 `InitializingBean` 接口,在 `afterPropertiesSet` 方法中利用 `applicationContext.getBeansOfType` 动态加载所有实现类,并放入 `Map` 类型的 `instances` 集合中。然后循环该集合,用线程池执行不同的消费策略:

```java
for (IConsumerService consumerService : consumerServices) {
    AsyncSendMessageTask sendMessageTask = new AsyncSendMessageTask(consumerService, tableDatas,
            multiTaskProgress, failTaskCount);
    pool.execute(sendMessageTask);
}
```
-
5. 发送任务消息
/**
 * Task that sends one batch of table data to a single consumer and reports the
 * outcome through a shared latch and failure counter.
 */
@Slf4j
public class AsyncSendMessageTask implements Runnable {

    private final IConsumerService consumerService;
    private final List<TableData> tableDatas;
    private final CountDownLatch multiTaskProgress;
    private final AtomicInteger failTaskCount;

    /**
     * @param consumerService   consumer that receives the data
     * @param tableDatas        table data to send
     * @param multiTaskProgress latch counted down once when this task finishes
     * @param failTaskCount     counter incremented when this task does not succeed
     */
    public AsyncSendMessageTask(IConsumerService consumerService, List<TableData> tableDatas,
                                CountDownLatch multiTaskProgress, AtomicInteger failTaskCount) {
        this.consumerService = consumerService;
        this.tableDatas = tableDatas;
        this.multiTaskProgress = multiTaskProgress;
        this.failTaskCount = failTaskCount;
    }

    /**
     * Sends the data and records the result. Any outcome other than a
     * {@code true} return from the consumer counts as a failure, including an
     * {@code Error} that is not caught here.
     */
    @Override
    public void run() {
        boolean succeeded = false;
        try {
            boolean flag = consumerService.sendMessageData(tableDatas);
            log.info("{}service任务执行状态:{}.", consumerService.getClass().getSimpleName(), flag);
            succeeded = flag;
        } catch (Exception e) {
            log.error("任务执行时失败.", e);
        } finally {
            // Record the failure before releasing the latch so the waiting thread
            // always sees the updated count.
            if (!succeeded) {
                failTaskCount.incrementAndGet();
            }
            multiTaskProgress.countDown();
        }
    }
}
此处的 `consumerService.sendMessageData` 有不同的实现类,用来完成不同的消费逻辑。