{"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
2021-09-18 16:03:52.528 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
2021-09-18 16:03:52.531 [pool-162-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - null
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) ~[na:1.8.0_252]
at java.util.ArrayList$Itr.next(ArrayList.java:859) ~[na:1.8.0_252]
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:374) ~[na:na]
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1420) ~[na:na]
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394) ~[na:na]
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:492) ~[na:na]
at com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection$ES7xBulkRequest.bulk(ESConnection.java:426) ~[na:na]
at com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate.commit(ES7xTemplate.java:173) ~[na:na]
at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.commit(ESSyncService.java:905) ~[na:na]
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:97) ~[na:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) ~[client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) ~[client-adapter.launcher-1.1.5.jar:na]
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_252]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) ~[client-adapter.launcher-1.1.5.jar:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_252]
问题解析:
https://github.com/alibaba/canal/issues/2159
https://github.com/alibaba/canal/issues/2160
文档:
https://github.com/alibaba/canal/wiki/Sync-ES
标准姿势应该如下:
application.yml配置
name: es
key: exampleKey #配置es数据源的key 必须唯一 (这个属性超级无敌重要)
hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
properties:
cluster.name: elasticsearch # es cluster name
适配器映射配置:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值,必须要保证每个mapping的dataSourceKey唯一
destination: example # cannal的instance或者MQ的topic
outerAdapterKey: exampleKey #对应上文中的key 这个key很重要 再强调一次
groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
如果按照官网现在给的那个例子,有概率出现以下问题:
由于es没有配置key 会导致多个 instance 共享 一个 EsAdapater,且EsAdapter 会间接关联到一个非线程安全的final List requests = new ArrayList<>()
1.在多个instance同时写入es的时候 有可能会数据丢失(requests.add操作) 【我自己遇到过,而且极难排查】
2.抛出java.lang.RuntimeException: java.lang.RuntimeException: java.util.ConcurrentModificationException 这是因为 当其中某个 instance在提交的时候 会进行for循环 遍历requests 假如恰好另外一个instance中间 进行了 add操作 就会触发。【有人遇到过这个问题】
顺便附一段 初始化 EsAdapter的代码:
public T getExtension(String name, String key) {
if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
String extKey = name + "-" + StringUtils.trimToEmpty(key);
Holder holder = cachedInstances.get(extKey);
if (holder == null) {
cachedInstances.putIfAbsent(extKey, new Holder<>());
holder = cachedInstances.get(extKey);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, key);
holder.set(instance);
}
}
}
return (T) instance;
}
从这段代码可以看出 不配置key 引用指向的都是同一个EsAdapter.