1. {"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
    1. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
    2. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
    3. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
    4. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
    5. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
    6. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
    7. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
    8. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
    9. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
    10. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
    11. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
    12. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
    13. 2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
    14. 2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
    15. 2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
    16. 2021-09-18 16:03:52.528 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
    17. 2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
    18. 2021-09-18 16:03:52.531 [pool-162-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - null
    19. java.util.ConcurrentModificationException: null
    20. at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) ~[na:1.8.0_252]
    21. at java.util.ArrayList$Itr.next(ArrayList.java:859) ~[na:1.8.0_252]
    22. at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:374) ~[na:na]
    23. at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1420) ~[na:na]
    24. at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394) ~[na:na]
    25. at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:492) ~[na:na]
    26. at com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection$ES7xBulkRequest.bulk(ESConnection.java:426) ~[na:na]
    27. at com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate.commit(ES7xTemplate.java:173) ~[na:na]
    28. at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.commit(ESSyncService.java:905) ~[na:na]
    29. at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:97) ~[na:na]
    30. at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) ~[client-adapter.launcher-1.1.5.jar:na]
    31. at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) ~[client-adapter.launcher-1.1.5.jar:na]
    32. at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_252]
    33. at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) ~[client-adapter.launcher-1.1.5.jar:na]
    34. at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_252]
    35. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_252]
    36. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_252]
    37. at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_252]

    问题解析:
    https://github.com/alibaba/canal/issues/2159
    https://github.com/alibaba/canal/issues/2160
    文档:
    https://github.com/alibaba/canal/wiki/Sync-ES

    1. 标准姿势应该如下:
    2. application.yml配置
    3. name: es
    4. key: exampleKey #配置es数据源的key 必须唯一 (这个属性超级无敌重要)
    5. hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
    6. properties:
    7. cluster.name: elasticsearch # es cluster name
    8. 适配器映射配置:
    9. dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值,必须要保证每个mappingdataSourceKey唯一
    10. destination: example # cannalinstance或者MQtopic
    11. outerAdapterKey: exampleKey #对应上文中的key 这个key很重要 再强调一次
    12. groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
    13. 如果按照官网现在给的那个例子,有概率出现以下问题:
    14. 由于es没有配置key 会导致多个 instance 共享 一个 EsAdapater,且EsAdapter 会间接关联到一个非线程安全的final List requests = new ArrayList<>()
    15. 1.在多个instance同时写入es的时候 有可能会数据丢失(requests.add操作) 【我自己遇到过,而且极难排查】
    16. 2.抛出java.lang.RuntimeException: java.lang.RuntimeException: java.util.ConcurrentModificationException 这是因为 当其中某个 instance在提交的时候 会进行for循环 遍历requests 假如恰好另外一个instance中间 进行了 add操作 就会触发。【有人遇到过这个问题】
    17. 顺便附一段 初始化 EsAdapter的代码:
    18. public T getExtension(String name, String key) {
    19. if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
    20. if ("true".equals(name)) {
    21. return getDefaultExtension();
    22. }
    23. String extKey = name + "-" + StringUtils.trimToEmpty(key);
    24. Holder holder = cachedInstances.get(extKey);
    25. if (holder == null) {
    26. cachedInstances.putIfAbsent(extKey, new Holder<>());
    27. holder = cachedInstances.get(extKey);
    28. }
    29. Object instance = holder.get();
    30. if (instance == null) {
    31. synchronized (holder) {
    32. instance = holder.get();
    33. if (instance == null) {
    34. instance = createExtension(name, key);
    35. holder.set(instance);
    36. }
    37. }
    38. }
    39. return (T) instance;
    40. }
    41. 从这段代码可以看出 不配置key 引用指向的都是同一个EsAdapter.