{"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"settle_acct_id":"2104758729693937747","reason":null,"metadata":null,"finished_at":null,"fee":"0","created_at":"2021-09-18 16:03:52.415","memo":null,"balance_acct_id":"2004758746508042272","remark":"1","bank_acct_type":"2","req_id":"7ec375b8e5004180800bc33305e58085","updated_at":"2021-09-18 16:03:52.415","refunded_at":null,"bank_id":"1203623228267896861","extra":null,"business_type":"1","withdrawal_type":"0","currency":"CNY","is_reversed":"0","channel_type":null,"root_mch_id":"1003752812805693510","bank_acct_no":"31001609507050000563","bank_code":"105100000017","amount":"100","service_fee":"0","out_order_no":"TK2109180035","withdrawal_status":"1","deleted_at":null,"version":"0","channel_trade_no":null,"sent_at":null,"channel_status":null,"agent_mch_id":null,"withdrawal_id":"3704760130841100356","cup_org_id":"1402358175721916456","bank_branch_code":"105221002003","name":"c11","root_bank_id":"1203623228267896861","out_batch_order_no":null,"mobile_number":null}],"database":"unionpay_trade","destination":"uat_trade_withdrawals","es":1631952232000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["withdrawal_id"],"sql":"","table":"withdrawals","ts":1631952232522,"type":"INSERT"}
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - insert start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-start
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - dataList-test-end
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.527 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
2021-09-18 16:03:52.527 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - schemaItem-test-log
2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex start
2021-09-18 16:03:52.528 [pool-162-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
2021-09-18 16:03:52.528 [pool-166-thread-1] INFO c.a.o.canal.client.adapter.es.core.service.ESSyncService - rebuildIndex success
2021-09-18 16:03:52.531 [pool-162-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - null
java.util.ConcurrentModificationException: null
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) ~[na:1.8.0_252]
	at java.util.ArrayList$Itr.next(ArrayList.java:859) ~[na:1.8.0_252]
	at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:374) ~[na:na]
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1420) ~[na:na]
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394) ~[na:na]
	at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:492) ~[na:na]
	at com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection$ES7xBulkRequest.bulk(ESConnection.java:426) ~[na:na]
	at com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate.commit(ES7xTemplate.java:173) ~[na:na]
	at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.commit(ESSyncService.java:905) ~[na:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:97) ~[na:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) ~[client-adapter.launcher-1.1.5.jar:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) ~[client-adapter.launcher-1.1.5.jar:na]
	at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_252]
	at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) ~[client-adapter.launcher-1.1.5.jar:na]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_252]
问题解析:
https://github.com/alibaba/canal/issues/2159
https://github.com/alibaba/canal/issues/2160
文档:
https://github.com/alibaba/canal/wiki/Sync-ES
标准姿势应该如下:

application.yml 配置:
    name: es
    key: exampleKey # 配置es数据源的key,必须唯一(这个属性超级无敌重要)
    hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
    properties:
      cluster.name: elasticsearch # es cluster name

适配器映射配置:
    dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值,必须要保证每个mapping的dataSourceKey唯一
    destination: example # canal的instance或者MQ的topic
    outerAdapterKey: exampleKey # 对应上文中的key,这个key很重要,再强调一次
    groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据

如果按照官网现在给的那个例子,有概率出现以下问题:
由于es没有配置key,会导致多个 instance 共享一个 ESAdapter,且 ESAdapter 会间接关联到一个非线程安全的 final List requests = new ArrayList<>()
1. 在多个instance同时写入es的时候,有可能会数据丢失(requests.add操作)【我自己遇到过,而且极难排查】
2. 抛出 java.lang.RuntimeException: java.lang.RuntimeException: java.util.ConcurrentModificationException。这是因为当其中某个 instance 在提交的时候,会进行for循环遍历requests,假如恰好另外一个instance在中间进行了add操作,就会触发。【有人遇到过这个问题】

顺便附一段初始化 ESAdapter 的代码:

    public T getExtension(String name, String key) {
        if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        String extKey = name + "-" + StringUtils.trimToEmpty(key);
        Holder holder = cachedInstances.get(extKey);
        if (holder == null) {
            cachedInstances.putIfAbsent(extKey, new Holder<>());
            holder = cachedInstances.get(extKey);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name, key);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }

从这段代码可以看出:不配置key时,引用指向的都是同一个ESAdapter。
