备注:该代码基于canal1.1.5
    client-adapter:
    resources下es7配置文件:
    添加自定义配置:
    indexSuffixfield: created_at #db table 创建时间
    indexSuffixPattern: yyyyMM # 时间需要对应ES索引后缀格式
    示例:

    1. dataSourceKey: defaultDS
    2. destination: example
    3. groupId: g1
    4. esMapping:
    5. _index: payments
    6. _id: _id
    7. upsert: true
    8. sql: "select t.payment_id as _id, t.payment_id, t.created_at,t.deleted_at,t.root_mch_id,t.agent_mch_id,t.bank_id,t.root_bank_id,t.cup_org_id,t.pay_balance_acct_id,t.amount,t.out_order_no,t.remark,t.fee,t.is_refunded,t.refunded_amount,t.extra,t.metadata,t.req_id,t.currency,t.recv_balance_acct_id,t.payment_status,t.finished_at,t.sent_at,t.version,t.reason,t.batch_payment_id from payments t"
    9. etlCondition: "where t.created_at>={}"
    10. commitBatch: 3000
    11. indexSuffixfield: created_at
    12. indexSuffixPattern: yyyyMM

    创建自定义索引扩展帮助类:

    1. package com.alibaba.otter.canal.client.adapter.es.core.support;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import org.springframework.util.CollectionUtils;
    7. import org.springframework.util.StringUtils;
    8. import java.text.SimpleDateFormat;
    9. import java.util.Map;
    10. public class ESMapppingPlus {
    11. private static Logger logger = LoggerFactory.getLogger(ESMapppingPlus.class);
    12. /**
    13. * 重构索引
    14. * @param esMapping
    15. * @param esFieldData
    16. */
    17. public static void rebuildIndex(ESSyncConfig.ESMapping esMapping, Map<String, Object> esFieldData) {
    18. if (CollectionUtils.isEmpty(esFieldData)){
    19. return;
    20. }
    21. String indexSuffixfield = esMapping.getIndexSuffixfield();
    22. if (StringUtils.isEmpty(indexSuffixfield)) {
    23. //索引后缀字段不存在 不带后缀
    24. esMapping.set_index(esMapping.getIndexPrefix());
    25. return;
    26. }
    27. String indexSuffix = esFieldData.containsKey(indexSuffixfield) ? esFieldData.get(indexSuffixfield).toString() : null;
    28. if (StringUtils.isEmpty(indexSuffix)) {
    29. //未匹配到索引后缀字段
    30. logger.error("esMapping.indexSuffixfield not matched to dml field, index:{} ,indexSuffixfield:{}, sql:{}, esFieldData:{}",
    31. esMapping.getIndexPrefix(), indexSuffixfield, esMapping.getSql(), JSONObject.toJSONString(esFieldData));
    32. throw new RuntimeException("esMapping.indexSuffixfield not matched to dml field");
    33. }
    34. String indexSuffixPattern = esMapping.getIndexSuffixPattern();
    35. if (indexSuffix.indexOf("-") != -1) {
    36. //时间字符串格式
    37. indexSuffix = datePattern(indexSuffix, indexSuffixPattern);
    38. String index = esMapping.getIndexPrefix()+"-"+ indexSuffix;
    39. esMapping.set_index(index);
    40. return;
    41. } else if(indexSuffix.length()==13){
    42. //时间戳格式
    43. SimpleDateFormat formatter = new SimpleDateFormat(indexSuffixPattern);
    44. indexSuffix = formatter.format(indexSuffix);
    45. String index = esMapping.getIndexPrefix()+"-"+ indexSuffix;
    46. esMapping.set_index(index);
    47. return;
    48. } else {
    49. //未匹配到索引后缀字段
    50. logger.error("esMapping.indexSuffixPattern can't null, index:{}",
    51. esMapping.getIndexPrefix());
    52. throw new RuntimeException("esMapping.indexSuffixPattern can't null");
    53. }
    54. }
    55. /**
    56. * 重构索引
    57. * @param config
    58. * @param esFieldData
    59. */
    60. public static void rebuildIndex(ESSyncConfig config, Map<String, Object> esFieldData) {
    61. rebuildIndex(config.getEsMapping(), esFieldData);
    62. }
    63. private static String datePattern(String date, String pattern) {
    64. switch (pattern) {
    65. case "yyyy" :
    66. return date.substring(0,4);
    67. case "yyyyMM":
    68. return date.substring(0,7).replaceAll("-", "");
    69. case "yyyyMMdd":
    70. return date.substring(0,10).replaceAll("-", "");
    71. case "yyyy-MM":
    72. return date.substring(0,7);
    73. case "yyyy-MM-dd":
    74. return date.substring(0,10);
    75. default:
    76. return date.substring(0,7).replaceAll("-", "");
    77. }
    78. }
    79. }

    ES7xTemplate.java添加

    1. ESMapppingPlus.rebuildIndex(mapping, esFieldData);

    �或者添加:

    1. ESMapppingPlus.rebuildIndex(config, esFieldData);

    修改后的完整代码:

    1. package com.alibaba.otter.canal.client.adapter.es7x.support;
    2. import java.sql.ResultSet;
    3. import java.sql.SQLException;
    4. import java.util.ArrayList;
    5. import java.util.HashMap;
    6. import java.util.LinkedHashMap;
    7. import java.util.List;
    8. import java.util.Map;
    9. import java.util.concurrent.ConcurrentHashMap;
    10. import java.util.concurrent.ConcurrentMap;
    11. import javax.sql.DataSource;
    12. import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;
    13. import org.apache.commons.lang.StringUtils;
    14. import org.elasticsearch.action.search.SearchResponse;
    15. import org.elasticsearch.cluster.metadata.MappingMetaData;
    16. import org.elasticsearch.index.query.BoolQueryBuilder;
    17. import org.elasticsearch.index.query.QueryBuilders;
    18. import org.elasticsearch.search.SearchHit;
    19. import org.slf4j.Logger;
    20. import org.slf4j.LoggerFactory;
    21. import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
    22. import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
    23. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
    24. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
    25. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
    26. import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
    27. import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESBulkResponse;
    28. import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESDeleteRequest;
    29. import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESIndexRequest;
    30. import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESUpdateRequest;
    31. import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
    32. import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
    33. import com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection.ESSearchRequest;
    34. import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
    35. import com.alibaba.otter.canal.client.adapter.support.Util;
    36. public class ES7xTemplate implements ESTemplate {
    37. private static final Logger logger = LoggerFactory
    38. .getLogger(ESTemplate.class);
    39. private static final int MAX_BATCH_SIZE = 1000;
    40. private ESConnection esConnection;
    41. private ESBulkRequest esBulkRequest;
    42. // es 字段类型本地缓存
    43. private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
    44. public ES7xTemplate(ESConnection esConnection){
    45. this.esConnection = esConnection;
    46. this.esBulkRequest = this.esConnection.new ES7xBulkRequest();
    47. }
    48. public ESBulkRequest getBulk() {
    49. return esBulkRequest;
    50. }
    51. public void resetBulkRequestBuilder() {
    52. this.esBulkRequest.resetBulk();
    53. }
    54. @Override
    55. public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    56. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    57. if (mapping.get_id() != null) {
    58. String parentVal = (String) esFieldData.remove("$parent_routing");
    59. if (mapping.isUpsert()) {
    60. ESUpdateRequest updateRequest = esConnection.new ES7xUpdateRequest(mapping.get_index(),
    61. pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
    62. if (StringUtils.isNotEmpty(parentVal)) {
    63. updateRequest.setRouting(parentVal);
    64. }
    65. getBulk().add(updateRequest);
    66. } else {
    67. ESIndexRequest indexRequest = esConnection.new ES7xIndexRequest(mapping.get_index(), pkVal.toString())
    68. .setSource(esFieldData);
    69. if (StringUtils.isNotEmpty(parentVal)) {
    70. indexRequest.setRouting(parentVal);
    71. }
    72. getBulk().add(indexRequest);
    73. }
    74. commitBulk();
    75. } else {
    76. ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
    77. .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
    78. .size(10000);
    79. SearchResponse response = esSearchRequest.getResponse();
    80. for (SearchHit hit : response.getHits()) {
    81. ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
    82. hit.getId()).setDoc(esFieldData);
    83. getBulk().add(esUpdateRequest);
    84. commitBulk();
    85. }
    86. }
    87. }
    88. @Override
    89. public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    90. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    91. Map<String, Object> esFieldDataTmp = new LinkedHashMap<>(esFieldData.size());
    92. esFieldData.forEach((k, v) -> esFieldDataTmp.put(Util.cleanColumn(k), v));
    93. append4Update(mapping, pkVal, esFieldDataTmp);
    94. commitBulk();
    95. }
    96. @Override
    97. public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
    98. ESMapppingPlus.rebuildIndex(config, esFieldData);
    99. if (paramsTmp.isEmpty()) {
    100. return;
    101. }
    102. ESMapping mapping = config.getEsMapping();
    103. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    104. paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
    105. // 查询sql批量更新
    106. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    107. StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
    108. List<Object> values = new ArrayList<>();
    109. paramsTmp.forEach((fieldName, value) -> {
    110. sql.append("_v.").append(fieldName).append("=? AND ");
    111. values.add(value);
    112. });
    113. // TODO 直接外部包裹sql会导致全表扫描性能低, 待优化拼接内部where条件
    114. int len = sql.length();
    115. sql.delete(len - 4, len);
    116. Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
    117. int count = 0;
    118. try {
    119. while (rs.next()) {
    120. Object idVal = getIdValFromRS(mapping, rs);
    121. append4Update(mapping, idVal, esFieldData);
    122. commitBulk();
    123. count++;
    124. }
    125. } catch (Exception e) {
    126. throw new RuntimeException(e);
    127. }
    128. return count;
    129. });
    130. if (logger.isTraceEnabled()) {
    131. logger.trace("Update ES by query affected {} records", syncCount);
    132. }
    133. }
    134. @Override
    135. public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    136. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    137. if (mapping.get_id() != null) {
    138. ESDeleteRequest esDeleteRequest = this.esConnection.new ES7xDeleteRequest(mapping.get_index(),
    139. pkVal.toString());
    140. getBulk().add(esDeleteRequest);
    141. commitBulk();
    142. } else {
    143. ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
    144. .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
    145. .size(10000);
    146. SearchResponse response = esSearchRequest.getResponse();
    147. for (SearchHit hit : response.getHits()) {
    148. ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
    149. hit.getId()).setDoc(esFieldData);
    150. getBulk().add(esUpdateRequest);
    151. commitBulk();
    152. }
    153. }
    154. }
    155. @Override
    156. public void commit() {
    157. if (getBulk().numberOfActions() > 0) {
    158. ESBulkResponse response = getBulk().bulk();
    159. if (response.hasFailures()) {
    160. response.processFailBulkResponse("ES sync commit error ");
    161. }
    162. resetBulkRequestBuilder();
    163. }
    164. }
    165. @Override
    166. public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
    167. String columnName) throws SQLException {
    168. fieldName = Util.cleanColumn(fieldName);
    169. columnName = Util.cleanColumn(columnName);
    170. String esType = getEsType(mapping, fieldName);
    171. Object value = resultSet.getObject(columnName);
    172. if (value instanceof Boolean) {
    173. if (!"boolean".equals(esType)) {
    174. value = resultSet.getByte(columnName);
    175. }
    176. }
    177. // 如果是对象类型
    178. if (mapping.getObjFields().containsKey(fieldName)) {
    179. return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
    180. } else {
    181. return ESSyncUtil.typeConvert(value, esType);
    182. }
    183. }
    184. @Override
    185. public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
    186. Map<String, Object> esFieldData) throws SQLException {
    187. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    188. SchemaItem schemaItem = mapping.getSchemaItem();
    189. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    190. Object resultIdVal = null;
    191. for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    192. Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
    193. if (fieldItem.getFieldName().equals(idFieldName)) {
    194. resultIdVal = value;
    195. }
    196. if (!fieldItem.getFieldName().equals(mapping.get_id())
    197. && !mapping.getSkips().contains(fieldItem.getFieldName())) {
    198. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
    199. }
    200. }
    201. // 添加父子文档关联信息
    202. putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
    203. return resultIdVal;
    204. }
    205. @Override
    206. public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
    207. SchemaItem schemaItem = mapping.getSchemaItem();
    208. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    209. Object resultIdVal = null;
    210. for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    211. Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
    212. if (fieldItem.getFieldName().equals(idFieldName)) {
    213. resultIdVal = value;
    214. break;
    215. }
    216. }
    217. return resultIdVal;
    218. }
    219. @Override
    220. public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
    221. Map<String, Object> esFieldData) throws SQLException {
    222. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    223. SchemaItem schemaItem = mapping.getSchemaItem();
    224. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    225. Object resultIdVal = null;
    226. for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    227. if (fieldItem.getFieldName().equals(idFieldName)) {
    228. resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
    229. }
    230. for (ColumnItem columnItem : fieldItem.getColumnItems()) {
    231. if (dmlOld.containsKey(columnItem.getColumnName())
    232. && !mapping.getSkips().contains(fieldItem.getFieldName())) {
    233. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
    234. getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
    235. break;
    236. }
    237. }
    238. }
    239. // 添加父子文档关联信息
    240. putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
    241. return resultIdVal;
    242. }
    243. @Override
    244. public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
    245. String esType = getEsType(mapping, fieldName);
    246. Object value = dmlData.get(columnName);
    247. if (value instanceof Byte) {
    248. if ("boolean".equals(esType)) {
    249. value = ((Byte) value).intValue() != 0;
    250. }
    251. }
    252. // 如果是对象类型
    253. if (mapping.getObjFields().containsKey(fieldName)) {
    254. return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
    255. } else {
    256. return ESSyncUtil.typeConvert(value, esType);
    257. }
    258. }
    259. @Override
    260. public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
    261. Map<String, Object> esFieldData) {
    262. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    263. SchemaItem schemaItem = mapping.getSchemaItem();
    264. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    265. Object resultIdVal = null;
    266. for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    267. String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
    268. Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
    269. if (fieldItem.getFieldName().equals(idFieldName)) {
    270. resultIdVal = value;
    271. }
    272. if (!fieldItem.getFieldName().equals(mapping.get_id())
    273. && !mapping.getSkips().contains(fieldItem.getFieldName())) {
    274. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
    275. }
    276. }
    277. // 添加父子文档关联信息
    278. putRelationData(mapping, schemaItem, dmlData, esFieldData);
    279. return resultIdVal;
    280. }
    281. @Override
    282. public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
    283. Map<String, Object> esFieldData) {
    284. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    285. SchemaItem schemaItem = mapping.getSchemaItem();
    286. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    287. Object resultIdVal = null;
    288. for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    289. String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
    290. if (fieldItem.getFieldName().equals(idFieldName)) {
    291. resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
    292. }
    293. if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
    294. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
    295. getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
    296. }
    297. }
    298. // 添加父子文档关联信息
    299. putRelationData(mapping, schemaItem, dmlOld, esFieldData);
    300. return resultIdVal;
    301. }
    302. /**
    303. * 如果大于批量数则提交批次
    304. */
    305. private void commitBulk() {
    306. if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
    307. commit();
    308. }
    309. }
    310. private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    311. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    312. if (mapping.get_id() != null) {
    313. String parentVal = (String) esFieldData.remove("$parent_routing");
    314. if (mapping.isUpsert()) {
    315. ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
    316. pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
    317. if (StringUtils.isNotEmpty(parentVal)) {
    318. esUpdateRequest.setRouting(parentVal);
    319. }
    320. getBulk().add(esUpdateRequest);
    321. } else {
    322. ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
    323. pkVal.toString()).setDoc(esFieldData);
    324. if (StringUtils.isNotEmpty(parentVal)) {
    325. esUpdateRequest.setRouting(parentVal);
    326. }
    327. getBulk().add(esUpdateRequest);
    328. }
    329. } else {
    330. ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
    331. .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
    332. .size(10000);
    333. SearchResponse response = esSearchRequest.getResponse();
    334. for (SearchHit hit : response.getHits()) {
    335. ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
    336. hit.getId()).setDoc(esFieldData);
    337. getBulk().add(esUpdateRequest);
    338. }
    339. }
    340. }
    341. /**
    342. * 获取es mapping中的属性类型
    343. *
    344. * @param mapping mapping配置
    345. * @param fieldName 属性名
    346. * @return 类型
    347. */
    348. @SuppressWarnings("unchecked")
    349. private String getEsType(ESMapping mapping, String fieldName) {
    350. String key = mapping.get_index() + "-" + mapping.get_type();
    351. Map<String, String> fieldType = esFieldTypes.get(key);
    352. if (fieldType != null) {
    353. return fieldType.get(fieldName);
    354. } else {
    355. // MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index());
    356. //模版获取
    357. MappingMetaData mappingMetaData = esConnection.getMapping(mapping.getIndexPrefix());
    358. if (mappingMetaData == null) {
    359. throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
    360. }
    361. fieldType = new LinkedHashMap<>();
    362. Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
    363. Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
    364. for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
    365. Map<String, Object> value = (Map<String, Object>) entry.getValue();
    366. if (value.containsKey("properties")) {
    367. fieldType.put(entry.getKey(), "object");
    368. } else {
    369. fieldType.put(entry.getKey(), (String) value.get("type"));
    370. }
    371. }
    372. esFieldTypes.put(key, fieldType);
    373. return fieldType.get(fieldName);
    374. }
    375. }
    376. private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
    377. Map<String, Object> esFieldData) {
    378. ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    379. // 添加父子文档关联信息
    380. if (!mapping.getRelations().isEmpty()) {
    381. mapping.getRelations().forEach((relationField, relationMapping) -> {
    382. Map<String, Object> relations = new HashMap<>();
    383. relations.put("name", relationMapping.getName());
    384. if (StringUtils.isNotEmpty(relationMapping.getParent())) {
    385. FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
    386. Object parentVal;
    387. try {
    388. parentVal = getValFromRS(mapping,
    389. resultSet,
    390. parentFieldItem.getFieldName(),
    391. parentFieldItem.getFieldName());
    392. } catch (SQLException e) {
    393. throw new RuntimeException(e);
    394. }
    395. if (parentVal != null) {
    396. relations.put("parent", parentVal.toString());
    397. esFieldData.put("$parent_routing", parentVal.toString());
    398. }
    399. }
    400. esFieldData.put(relationField, relations);
    401. });
    402. }
    403. }
    404. private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
    405. Map<String, Object> esFieldData) {
    406. // 添加父子文档关联信息
    407. if (!mapping.getRelations().isEmpty()) {
    408. mapping.getRelations().forEach((relationField, relationMapping) -> {
    409. Map<String, Object> relations = new HashMap<>();
    410. relations.put("name", relationMapping.getName());
    411. if (StringUtils.isNotEmpty(relationMapping.getParent())) {
    412. FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
    413. String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
    414. Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
    415. if (parentVal != null) {
    416. relations.put("parent", parentVal.toString());
    417. esFieldData.put("$parent_routing", parentVal.toString());
    418. }
    419. }
    420. esFieldData.put(relationField, relations);
    421. });
    422. }
    423. }
    424. }

    修改ESSyncService.java文件:
    同样添加:

    1. ESMapppingPlus.rebuildIndex(config, data);

    或者:

    1. ESMapppingPlus.rebuildIndex(config, esFieldData);

    完整修改如下:

    1. package com.alibaba.otter.canal.client.adapter.es.core.service;
    2. import java.util.ArrayList;
    3. import java.util.Collection;
    4. import java.util.LinkedHashMap;
    5. import java.util.List;
    6. import java.util.Map;
    7. import javax.sql.DataSource;
    8. import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;
    9. import org.slf4j.Logger;
    10. import org.slf4j.LoggerFactory;
    11. import com.alibaba.fastjson.JSON;
    12. import com.alibaba.fastjson.serializer.SerializerFeature;
    13. import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
    14. import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
    15. import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
    16. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
    17. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
    18. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
    19. import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.TableItem;
    20. import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;
    21. import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
    22. import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
    23. import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
    24. import com.alibaba.otter.canal.client.adapter.support.Dml;
    25. import com.alibaba.otter.canal.client.adapter.support.Util;
    26. /**
    27. * ES 同步 Service
    28. *
    29. * @author rewerma 2018-11-01
    30. * @version 1.0.0
    31. */
    32. public class ESSyncService {
    33. private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
    34. private ESTemplate esTemplate;
    35. public ESSyncService(ESTemplate esTemplate){
    36. this.esTemplate = esTemplate;
    37. }
    38. public void sync(Collection<ESSyncConfig> esSyncConfigs, Dml dml) {
    39. long begin = System.currentTimeMillis();
    40. if (esSyncConfigs != null) {
    41. if (logger.isTraceEnabled()) {
    42. logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: {}",
    43. dml.getDestination(),
    44. dml.getDatabase(),
    45. dml.getTable(),
    46. dml.getType(),
    47. esSyncConfigs.size());
    48. }
    49. for (ESSyncConfig config : esSyncConfigs) {
    50. if (logger.isTraceEnabled()) {
    51. logger.trace("Prepared to sync index: {}, destination: {}",
    52. config.getEsMapping().get_index(),
    53. dml.getDestination());
    54. }
    55. this.sync(config, dml);
    56. if (logger.isTraceEnabled()) {
    57. logger.trace("Sync completed: {}, destination: {}",
    58. config.getEsMapping().get_index(),
    59. dml.getDestination());
    60. }
    61. }
    62. if (logger.isTraceEnabled()) {
    63. logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",
    64. (System.currentTimeMillis() - begin),
    65. esSyncConfigs.size(),
    66. dml.getDestination());
    67. }
    68. if (logger.isDebugEnabled()) {
    69. StringBuilder configIndexes = new StringBuilder();
    70. esSyncConfigs
    71. .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
    72. logger.debug("DML: {} \nAffected indexes: {}",
    73. JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
    74. configIndexes.toString());
    75. }
    76. }
    77. }
    78. public void sync(ESSyncConfig config, Dml dml) {
    79. try {
    80. // 如果是按时间戳定时更新则返回
    81. if (config.getEsMapping().isSyncByTimestamp()) {
    82. return;
    83. }
    84. long begin = System.currentTimeMillis();
    85. String type = dml.getType();
    86. if (type != null && type.equalsIgnoreCase("INSERT")) {
    87. insert(config, dml);
    88. } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
    89. update(config, dml);
    90. } else if (type != null && type.equalsIgnoreCase("DELETE")) {
    91. delete(config, dml);
    92. } else {
    93. return;
    94. }
    95. if (logger.isTraceEnabled()) {
    96. logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
    97. (System.currentTimeMillis() - begin),
    98. dml.getDestination(),
    99. config.getEsMapping().get_index());
    100. }
    101. } catch (Throwable e) {
    102. logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
    103. throw new RuntimeException(e);
    104. }
    105. }
    106. /**
    107. * 插入操作dml
    108. *
    109. * @param config es配置
    110. * @param dml dml数据
    111. */
    112. private void insert(ESSyncConfig config, Dml dml) {
    113. List<Map<String, Object>> dataList = dml.getData();
    114. if (dataList == null || dataList.isEmpty()) {
    115. return;
    116. }
    117. SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
    118. for (Map<String, Object> data : dataList) {
    119. if (data == null || data.isEmpty()) {
    120. continue;
    121. }
    122. ESMapppingPlus.rebuildIndex(config, data);
    123. if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
    124. // ------单表 & 所有字段都为简单字段------
    125. singleTableSimpleFiledInsert(config, dml, data);
    126. } else {
    127. // ------是主表 查询sql来插入------
    128. if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
    129. mainTableInsert(config, dml, data);
    130. }
    131. // 从表的操作
    132. for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
    133. if (tableItem.isMain()) {
    134. continue;
    135. }
    136. if (!tableItem.getTableName().equals(dml.getTable())) {
    137. continue;
    138. }
    139. // 关联条件出现在主表查询条件是否为简单字段
    140. boolean allFieldsSimple = true;
    141. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    142. if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
    143. allFieldsSimple = false;
    144. break;
    145. }
    146. }
    147. // 所有查询字段均为简单字段
    148. if (allFieldsSimple) {
    149. // 不是子查询
    150. if (!tableItem.isSubQuery()) {
    151. // ------关联表简单字段插入------
    152. Map<String, Object> esFieldData = new LinkedHashMap<>();
    153. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    154. Object value = esTemplate.getValFromData(config.getEsMapping(),
    155. data,
    156. fieldItem.getFieldName(),
    157. fieldItem.getColumn().getColumnName());
    158. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
    159. }
    160. joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
    161. } else {
    162. // ------关联子表简单字段插入------
    163. subTableSimpleFieldOperation(config, dml, data, null, tableItem);
    164. }
    165. } else {
    166. // ------关联子表复杂字段插入 执行全sql更新es------
    167. wholeSqlOperation(config, dml, data, null, tableItem);
    168. }
    169. }
    170. }
    171. }
    172. }
    173. /**
    174. * 更新操作dml
    175. *
    176. * @param config es配置
    177. * @param dml dml数据
    178. */
    179. private void update(ESSyncConfig config, Dml dml) {
    180. List<Map<String, Object>> dataList = dml.getData();
    181. List<Map<String, Object>> oldList = dml.getOld();
    182. if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
    183. return;
    184. }
    185. SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
    186. int i = 0;
    187. for (Map<String, Object> data : dataList) {
    188. Map<String, Object> old = oldList.get(i);
    189. if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
    190. continue;
    191. }
    192. ESMapppingPlus.rebuildIndex(config, data);
    193. if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
    194. // ------单表 & 所有字段都为简单字段------
    195. singleTableSimpleFiledUpdate(config, dml, data, old);
    196. } else {
    197. // ------主表 查询sql来更新------
    198. if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
    199. ESMapping mapping = config.getEsMapping();
    200. String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    201. FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
    202. boolean idFieldSimple = true;
    203. if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
    204. idFieldSimple = false;
    205. }
    206. boolean allUpdateFieldSimple = true;
    207. out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
    208. for (ColumnItem columnItem : fieldItem.getColumnItems()) {
    209. if (old.containsKey(columnItem.getColumnName())) {
    210. if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
    211. allUpdateFieldSimple = false;
    212. break out;
    213. }
    214. }
    215. }
    216. }
    217. // 不支持主键更新!!
    218. // 判断是否有外键更新
    219. boolean fkChanged = false;
    220. for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
    221. if (tableItem.isMain()) {
    222. continue;
    223. }
    224. boolean changed = false;
    225. for (List<FieldItem> fieldItems : tableItem.getRelationTableFields().values()) {
    226. for (FieldItem fieldItem : fieldItems) {
    227. if (old.containsKey(fieldItem.getColumn().getColumnName())) {
    228. fkChanged = true;
    229. changed = true;
    230. break;
    231. }
    232. }
    233. }
    234. // 如果外键有修改,则更新所对应该表的所有查询条件数据
    235. if (changed) {
    236. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    237. fieldItem.getColumnItems()
    238. .forEach(columnItem -> old.put(columnItem.getColumnName(), null));
    239. }
    240. }
    241. }
    242. // 判断主键和所更新的字段是否全为简单字段
    243. if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
    244. singleTableSimpleFiledUpdate(config, dml, data, old);
    245. } else {
    246. mainTableUpdate(config, dml, data, old);
    247. }
    248. }
    249. // 从表的操作
    250. for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
    251. if (tableItem.isMain()) {
    252. continue;
    253. }
    254. if (!tableItem.getTableName().equals(dml.getTable())) {
    255. continue;
    256. }
    257. // 关联条件出现在主表查询条件是否为简单字段
    258. boolean allFieldsSimple = true;
    259. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    260. if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
    261. allFieldsSimple = false;
    262. break;
    263. }
    264. }
    265. // 所有查询字段均为简单字段
    266. if (allFieldsSimple) {
    267. // 不是子查询
    268. if (!tableItem.isSubQuery()) {
    269. // ------关联表简单字段更新------
    270. Map<String, Object> esFieldData = new LinkedHashMap<>();
    271. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    272. if (old.containsKey(fieldItem.getColumn().getColumnName())) {
    273. Object value = esTemplate.getValFromData(config.getEsMapping(),
    274. data,
    275. fieldItem.getFieldName(),
    276. fieldItem.getColumn().getColumnName());
    277. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
    278. }
    279. }
    280. joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
    281. } else {
    282. // ------关联子表简单字段更新------
    283. subTableSimpleFieldOperation(config, dml, data, old, tableItem);
    284. }
    285. } else {
    286. // ------关联子表复杂字段更新 执行全sql更新es------
    287. wholeSqlOperation(config, dml, data, old, tableItem);
    288. }
    289. }
    290. }
    291. i++;
    292. }
    293. }
    294. /**
    295. * 删除操作dml
    296. *
    297. * @param config es配置
    298. * @param dml dml数据
    299. */
    300. private void delete(ESSyncConfig config, Dml dml) {
    301. List<Map<String, Object>> dataList = dml.getData();
    302. if (dataList == null || dataList.isEmpty()) {
    303. return;
    304. }
    305. SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
    306. for (Map<String, Object> data : dataList) {
    307. if (data == null || data.isEmpty()) {
    308. continue;
    309. }
    310. ESMapppingPlus.rebuildIndex(config, data);
    311. ESMapping mapping = config.getEsMapping();
    312. // ------是主表------
    313. if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
    314. if (mapping.get_id() != null) {
    315. FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
    316. // 主键为简单字段
    317. if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
    318. Object idVal = esTemplate.getValFromData(mapping,
    319. data,
    320. idFieldItem.getFieldName(),
    321. idFieldItem.getColumn().getColumnName());
    322. if (logger.isTraceEnabled()) {
    323. logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
    324. config.getDestination(),
    325. dml.getTable(),
    326. mapping.get_index(),
    327. idVal);
    328. }
    329. esTemplate.delete(mapping, idVal, null);
    330. } else {
    331. // ------主键带函数, 查询sql获取主键删除------
    332. // FIXME 删除时反查sql为空记录, 无法获获取 id field 值
    333. mainTableDelete(config, dml, data);
    334. }
    335. } else {
    336. FieldItem pkFieldItem = schemaItem.getIdFieldItem(mapping);
    337. if (!pkFieldItem.isMethod() && !pkFieldItem.isBinaryOp()) {
    338. Map<String, Object> esFieldData = new LinkedHashMap<>();
    339. Object pkVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
    340. if (logger.isTraceEnabled()) {
    341. logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
    342. config.getDestination(),
    343. dml.getTable(),
    344. mapping.get_index(),
    345. pkVal);
    346. }
    347. esFieldData.remove(pkFieldItem.getFieldName());
    348. esFieldData.keySet().forEach(key -> esFieldData.put(key, null));
    349. esTemplate.delete(mapping, pkVal, esFieldData);
    350. } else {
    351. // ------主键带函数, 查询sql获取主键删除------
    352. mainTableDelete(config, dml, data);
    353. }
    354. }
    355. }
    356. // 从表的操作
    357. for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
    358. if (tableItem.isMain()) {
    359. continue;
    360. }
    361. if (!tableItem.getTableName().equals(dml.getTable())) {
    362. continue;
    363. }
    364. // 关联条件出现在主表查询条件是否为简单字段
    365. boolean allFieldsSimple = true;
    366. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    367. if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
    368. allFieldsSimple = false;
    369. break;
    370. }
    371. }
    372. // 所有查询字段均为简单字段
    373. if (allFieldsSimple) {
    374. // 不是子查询
    375. if (!tableItem.isSubQuery()) {
    376. // ------关联表简单字段更新为null------
    377. Map<String, Object> esFieldData = new LinkedHashMap<>();
    378. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    379. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), null);
    380. }
    381. joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
    382. } else {
    383. // ------关联子表简单字段更新------
    384. subTableSimpleFieldOperation(config, dml, data, null, tableItem);
    385. }
    386. } else {
    387. // ------关联子表复杂字段更新 执行全sql更新es------
    388. wholeSqlOperation(config, dml, data, null, tableItem);
    389. }
    390. }
    391. }
    392. }
    393. /**
    394. * 单表简单字段insert
    395. *
    396. * @param config es配置
    397. * @param dml dml信息
    398. * @param data 单行dml数据
    399. */
    400. private void singleTableSimpleFiledInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    401. ESMapping mapping = config.getEsMapping();
    402. Map<String, Object> esFieldData = new LinkedHashMap<>();
    403. Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
    404. if (logger.isTraceEnabled()) {
    405. logger.trace("Single table insert to es index, destination:{}, table: {}, index: {}, id: {}",
    406. config.getDestination(),
    407. dml.getTable(),
    408. mapping.get_index(),
    409. idVal);
    410. }
    411. esTemplate.insert(mapping, idVal, esFieldData);
    412. }
    413. /**
    414. * 主表(单表)复杂字段insert
    415. *
    416. * @param config es配置
    417. * @param dml dml信息
    418. * @param data 单行dml数据
    419. */
    420. private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    421. ESMapping mapping = config.getEsMapping();
    422. String sql = mapping.getSql();
    423. String condition = ESSyncUtil.pkConditionSql(mapping, data);
    424. sql = ESSyncUtil.appendCondition(sql, condition);
    425. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    426. if (logger.isTraceEnabled()) {
    427. logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
    428. config.getDestination(),
    429. dml.getTable(),
    430. mapping.get_index(),
    431. sql.replace("\n", " "));
    432. }
    433. Util.sqlRS(ds, sql, rs -> {
    434. try {
    435. while (rs.next()) {
    436. Map<String, Object> esFieldData = new LinkedHashMap<>();
    437. Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);
    438. if (logger.isTraceEnabled()) {
    439. logger.trace(
    440. "Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
    441. config.getDestination(),
    442. dml.getTable(),
    443. mapping.get_index(),
    444. idVal);
    445. }
    446. esTemplate.insert(mapping, idVal, esFieldData);
    447. }
    448. } catch (Exception e) {
    449. throw new RuntimeException(e);
    450. }
    451. return 0;
    452. });
    453. }
    454. private void mainTableDelete(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    455. ESMapping mapping = config.getEsMapping();
    456. String sql = mapping.getSql();
    457. String condition = ESSyncUtil.pkConditionSql(mapping, data);
    458. sql = ESSyncUtil.appendCondition(sql, condition);
    459. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    460. if (logger.isTraceEnabled()) {
    461. logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
    462. config.getDestination(),
    463. dml.getTable(),
    464. mapping.get_index(),
    465. sql.replace("\n", " "));
    466. }
    467. Util.sqlRS(ds, sql, rs -> {
    468. try {
    469. Map<String, Object> esFieldData = null;
    470. if (mapping.getPk() != null) {
    471. esFieldData = new LinkedHashMap<>();
    472. esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
    473. esFieldData.remove(mapping.getPk());
    474. for (String key : esFieldData.keySet()) {
    475. esFieldData.put(Util.cleanColumn(key), null);
    476. }
    477. }
    478. while (rs.next()) {
    479. Object idVal = esTemplate.getIdValFromRS(mapping, rs);
    480. if (logger.isTraceEnabled()) {
    481. logger.trace(
    482. "Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
    483. config.getDestination(),
    484. dml.getTable(),
    485. mapping.get_index(),
    486. idVal);
    487. }
    488. esTemplate.delete(mapping, idVal, esFieldData);
    489. }
    490. } catch (Exception e) {
    491. throw new RuntimeException(e);
    492. }
    493. return 0;
    494. });
    495. }
    496. /**
    497. * 关联表主表简单字段operation
    498. *
    499. * @param config es配置
    500. * @param dml dml信息
    501. * @param data 单行dml数据
    502. * @param tableItem 当前表配置
    503. */
    504. private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
    505. TableItem tableItem, Map<String, Object> esFieldData) {
    506. ESMapping mapping = config.getEsMapping();
    507. Map<String, Object> paramsTmp = new LinkedHashMap<>();
    508. for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
    509. for (FieldItem fieldItem : entry.getValue()) {
    510. if (fieldItem.getColumnItems().size() == 1) {
    511. Object value = esTemplate.getValFromData(mapping,
    512. data,
    513. fieldItem.getFieldName(),
    514. entry.getKey().getColumn().getColumnName());
    515. String fieldName = fieldItem.getFieldName();
    516. // 判断是否是主键
    517. if (fieldName.equals(mapping.get_id())) {
    518. fieldName = "_id";
    519. }
    520. paramsTmp.put(fieldName, value);
    521. }
    522. }
    523. }
    524. if (logger.isDebugEnabled()) {
    525. logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
    526. config.getDestination(),
    527. dml.getTable(),
    528. mapping.get_index());
    529. }
    530. esTemplate.updateByQuery(config, paramsTmp, esFieldData);
    531. }
    532. /**
    533. * 关联子查询, 主表简单字段operation
    534. *
    535. * @param config es配置
    536. * @param dml dml信息
    537. * @param data 单行dml数据
    538. * @param old 单行old数据
    539. * @param tableItem 当前表配置
    540. */
    541. private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
    542. Map<String, Object> old, TableItem tableItem) {
    543. ESMapping mapping = config.getEsMapping();
    544. MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());
    545. StringBuilder sql = new StringBuilder();
    546. sql.append("SELECT ")
    547. .append(SqlParser.parse4SQLSelectItem(queryBlock))
    548. .append(" FROM ")
    549. .append(SqlParser.parse4FromTableSource(queryBlock));
    550. String whereSql = SqlParser.parse4WhereItem(queryBlock);
    551. if (whereSql != null) {
    552. sql.append(" WHERE ").append(whereSql);
    553. } else {
    554. sql.append(" WHERE 1=1 ");
    555. }
    556. List<Object> values = new ArrayList<>();
    557. for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
    558. String columnName = fkFieldItem.getColumn().getColumnName();
    559. Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
    560. sql.append(" AND ").append(columnName).append("=? ");
    561. values.add(value);
    562. }
    563. String groupSql = SqlParser.parse4GroupBy(queryBlock);
    564. if (groupSql != null) {
    565. sql.append(groupSql);
    566. }
    567. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    568. if (logger.isTraceEnabled()) {
    569. logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
    570. config.getDestination(),
    571. dml.getTable(),
    572. mapping.get_index(),
    573. sql.toString().replace("\n", " "));
    574. }
    575. Util.sqlRS(ds, sql.toString(), values, rs -> {
    576. try {
    577. while (rs.next()) {
    578. Map<String, Object> esFieldData = new LinkedHashMap<>();
    579. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    580. if (old != null) {
    581. out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
    582. for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
    583. if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
    584. for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
    585. if (old.containsKey(columnItem.getColumnName())) {
    586. Object val = esTemplate.getValFromRS(mapping,
    587. rs,
    588. fieldItem.getFieldName(),
    589. fieldItem.getColumn().getColumnName());
    590. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
    591. break out;
    592. }
    593. }
    594. }
    595. }
    596. } else {
    597. Object val = esTemplate.getValFromRS(mapping,
    598. rs,
    599. fieldItem.getFieldName(),
    600. fieldItem.getColumn().getColumnName());
    601. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
    602. }
    603. }
    604. Map<String, Object> paramsTmp = new LinkedHashMap<>();
    605. for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
    606. for (FieldItem fieldItem : entry.getValue()) {
    607. if (fieldItem.getColumnItems().size() == 1) {
    608. Object value = esTemplate.getValFromRS(mapping,
    609. rs,
    610. fieldItem.getFieldName(),
    611. entry.getKey().getColumn().getColumnName());
    612. String fieldName = fieldItem.getFieldName();
    613. // 判断是否是主键
    614. if (fieldName.equals(mapping.get_id())) {
    615. fieldName = "_id";
    616. }
    617. paramsTmp.put(fieldName, value);
    618. }
    619. }
    620. }
    621. if (logger.isDebugEnabled()) {
    622. logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
    623. config.getDestination(),
    624. dml.getTable(),
    625. mapping.get_index());
    626. }
    627. esTemplate.updateByQuery(config, paramsTmp, esFieldData);
    628. }
    629. } catch (Exception e) {
    630. throw new RuntimeException(e);
    631. }
    632. return 0;
    633. });
    634. }
    635. /**
    636. * 关联(子查询), 主表复杂字段operation, 全sql执行
    637. *
    638. * @param config es配置
    639. * @param dml dml信息
    640. * @param data 单行dml数据
    641. * @param tableItem 当前表配置
    642. */
    643. private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
    644. TableItem tableItem) {
    645. ESMapping mapping = config.getEsMapping();
    646. // 防止最后出现groupby 导致sql解析异常
    647. String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
    648. String sqlNoWhere = sqlSplit[0];
    649. String sqlGroupBy = "";
    650. if (sqlSplit.length > 1) {
    651. sqlGroupBy = "GROUP BY " + sqlSplit[1];
    652. }
    653. StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");
    654. for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
    655. String columnName = fkFieldItem.getColumn().getColumnName();
    656. Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
    657. ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
    658. }
    659. int len = sql.length();
    660. sql.delete(len - 5, len);
    661. sql.append(sqlGroupBy);
    662. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    663. if (logger.isTraceEnabled()) {
    664. logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
    665. config.getDestination(),
    666. dml.getTable(),
    667. mapping.get_index(),
    668. sql.toString().replace("\n", " "));
    669. }
    670. Util.sqlRS(ds, sql.toString(), rs -> {
    671. try {
    672. while (rs.next()) {
    673. Map<String, Object> esFieldData = new LinkedHashMap<>();
    674. for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
    675. if (old != null) {
    676. // 从表子查询
    677. out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
    678. for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
    679. if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
    680. for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
    681. if (old.containsKey(columnItem.getColumnName())) {
    682. Object val = esTemplate.getValFromRS(mapping,
    683. rs,
    684. fieldItem.getFieldName(),
    685. fieldItem.getFieldName());
    686. esFieldData.put(fieldItem.getFieldName(), val);
    687. break out;
    688. }
    689. }
    690. }
    691. }
    692. // 从表非子查询
    693. for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
    694. if (fieldItem1.equals(fieldItem)) {
    695. for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
    696. if (old.containsKey(columnItem.getColumnName())) {
    697. Object val = esTemplate.getValFromRS(mapping,
    698. rs,
    699. fieldItem.getFieldName(),
    700. fieldItem.getFieldName());
    701. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
    702. break;
    703. }
    704. }
    705. }
    706. }
    707. } else {
    708. Object val = esTemplate
    709. .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
    710. esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
    711. }
    712. }
    713. Map<String, Object> paramsTmp = new LinkedHashMap<>();
    714. for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
    715. for (FieldItem fieldItem : entry.getValue()) {
    716. Object value = esTemplate
    717. .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
    718. String fieldName = fieldItem.getFieldName();
    719. // 判断是否是主键
    720. if (fieldName.equals(mapping.get_id())) {
    721. fieldName = "_id";
    722. }
    723. paramsTmp.put(fieldName, value);
    724. }
    725. }
    726. if (logger.isDebugEnabled()) {
    727. logger.trace(
    728. "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
    729. config.getDestination(),
    730. dml.getTable(),
    731. mapping.get_index());
    732. }
    733. esTemplate.updateByQuery(config, paramsTmp, esFieldData);
    734. }
    735. } catch (Exception e) {
    736. throw new RuntimeException(e);
    737. }
    738. return 0;
    739. });
    740. }
    741. /**
    742. * 单表简单字段update
    743. *
    744. * @param config es配置
    745. * @param dml dml信息
    746. * @param data 单行data数据
    747. * @param old 单行old数据
    748. */
    749. private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
    750. Map<String, Object> old) {
    751. ESMapping mapping = config.getEsMapping();
    752. Map<String, Object> esFieldData = new LinkedHashMap<>();
    753. Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
    754. if (logger.isTraceEnabled()) {
    755. logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
    756. config.getDestination(),
    757. dml.getTable(),
    758. mapping.get_index(),
    759. idVal);
    760. }
    761. esTemplate.update(mapping, idVal, esFieldData);
    762. }
    763. /**
    764. * 主表(单表)复杂字段update
    765. *
    766. * @param config es配置
    767. * @param dml dml信息
    768. * @param data 单行dml数据
    769. */
    770. private void mainTableUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old) {
    771. ESMapping mapping = config.getEsMapping();
    772. String sql = mapping.getSql();
    773. String condition = ESSyncUtil.pkConditionSql(mapping, data);
    774. sql = ESSyncUtil.appendCondition(sql, condition);
    775. DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    776. if (logger.isTraceEnabled()) {
    777. logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
    778. config.getDestination(),
    779. dml.getTable(),
    780. mapping.get_index(),
    781. sql.replace("\n", " "));
    782. }
    783. Util.sqlRS(ds, sql, rs -> {
    784. try {
    785. while (rs.next()) {
    786. Map<String, Object> esFieldData = new LinkedHashMap<>();
    787. Object idVal = esTemplate.getESDataFromRS(mapping, rs, old, esFieldData);
    788. if (logger.isTraceEnabled()) {
    789. logger.trace(
    790. "Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
    791. config.getDestination(),
    792. dml.getTable(),
    793. mapping.get_index(),
    794. idVal);
    795. }
    796. esTemplate.update(mapping, idVal, esFieldData);
    797. }
    798. } catch (Exception e) {
    799. throw new RuntimeException(e);
    800. }
    801. return 0;
    802. });
    803. }
    804. /**
    805. * 提交批次
    806. */
    807. public void commit() {
    808. esTemplate.commit();
    809. }
    810. }