备注:该代码基于canal1.1.5
client-adapter:
resources下es7配置文件:
添加自定义配置:
indexSuffixfield: created_at #db table 创建时间
indexSuffixPattern: yyyyMM # 时间需要对应ES索引后缀格式
示例:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: payments
  _id: _id
  upsert: true
  sql: "select t.payment_id as _id, t.payment_id, t.created_at,t.deleted_at,t.root_mch_id,t.agent_mch_id,t.bank_id,t.root_bank_id,t.cup_org_id,t.pay_balance_acct_id,t.amount,t.out_order_no,t.remark,t.fee,t.is_refunded,t.refunded_amount,t.extra,t.metadata,t.req_id,t.currency,t.recv_balance_acct_id,t.payment_status,t.finished_at,t.sent_at,t.version,t.reason,t.batch_payment_id from payments t"
  etlCondition: "where t.created_at>={}"
  commitBatch: 3000
  indexSuffixfield: created_at
  indexSuffixPattern: yyyyMM
创建自定义索引扩展帮助类:
package com.alibaba.otter.canal.client.adapter.es.core.support;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Helper that rewrites {@code esMapping._index} to {@code <indexPrefix>-<suffix>}, where the suffix is
 * derived from a configured row field ({@code indexSuffixfield}) rendered with {@code indexSuffixPattern}.
 *
 * <p>Note that {@link #rebuildIndex(ESSyncConfig.ESMapping, Map)} mutates the mapping object it is given.
 *
 * <p>NOTE(review): when a non-empty field map does not contain the suffix field (for example a map that
 * only carries the changed columns of an UPDATE), {@code rebuildIndex} throws. Confirm that every caller
 * passes a map that includes the suffix field, or that this failure is intended.
 */
public class ESMapppingPlus {

    private static final Logger logger = LoggerFactory.getLogger(ESMapppingPlus.class);

    /** Number of digits in an epoch-milliseconds timestamp as accepted by the timestamp branch. */
    private static final int EPOCH_MILLIS_LENGTH = 13;

    /**
     * Recomputes and sets the target index on {@code esMapping} from the row data.
     *
     * <ul>
     *   <li>Empty or null {@code esFieldData}: nothing is changed.</li>
     *   <li>No {@code indexSuffixfield} configured: the index is set to the bare index prefix.</li>
     *   <li>Field value containing {@code '-'}: treated as a date string and cut down according to the
     *       pattern (see {@link #datePattern(String, String)}).</li>
     *   <li>Field value made of exactly 13 digits: treated as epoch milliseconds and formatted with
     *       {@link SimpleDateFormat} in the JVM default time zone.</li>
     * </ul>
     *
     * @param esMapping   mapping whose {@code _index} is updated in place
     * @param esFieldData row data keyed by field name
     * @throws RuntimeException if the suffix field is missing or null in {@code esFieldData}, if
     *                          {@code indexSuffixPattern} is null, or if the field value is in neither
     *                          supported format
     */
    public static void rebuildIndex(ESSyncConfig.ESMapping esMapping, Map<String, Object> esFieldData) {
        if (CollectionUtils.isEmpty(esFieldData)) {
            return;
        }

        String indexSuffixfield = esMapping.getIndexSuffixfield();
        if (StringUtils.isEmpty(indexSuffixfield)) {
            // No suffix field configured: use the plain prefix as the index name.
            esMapping.set_index(esMapping.getIndexPrefix());
            return;
        }

        // A key that is present but mapped to null is treated like a missing key instead of
        // triggering a NullPointerException on toString().
        Object rawSuffix = esFieldData.get(indexSuffixfield);
        String indexSuffix = rawSuffix == null ? null : rawSuffix.toString();
        if (StringUtils.isEmpty(indexSuffix)) {
            logger.error("esMapping.indexSuffixfield not matched to dml field, index:{} ,indexSuffixfield:{}, sql:{}, esFieldData:{}",
                esMapping.getIndexPrefix(), indexSuffixfield, esMapping.getSql(), JSONObject.toJSONString(esFieldData));
            throw new RuntimeException("esMapping.indexSuffixfield not matched to dml field");
        }

        String indexSuffixPattern = esMapping.getIndexSuffixPattern();
        if (indexSuffixPattern == null) {
            // Fail with an explicit message rather than an NPE from switch / SimpleDateFormat.
            logger.error("esMapping.indexSuffixPattern can't null, index:{}", esMapping.getIndexPrefix());
            throw new RuntimeException("esMapping.indexSuffixPattern can't null");
        }

        if (indexSuffix.indexOf("-") != -1) {
            // Date-string form, e.g. "2021-05-01 12:00:00".
            esMapping.set_index(esMapping.getIndexPrefix() + "-" + datePattern(indexSuffix, indexSuffixPattern));
            return;
        }

        if (isEpochMillis(indexSuffix)) {
            // Epoch-milliseconds form. DateFormat.format(Object) rejects a String argument, so the
            // value has to be converted to a Date before formatting.
            SimpleDateFormat formatter = new SimpleDateFormat(indexSuffixPattern);
            String formatted = formatter.format(new Date(Long.parseLong(indexSuffix)));
            esMapping.set_index(esMapping.getIndexPrefix() + "-" + formatted);
            return;
        }

        // The value is neither a date string nor a 13-digit timestamp.
        logger.error("esMapping.indexSuffixfield value format not supported, index:{}, indexSuffixfield:{}, value:{}",
            esMapping.getIndexPrefix(), indexSuffixfield, indexSuffix);
        throw new RuntimeException("esMapping.indexSuffixfield value format not supported: " + indexSuffix);
    }

    /**
     * Convenience overload that applies {@link #rebuildIndex(ESSyncConfig.ESMapping, Map)} to the
     * mapping held by {@code config}.
     *
     * @param config      sync config whose ES mapping is updated in place
     * @param esFieldData row data keyed by field name
     */
    public static void rebuildIndex(ESSyncConfig config, Map<String, Object> esFieldData) {
        rebuildIndex(config.getEsMapping(), esFieldData);
    }

    /**
     * Returns true when {@code value} consists of exactly 13 decimal digits.
     */
    private static boolean isEpochMillis(String value) {
        if (value.length() != EPOCH_MILLIS_LENGTH) {
            return false;
        }
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    /**
     * Cuts a {@code yyyy-MM-dd...} date string down to the requested pattern by taking a prefix of the
     * string and, for the dash-less patterns, removing the dashes. Unknown patterns fall back to
     * {@code yyyyMM}.
     *
     * @param date    date string starting with {@code yyyy-MM-dd}
     * @param pattern one of {@code yyyy}, {@code yyyyMM}, {@code yyyyMMdd}, {@code yyyy-MM}, {@code yyyy-MM-dd}
     * @return the index suffix
     * @throws StringIndexOutOfBoundsException if {@code date} is shorter than the pattern requires
     */
    private static String datePattern(String date, String pattern) {
        switch (pattern) {
            case "yyyy":
                return date.substring(0, 4);
            case "yyyyMMdd":
                return date.substring(0, 10).replace("-", "");
            case "yyyy-MM":
                return date.substring(0, 7);
            case "yyyy-MM-dd":
                return date.substring(0, 10);
            case "yyyyMM":
            default:
                return date.substring(0, 7).replace("-", "");
        }
    }
}
ES7xTemplate.java添加
ESMapppingPlus.rebuildIndex(mapping, esFieldData);
或者添加:
ESMapppingPlus.rebuildIndex(config, esFieldData);
修改后的完整代码:
package com.alibaba.otter.canal.client.adapter.es7x.support;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.sql.DataSource;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESBulkResponse;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESDeleteRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESIndexRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESUpdateRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection.ESSearchRequest;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;

/**
 * {@link ESTemplate} implementation for Elasticsearch 7.x that queues index/update/delete requests on a
 * bulk request and flushes it once {@link #MAX_BATCH_SIZE} actions have accumulated.
 *
 * <p>Most entry points first call {@link ESMapppingPlus#rebuildIndex} so that {@code mapping.get_index()}
 * reflects the index derived from the row data before any request is built.
 */
public class ES7xTemplate implements ESTemplate {

    private static final Logger logger = LoggerFactory.getLogger(ES7xTemplate.class);

    /** Number of queued bulk actions at which {@link #commitBulk()} flushes. */
    private static final int MAX_BATCH_SIZE = 1000;

    /** Size limit applied to the search that resolves documents by pk when no {@code _id} is mapped. */
    private static final int PK_SEARCH_SIZE = 10000;

    /** Local cache of ES field types, keyed by {@code <index>-<type>}. */
    private static final ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();

    private final ESConnection esConnection;

    private final ESBulkRequest esBulkRequest;

    public ES7xTemplate(ESConnection esConnection) {
        this.esConnection = esConnection;
        this.esBulkRequest = this.esConnection.new ES7xBulkRequest();
    }

    public ESBulkRequest getBulk() {
        return esBulkRequest;
    }

    public void resetBulkRequestBuilder() {
        this.esBulkRequest.resetBulk();
    }

    /**
     * Queues an insert for one row.
     *
     * <p>With an {@code _id} mapping the row is queued as an upsert-style update or as an index request,
     * depending on {@code mapping.isUpsert()}. Without one, every document whose pk field matches
     * {@code pkVal} is updated with {@code esFieldData}.
     *
     * @param mapping     ES mapping config; its index is rebuilt from {@code esFieldData}
     * @param pkVal       document id (or pk value when no {@code _id} is mapped)
     * @param esFieldData document fields; the {@code $parent_routing} entry, if any, is removed and used as routing
     */
    @Override
    public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        if (mapping.get_id() != null) {
            String parentVal = (String) esFieldData.remove("$parent_routing");
            if (mapping.isUpsert()) {
                ESUpdateRequest updateRequest = esConnection.new ES7xUpdateRequest(mapping.get_index(),
                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                if (StringUtils.isNotEmpty(parentVal)) {
                    updateRequest.setRouting(parentVal);
                }
                getBulk().add(updateRequest);
            } else {
                ESIndexRequest indexRequest = esConnection.new ES7xIndexRequest(mapping.get_index(),
                    pkVal.toString()).setSource(esFieldData);
                if (StringUtils.isNotEmpty(parentVal)) {
                    indexRequest.setRouting(parentVal);
                }
                getBulk().add(indexRequest);
            }
            commitBulk();
        } else {
            addUpdateForEachPkHit(mapping, pkVal, esFieldData, true);
        }
    }

    /**
     * Queues an update for one row after normalising the field names with {@code Util.cleanColumn}.
     *
     * @param mapping     ES mapping config; its index is rebuilt from {@code esFieldData}
     * @param pkVal       document id (or pk value when no {@code _id} is mapped)
     * @param esFieldData fields to update
     */
    @Override
    public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        Map<String, Object> esFieldDataTmp = new LinkedHashMap<>(esFieldData.size());
        esFieldData.forEach((k, v) -> esFieldDataTmp.put(Util.cleanColumn(k), v));
        append4Update(mapping, pkVal, esFieldDataTmp);
        commitBulk();
    }

    /**
     * Re-runs the mapping SQL filtered by {@code paramsTmp} and queues an update with
     * {@code esFieldData} for every row returned.
     *
     * @param config      sync config providing the mapping and data source key
     * @param paramsTmp   field name to value filters; nothing is done when empty
     * @param esFieldData fields to write to each matching document
     */
    @Override
    public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(config, esFieldData);
        if (paramsTmp.isEmpty()) {
            return;
        }
        ESMapping mapping = config.getEsMapping();
        // NOTE(review): this query is built but never used below; kept so behaviour is unchanged.
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));

        // Query by SQL and update in batches.
        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
        List<Object> values = new ArrayList<>();
        paramsTmp.forEach((fieldName, value) -> {
            sql.append("_v.").append(fieldName).append("=? AND ");
            values.add(value);
        });
        // TODO wrapping the SQL in an outer query forces a full scan and is slow;
        // optimise by appending the conditions to the inner WHERE clause instead.
        // Drop the trailing "AND ".
        int len = sql.length();
        sql.delete(len - 4, len);
        Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
            int count = 0;
            try {
                while (rs.next()) {
                    Object idVal = getIdValFromRS(mapping, rs);
                    append4Update(mapping, idVal, esFieldData);
                    commitBulk();
                    count++;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return count;
        });
        if (logger.isTraceEnabled()) {
            logger.trace("Update ES by query affected {} records", syncCount);
        }
    }

    /**
     * Queues a delete for one row.
     *
     * <p>With an {@code _id} mapping a delete request is queued. Without one, every document whose pk
     * field matches {@code pkVal} is updated with {@code esFieldData} rather than deleted.
     *
     * @param mapping     ES mapping config; its index is rebuilt from {@code esFieldData}
     * @param pkVal       document id (or pk value when no {@code _id} is mapped)
     * @param esFieldData fields written to matching documents in the no-{@code _id} case
     */
    @Override
    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        if (mapping.get_id() != null) {
            ESDeleteRequest esDeleteRequest = this.esConnection.new ES7xDeleteRequest(mapping.get_index(),
                pkVal.toString());
            getBulk().add(esDeleteRequest);
            commitBulk();
        } else {
            addUpdateForEachPkHit(mapping, pkVal, esFieldData, true);
        }
    }

    /**
     * Sends the queued bulk actions, reports failures through the bulk response, and resets the queue.
     * Does nothing when no action is queued.
     */
    @Override
    public void commit() {
        if (getBulk().numberOfActions() > 0) {
            ESBulkResponse response = getBulk().bulk();
            if (response.hasFailures()) {
                response.processFailBulkResponse("ES sync commit error ");
            }
            resetBulkRequestBuilder();
        }
    }

    /**
     * Reads one column from the result set and converts it according to the ES field type.
     *
     * @param mapping    ES mapping config
     * @param resultSet  current row
     * @param fieldName  ES field name (cleaned with {@code Util.cleanColumn})
     * @param columnName result-set column name (cleaned with {@code Util.cleanColumn})
     * @return the converted value
     * @throws SQLException if the column cannot be read
     */
    @Override
    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
                               String columnName) throws SQLException {
        fieldName = Util.cleanColumn(fieldName);
        columnName = Util.cleanColumn(columnName);
        String esType = getEsType(mapping, fieldName);

        Object value = resultSet.getObject(columnName);
        if (value instanceof Boolean) {
            // The driver returned a Boolean but ES does not expect one: re-read the raw byte.
            if (!"boolean".equals(esType)) {
                value = resultSet.getByte(columnName);
            }
        }

        // Object-typed field.
        if (mapping.getObjFields().containsKey(fieldName)) {
            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
        } else {
            return ESSyncUtil.typeConvert(value, esType);
        }
    }

    /**
     * Fills {@code esFieldData} with every selected field of the current row (except the {@code _id}
     * field and skipped fields) and returns the id value.
     *
     * @param mapping     ES mapping config
     * @param resultSet   current row
     * @param esFieldData output map of ES field name to value
     * @return the value of the id field, or null if it is not among the selected fields
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
                                  Map<String, Object> esFieldData) throws SQLException {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        SchemaItem schemaItem = mapping.getSchemaItem();
        String idFieldName = resolveIdFieldName(mapping);
        Object resultIdVal = null;
        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());

            if (fieldItem.getFieldName().equals(idFieldName)) {
                resultIdVal = value;
            }

            if (!fieldItem.getFieldName().equals(mapping.get_id())
                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
            }
        }

        // Add parent/child relation info.
        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);

        return resultIdVal;
    }

    /**
     * Returns the id value of the current row.
     *
     * @param mapping   ES mapping config
     * @param resultSet current row
     * @return the value of the id field, or null if it is not among the selected fields
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
        SchemaItem schemaItem = mapping.getSchemaItem();
        String idFieldName = resolveIdFieldName(mapping);
        Object resultIdVal = null;
        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
            if (fieldItem.getFieldName().equals(idFieldName)) {
                resultIdVal = value;
                break;
            }
        }
        return resultIdVal;
    }

    /**
     * Like {@link #getESDataFromRS(ESMapping, ResultSet, Map)} but only copies fields backed by a column
     * that appears in {@code dmlOld}.
     *
     * @param mapping     ES mapping config
     * @param resultSet   current row
     * @param dmlOld      old values of the changed columns, keyed by column name
     * @param esFieldData output map of ES field name to value
     * @return the value of the id field, or null if it is not among the selected fields
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
                                  Map<String, Object> esFieldData) throws SQLException {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        SchemaItem schemaItem = mapping.getSchemaItem();
        String idFieldName = resolveIdFieldName(mapping);
        Object resultIdVal = null;
        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
            if (fieldItem.getFieldName().equals(idFieldName)) {
                resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
            }
            for (ColumnItem columnItem : fieldItem.getColumnItems()) {
                if (dmlOld.containsKey(columnItem.getColumnName())
                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
                    break;
                }
            }
        }

        // Add parent/child relation info.
        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);

        return resultIdVal;
    }

    /**
     * Reads one column from DML row data and converts it according to the ES field type.
     *
     * @param mapping    ES mapping config
     * @param dmlData    row data keyed by column name
     * @param fieldName  ES field name
     * @param columnName column name to read from {@code dmlData}
     * @return the converted value
     */
    @Override
    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
        String esType = getEsType(mapping, fieldName);
        Object value = dmlData.get(columnName);
        if (value instanceof Byte) {
            // A Byte destined for an ES boolean field: non-zero means true.
            if ("boolean".equals(esType)) {
                value = ((Byte) value).intValue() != 0;
            }
        }

        // Object-typed field.
        if (mapping.getObjFields().containsKey(fieldName)) {
            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
        } else {
            return ESSyncUtil.typeConvert(value, esType);
        }
    }

    /**
     * Fills {@code esFieldData} with every selected field of the DML row (except the {@code _id} field
     * and skipped fields) and returns the id value.
     *
     * @param mapping     ES mapping config
     * @param dmlData     row data keyed by column name
     * @param esFieldData output map of ES field name to value
     * @return the value of the id field, or null if it is not among the selected fields
     */
    @Override
    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
                                       Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        SchemaItem schemaItem = mapping.getSchemaItem();
        String idFieldName = resolveIdFieldName(mapping);
        Object resultIdVal = null;
        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
            Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);

            if (fieldItem.getFieldName().equals(idFieldName)) {
                resultIdVal = value;
            }

            if (!fieldItem.getFieldName().equals(mapping.get_id())
                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
            }
        }

        // Add parent/child relation info.
        putRelationData(mapping, schemaItem, dmlData, esFieldData);
        return resultIdVal;
    }

    /**
     * Like {@link #getESDataFromDmlData(ESMapping, Map, Map)} but only copies fields whose first backing
     * column appears in {@code dmlOld}.
     *
     * @param mapping     ES mapping config
     * @param dmlData     new row data keyed by column name
     * @param dmlOld      old values of the changed columns, keyed by column name
     * @param esFieldData output map of ES field name to value
     * @return the value of the id field, or null if it is not among the selected fields
     */
    @Override
    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
                                       Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        SchemaItem schemaItem = mapping.getSchemaItem();
        String idFieldName = resolveIdFieldName(mapping);
        Object resultIdVal = null;
        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();

            if (fieldItem.getFieldName().equals(idFieldName)) {
                resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
            }

            if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
            }
        }

        // Add parent/child relation info (relation values are read from dmlOld here).
        putRelationData(mapping, schemaItem, dmlOld, esFieldData);
        return resultIdVal;
    }

    /**
     * Commits the bulk request once the number of queued actions reaches {@link #MAX_BATCH_SIZE}.
     */
    private void commitBulk() {
        if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
            commit();
        }
    }

    /**
     * Returns the field name that identifies a document: {@code _id} when mapped, otherwise the pk.
     */
    private static String resolveIdFieldName(ESMapping mapping) {
        return mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
    }

    /**
     * Searches the current index for documents whose pk field equals {@code pkVal} and queues an update
     * with {@code esFieldData} for each hit.
     *
     * @param commitPerHit when true, {@link #commitBulk()} is invoked after each queued update
     */
    private void addUpdateForEachPkHit(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData,
                                       boolean commitPerHit) {
        ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
            .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
            .size(PK_SEARCH_SIZE);
        SearchResponse response = esSearchRequest.getResponse();
        for (SearchHit hit : response.getHits()) {
            ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                hit.getId()).setDoc(esFieldData);
            getBulk().add(esUpdateRequest);
            if (commitPerHit) {
                commitBulk();
            }
        }
    }

    /**
     * Queues update request(s) for one row without committing; callers decide when to flush.
     */
    private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        if (mapping.get_id() != null) {
            String parentVal = (String) esFieldData.remove("$parent_routing");
            if (mapping.isUpsert()) {
                ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                if (StringUtils.isNotEmpty(parentVal)) {
                    esUpdateRequest.setRouting(parentVal);
                }
                getBulk().add(esUpdateRequest);
            } else {
                ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                    pkVal.toString()).setDoc(esFieldData);
                if (StringUtils.isNotEmpty(parentVal)) {
                    esUpdateRequest.setRouting(parentVal);
                }
                getBulk().add(esUpdateRequest);
            }
        } else {
            addUpdateForEachPkHit(mapping, pkVal, esFieldData, false);
        }
    }

    /**
     * Returns the ES type of a field, loading and caching the index mapping on first use.
     *
     * @param mapping   mapping config
     * @param fieldName field name
     * @return the field type ({@code "object"} for fields that declare nested properties), or null if
     *         the field is not present in the ES mapping
     * @throws IllegalArgumentException if no mapping metadata is found
     */
    @SuppressWarnings("unchecked")
    private String getEsType(ESMapping mapping, String fieldName) {
        String key = mapping.get_index() + "-" + mapping.get_type();
        Map<String, String> fieldType = esFieldTypes.get(key);
        if (fieldType != null) {
            return fieldType.get(fieldName);
        } else {
            // The mapping metadata is looked up by index prefix rather than by the suffixed index name.
            MappingMetaData mappingMetaData = esConnection.getMapping(mapping.getIndexPrefix());

            if (mappingMetaData == null) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
            }

            fieldType = new LinkedHashMap<>();

            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
                Map<String, Object> value = (Map<String, Object>) entry.getValue();
                if (value.containsKey("properties")) {
                    fieldType.put(entry.getKey(), "object");
                } else {
                    fieldType.put(entry.getKey(), (String) value.get("type"));
                }
            }
            esFieldTypes.put(key, fieldType);

            return fieldType.get(fieldName);
        }
    }

    /**
     * Adds parent/child relation entries to {@code esFieldData}, reading parent values from the result set.
     * When a parent value is present it is also stored under {@code $parent_routing}.
     */
    private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
                                       Map<String, Object> esFieldData) {
        ESMapppingPlus.rebuildIndex(mapping, esFieldData);
        // Add parent/child relation info.
        if (!mapping.getRelations().isEmpty()) {
            mapping.getRelations().forEach((relationField, relationMapping) -> {
                Map<String, Object> relations = new HashMap<>();
                relations.put("name", relationMapping.getName());
                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
                    Object parentVal;
                    try {
                        parentVal = getValFromRS(mapping,
                            resultSet,
                            parentFieldItem.getFieldName(),
                            parentFieldItem.getFieldName());
                    } catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                    if (parentVal != null) {
                        relations.put("parent", parentVal.toString());
                        esFieldData.put("$parent_routing", parentVal.toString());
                    }
                }
                esFieldData.put(relationField, relations);
            });
        }
    }

    /**
     * Adds parent/child relation entries to {@code esFieldData}, reading parent values from DML row data.
     * When a parent value is present it is also stored under {@code $parent_routing}.
     */
    private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
                                 Map<String, Object> esFieldData) {
        // Add parent/child relation info.
        if (!mapping.getRelations().isEmpty()) {
            mapping.getRelations().forEach((relationField, relationMapping) -> {
                Map<String, Object> relations = new HashMap<>();
                relations.put("name", relationMapping.getName());
                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
                    String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
                    Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
                    if (parentVal != null) {
                        relations.put("parent", parentVal.toString());
                        esFieldData.put("$parent_routing", parentVal.toString());
                    }
                }
                esFieldData.put(relationField, relations);
            });
        }
    }
}
修改ESSyncService.java文件:
同样添加:
ESMapppingPlus.rebuildIndex(config, data);
或者:
ESMapppingPlus.rebuildIndex(config, esFieldData);
完整修改如下:
package com.alibaba.otter.canal.client.adapter.es.core.service;import java.util.ArrayList;import java.util.Collection;import java.util.LinkedHashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.serializer.SerializerFeature;import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.TableItem;import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;import com.alibaba.otter.canal.client.adapter.support.Dml;import com.alibaba.otter.canal.client.adapter.support.Util;/*** ES 同步 Service** @author rewerma 2018-11-01* @version 1.0.0*/public class ESSyncService {private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);private ESTemplate esTemplate;public ESSyncService(ESTemplate esTemplate){this.esTemplate = esTemplate;}public void sync(Collection<ESSyncConfig> esSyncConfigs, Dml dml) {long begin = System.currentTimeMillis();if (esSyncConfigs != null) {if (logger.isTraceEnabled()) {logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: 
{}",dml.getDestination(),dml.getDatabase(),dml.getTable(),dml.getType(),esSyncConfigs.size());}for (ESSyncConfig config : esSyncConfigs) {if (logger.isTraceEnabled()) {logger.trace("Prepared to sync index: {}, destination: {}",config.getEsMapping().get_index(),dml.getDestination());}this.sync(config, dml);if (logger.isTraceEnabled()) {logger.trace("Sync completed: {}, destination: {}",config.getEsMapping().get_index(),dml.getDestination());}}if (logger.isTraceEnabled()) {logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",(System.currentTimeMillis() - begin),esSyncConfigs.size(),dml.getDestination());}if (logger.isDebugEnabled()) {StringBuilder configIndexes = new StringBuilder();esSyncConfigs.forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));logger.debug("DML: {} \nAffected indexes: {}",JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),configIndexes.toString());}}}public void sync(ESSyncConfig config, Dml dml) {try {// 如果是按时间戳定时更新则返回if (config.getEsMapping().isSyncByTimestamp()) {return;}long begin = System.currentTimeMillis();String type = dml.getType();if (type != null && type.equalsIgnoreCase("INSERT")) {insert(config, dml);} else if (type != null && type.equalsIgnoreCase("UPDATE")) {update(config, dml);} else if (type != null && type.equalsIgnoreCase("DELETE")) {delete(config, dml);} else {return;}if (logger.isTraceEnabled()) {logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",(System.currentTimeMillis() - begin),dml.getDestination(),config.getEsMapping().get_index());}} catch (Throwable e) {logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);throw new RuntimeException(e);}}/*** 插入操作dml** @param config es配置* @param dml dml数据*/private void insert(ESSyncConfig config, Dml dml) {List<Map<String, Object>> dataList = dml.getData();if (dataList == null || dataList.isEmpty()) {return;}SchemaItem schemaItem = 
config.getEsMapping().getSchemaItem();for (Map<String, Object> data : dataList) {if (data == null || data.isEmpty()) {continue;}ESMapppingPlus.rebuildIndex(config, data);if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {// ------单表 & 所有字段都为简单字段------singleTableSimpleFiledInsert(config, dml, data);} else {// ------是主表 查询sql来插入------if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {mainTableInsert(config, dml, data);}// 从表的操作for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {if (tableItem.isMain()) {continue;}if (!tableItem.getTableName().equals(dml.getTable())) {continue;}// 关联条件出现在主表查询条件是否为简单字段boolean allFieldsSimple = true;for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {allFieldsSimple = false;break;}}// 所有查询字段均为简单字段if (allFieldsSimple) {// 不是子查询if (!tableItem.isSubQuery()) {// ------关联表简单字段插入------Map<String, Object> esFieldData = new LinkedHashMap<>();for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {Object value = esTemplate.getValFromData(config.getEsMapping(),data,fieldItem.getFieldName(),fieldItem.getColumn().getColumnName());esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);}joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);} else {// ------关联子表简单字段插入------subTableSimpleFieldOperation(config, dml, data, null, tableItem);}} else {// ------关联子表复杂字段插入 执行全sql更新es------wholeSqlOperation(config, dml, data, null, tableItem);}}}}}/*** 更新操作dml** @param config es配置* @param dml dml数据*/private void update(ESSyncConfig config, Dml dml) {List<Map<String, Object>> dataList = dml.getData();List<Map<String, Object>> oldList = dml.getOld();if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {return;}SchemaItem schemaItem = config.getEsMapping().getSchemaItem();int i = 0;for (Map<String, Object> data : dataList) {Map<String, Object> 
old = oldList.get(i);if (data == null || data.isEmpty() || old == null || old.isEmpty()) {continue;}ESMapppingPlus.rebuildIndex(config, data);if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {// ------单表 & 所有字段都为简单字段------singleTableSimpleFiledUpdate(config, dml, data, old);} else {// ------主表 查询sql来更新------if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {ESMapping mapping = config.getEsMapping();String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);boolean idFieldSimple = true;if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {idFieldSimple = false;}boolean allUpdateFieldSimple = true;out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {for (ColumnItem columnItem : fieldItem.getColumnItems()) {if (old.containsKey(columnItem.getColumnName())) {if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {allUpdateFieldSimple = false;break out;}}}}// 不支持主键更新!!// 判断是否有外键更新boolean fkChanged = false;for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {if (tableItem.isMain()) {continue;}boolean changed = false;for (List<FieldItem> fieldItems : tableItem.getRelationTableFields().values()) {for (FieldItem fieldItem : fieldItems) {if (old.containsKey(fieldItem.getColumn().getColumnName())) {fkChanged = true;changed = true;break;}}}// 如果外键有修改,则更新所对应该表的所有查询条件数据if (changed) {for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {fieldItem.getColumnItems().forEach(columnItem -> old.put(columnItem.getColumnName(), null));}}}// 判断主键和所更新的字段是否全为简单字段if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {singleTableSimpleFiledUpdate(config, dml, data, old);} else {mainTableUpdate(config, dml, data, old);}}// 从表的操作for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {if (tableItem.isMain()) {continue;}if (!tableItem.getTableName().equals(dml.getTable())) 
{continue;}// 关联条件出现在主表查询条件是否为简单字段boolean allFieldsSimple = true;for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {allFieldsSimple = false;break;}}// 所有查询字段均为简单字段if (allFieldsSimple) {// 不是子查询if (!tableItem.isSubQuery()) {// ------关联表简单字段更新------Map<String, Object> esFieldData = new LinkedHashMap<>();for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {if (old.containsKey(fieldItem.getColumn().getColumnName())) {Object value = esTemplate.getValFromData(config.getEsMapping(),data,fieldItem.getFieldName(),fieldItem.getColumn().getColumnName());esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);}}joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);} else {// ------关联子表简单字段更新------subTableSimpleFieldOperation(config, dml, data, old, tableItem);}} else {// ------关联子表复杂字段更新 执行全sql更新es------wholeSqlOperation(config, dml, data, old, tableItem);}}}i++;}}/*** 删除操作dml** @param config es配置* @param dml dml数据*/private void delete(ESSyncConfig config, Dml dml) {List<Map<String, Object>> dataList = dml.getData();if (dataList == null || dataList.isEmpty()) {return;}SchemaItem schemaItem = config.getEsMapping().getSchemaItem();for (Map<String, Object> data : dataList) {if (data == null || data.isEmpty()) {continue;}ESMapppingPlus.rebuildIndex(config, data);ESMapping mapping = config.getEsMapping();// ------是主表------if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {if (mapping.get_id() != null) {FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);// 主键为简单字段if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {Object idVal = esTemplate.getValFromData(mapping,data,idFieldItem.getFieldName(),idFieldItem.getColumn().getColumnName());if (logger.isTraceEnabled()) {logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: 
{}",config.getDestination(),dml.getTable(),mapping.get_index(),idVal);}esTemplate.delete(mapping, idVal, null);} else {// ------主键带函数, 查询sql获取主键删除------// FIXME 删除时反查sql为空记录, 无法获获取 id field 值mainTableDelete(config, dml, data);}} else {FieldItem pkFieldItem = schemaItem.getIdFieldItem(mapping);if (!pkFieldItem.isMethod() && !pkFieldItem.isBinaryOp()) {Map<String, Object> esFieldData = new LinkedHashMap<>();Object pkVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);if (logger.isTraceEnabled()) {logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",config.getDestination(),dml.getTable(),mapping.get_index(),pkVal);}esFieldData.remove(pkFieldItem.getFieldName());esFieldData.keySet().forEach(key -> esFieldData.put(key, null));esTemplate.delete(mapping, pkVal, esFieldData);} else {// ------主键带函数, 查询sql获取主键删除------mainTableDelete(config, dml, data);}}}// 从表的操作for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {if (tableItem.isMain()) {continue;}if (!tableItem.getTableName().equals(dml.getTable())) {continue;}// 关联条件出现在主表查询条件是否为简单字段boolean allFieldsSimple = true;for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {allFieldsSimple = false;break;}}// 所有查询字段均为简单字段if (allFieldsSimple) {// 不是子查询if (!tableItem.isSubQuery()) {// ------关联表简单字段更新为null------Map<String, Object> esFieldData = new LinkedHashMap<>();for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), null);}joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);} else {// ------关联子表简单字段更新------subTableSimpleFieldOperation(config, dml, data, null, tableItem);}} else {// ------关联子表复杂字段更新 执行全sql更新es------wholeSqlOperation(config, dml, data, null, tableItem);}}}}/*** 单表简单字段insert** @param config es配置* @param dml dml信息* @param data 单行dml数据*/private void singleTableSimpleFiledInsert(ESSyncConfig 
config, Dml dml, Map<String, Object> data) {
        ESMapping mapping = config.getEsMapping();
        Map<String, Object> esFieldData = new LinkedHashMap<>();
        Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);

        if (logger.isTraceEnabled()) {
            logger.trace("Single table insert to es index, destination:{}, table: {}, index: {}, id: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                idVal);
        }
        esTemplate.insert(mapping, idVal, esFieldData);
    }

    /**
     * Main table (single table) complex-field insert.
     * <p>
     * Appends the primary-key condition built from the DML row to the mapping SQL, runs the
     * query, and inserts one ES document per returned row.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     */
    private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
        ESMapping mapping = config.getEsMapping();
        String sql = mapping.getSql();
        String condition = ESSyncUtil.pkConditionSql(mapping, data);
        sql = ESSyncUtil.appendCondition(sql, condition);
        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        if (logger.isTraceEnabled()) {
            logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                sql.replace("\n", " "));
        }
        Util.sqlRS(ds, sql, rs -> {
            try {
                while (rs.next()) {
                    Map<String, Object> esFieldData = new LinkedHashMap<>();
                    Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);

                    if (logger.isTraceEnabled()) {
                        logger.trace(
                            "Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                            config.getDestination(),
                            dml.getTable(),
                            mapping.get_index(),
                            idVal);
                    }
                    esTemplate.insert(mapping, idVal, esFieldData);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0;
        });
    }

    /**
     * Main table delete driven by a primary-key query.
     * <p>
     * Appends the primary-key condition built from the DML row to the mapping SQL, runs the
     * query, and calls {@code esTemplate.delete} for every returned row. When the mapping
     * declares a pk, a field map derived from the DML row (pk removed, every value set to
     * {@code null}) is passed along; otherwise {@code null} is passed.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     */
    private void mainTableDelete(ESSyncConfig config, Dml dml, Map<String, Object> data) {
        ESMapping mapping = config.getEsMapping();
        String sql = mapping.getSql();
        String condition = ESSyncUtil.pkConditionSql(mapping, data);
        sql = ESSyncUtil.appendCondition(sql, condition);
        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        if (logger.isTraceEnabled()) {
            logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                sql.replace("\n", " "));
        }
        Util.sqlRS(ds, sql, rs -> {
            try {
                Map<String, Object> esFieldData = null;
                if (mapping.getPk() != null) {
                    esFieldData = new LinkedHashMap<>();
                    esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
                    esFieldData.remove(mapping.getPk());
                    // Iterate over a snapshot of the keys: put() with a cleaned key that differs
                    // from the raw key adds a new entry, which would otherwise invalidate the
                    // live keySet iterator and throw ConcurrentModificationException.
                    for (String key : new ArrayList<>(esFieldData.keySet())) {
                        esFieldData.put(Util.cleanColumn(key), null);
                    }
                }
                while (rs.next()) {
                    Object idVal = esTemplate.getIdValFromRS(mapping, rs);

                    if (logger.isTraceEnabled()) {
                        logger.trace(
                            "Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                            config.getDestination(),
                            dml.getTable(),
                            mapping.get_index(),
                            idVal);
                    }
                    esTemplate.delete(mapping, idVal, esFieldData);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0;
        });
    }

    /**
     * Join table: simple-field operation on the main table.
     * <p>
     * Builds the query parameters from the relation fields of {@code tableItem} (only fields
     * backed by exactly one column are used), reading the values from the DML row, then
     * issues an update-by-query with the supplied field data.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     * @param tableItem current table configuration
     * @param esFieldData field values to write to the matched documents
     */
    private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                               TableItem tableItem, Map<String, Object> esFieldData) {
        ESMapping mapping = config.getEsMapping();

        Map<String, Object> paramsTmp = new LinkedHashMap<>();
        for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
            for (FieldItem fieldItem : entry.getValue()) {
                if (fieldItem.getColumnItems().size() == 1) {
                    Object value = esTemplate.getValFromData(mapping,
                        data,
                        fieldItem.getFieldName(),
                        entry.getKey().getColumn().getColumnName());

                    String fieldName = fieldItem.getFieldName();
                    // Check whether this field is the document id
                    if (fieldName.equals(mapping.get_id())) {
                        fieldName = "_id";
                    }
                    paramsTmp.put(fieldName, value);
                }
            }
        }

        // Guard matches the level actually logged (was isDebugEnabled() around a trace call).
        if (logger.isTraceEnabled()) {
            logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index());
        }
        esTemplate.updateByQuery(config, paramsTmp, esFieldData);
    }

    /**
     * Join sub-query: simple-field operation on the main table.
     * <p>
     * Rebuilds the sub-query of {@code tableItem} with one {@code column=?} condition per
     * relation key (values taken from the DML row), executes it, and for every returned row
     * issues an update-by-query. When {@code old} is non-null, only select fields whose
     * underlying sub-query columns appear in {@code old} are written; otherwise all relation
     * select fields are written.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     * @param old single row of old data, may be {@code null}
     * @param tableItem current table configuration
     */
    private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                              Map<String, Object> old, TableItem tableItem) {
        ESMapping mapping = config.getEsMapping();

        MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());

        StringBuilder sql = new StringBuilder();
        sql.append("SELECT ")
            .append(SqlParser.parse4SQLSelectItem(queryBlock))
            .append(" FROM ")
            .append(SqlParser.parse4FromTableSource(queryBlock));

        String whereSql = SqlParser.parse4WhereItem(queryBlock);
        if (whereSql != null) {
            sql.append(" WHERE ").append(whereSql);
        } else {
            // Neutral predicate so the " AND ..." conditions below can always be appended.
            sql.append(" WHERE 1=1 ");
        }

        List<Object> values = new ArrayList<>();

        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
            String columnName = fkFieldItem.getColumn().getColumnName();
            Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
            sql.append(" AND ").append(columnName).append("=? ");
            values.add(value);
        }

        String groupSql = SqlParser.parse4GroupBy(queryBlock);
        if (groupSql != null) {
            sql.append(groupSql);
        }

        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        if (logger.isTraceEnabled()) {
            logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                sql.toString().replace("\n", " "));
        }
        Util.sqlRS(ds, sql.toString(), values, rs -> {
            try {
                while (rs.next()) {
                    Map<String, Object> esFieldData = new LinkedHashMap<>();

                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                        if (old != null) {
                            // Write the field only if one of its underlying sub-query columns changed.
                            out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                                for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                    if (fieldItem1.getFieldName().equals(columnItem0.getColumnName())) {
                                        for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                            if (old.containsKey(columnItem.getColumnName())) {
                                                Object val = esTemplate.getValFromRS(mapping,
                                                    rs,
                                                    fieldItem.getFieldName(),
                                                    fieldItem.getColumn().getColumnName());
                                                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                                break out;
                                            }
                                        }
                                    }
                                }
                            }
                        } else {
                            Object val = esTemplate.getValFromRS(mapping,
                                rs,
                                fieldItem.getFieldName(),
                                fieldItem.getColumn().getColumnName());
                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                        }
                    }

                    Map<String, Object> paramsTmp = new LinkedHashMap<>();
                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
                        .entrySet()) {
                        for (FieldItem fieldItem : entry.getValue()) {
                            if (fieldItem.getColumnItems().size() == 1) {
                                Object value = esTemplate.getValFromRS(mapping,
                                    rs,
                                    fieldItem.getFieldName(),
                                    entry.getKey().getColumn().getColumnName());
                                String fieldName = fieldItem.getFieldName();
                                // Check whether this field is the document id
                                if (fieldName.equals(mapping.get_id())) {
                                    fieldName = "_id";
                                }
                                paramsTmp.put(fieldName, value);
                            }
                        }
                    }

                    // Guard matches the level actually logged (was isDebugEnabled() around a trace call).
                    if (logger.isTraceEnabled()) {
                        logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
                            config.getDestination(),
                            dml.getTable(),
                            mapping.get_index());
                    }
                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0;
        });
    }

    /**
     * Join (sub-query): complex-field operation on the main table, executing the whole
     * mapping SQL.
     * <p>
     * Splits a trailing GROUP BY off the mapping SQL, appends one condition per relation key
     * (values taken from the DML row), re-attaches the GROUP BY, executes the statement, and
     * issues an update-by-query for every returned row.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     * @param old single row of old data, may be {@code null}
     * @param tableItem current table configuration
     */
    private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
                                   TableItem tableItem) {
        ESMapping mapping = config.getEsMapping();
        // Split off a trailing GROUP BY so that appending the WHERE clause does not break the SQL
        String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
        String sqlNoWhere = sqlSplit[0];

        String sqlGroupBy = "";

        if (sqlSplit.length > 1) {
            sqlGroupBy = "GROUP BY " + sqlSplit[1];
        }

        StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");

        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
            String columnName = fkFieldItem.getColumn().getColumnName();
            Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
            ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
        }
        // NOTE(review): presumably drops the 5-character trailing separator (" AND ") left by
        // ESSyncUtil.appendCondition; assumes at least one relation key exists - confirm.
        int len = sql.length();
        sql.delete(len - 5, len);
        sql.append(sqlGroupBy);

        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        if (logger.isTraceEnabled()) {
            logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                sql.toString().replace("\n", " "));
        }
        Util.sqlRS(ds, sql.toString(), rs -> {
            try {
                while (rs.next()) {
                    Map<String, Object> esFieldData = new LinkedHashMap<>();
                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                        if (old != null) {
                            // Slave table is a sub-query
                            out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                                for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                    if (fieldItem1.getFieldName().equals(columnItem0.getColumnName())) {
                                        for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                            if (old.containsKey(columnItem.getColumnName())) {
                                                Object val = esTemplate.getValFromRS(mapping,
                                                    rs,
                                                    fieldItem.getFieldName(),
                                                    fieldItem.getFieldName());
                                                // NOTE(review): unlike the other branches this key is
                                                // not passed through Util.cleanColumn - confirm intent.
                                                esFieldData.put(fieldItem.getFieldName(), val);
                                                break out;
                                            }
                                        }
                                    }
                                }
                            }
                            // Slave table is not a sub-query
                            for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
                                if (fieldItem1.equals(fieldItem)) {
                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                        if (old.containsKey(columnItem.getColumnName())) {
                                            Object val = esTemplate.getValFromRS(mapping,
                                                rs,
                                                fieldItem.getFieldName(),
                                                fieldItem.getFieldName());
                                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                            break;
                                        }
                                    }
                                }
                            }
                        } else {
                            Object val = esTemplate
                                .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                        }
                    }

                    Map<String, Object> paramsTmp = new LinkedHashMap<>();
                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
                        .entrySet()) {
                        for (FieldItem fieldItem : entry.getValue()) {
                            Object value = esTemplate
                                .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
                            String fieldName = fieldItem.getFieldName();
                            // Check whether this field is the document id
                            if (fieldName.equals(mapping.get_id())) {
                                fieldName = "_id";
                            }
                            paramsTmp.put(fieldName, value);
                        }
                    }

                    // Guard matches the level actually logged (was isDebugEnabled() around a trace call).
                    if (logger.isTraceEnabled()) {
                        logger.trace(
                            "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
                            config.getDestination(),
                            dml.getTable(),
                            mapping.get_index());
                    }
                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0;
        });
    }

    /**
     * Single-table simple-field update.
     * <p>
     * Builds the ES field data directly from the DML row and its old values, then updates the
     * document identified by the extracted id.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     * @param old single row of old data
     */
    private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                              Map<String, Object> old) {
        ESMapping mapping = config.getEsMapping();
        Map<String, Object> esFieldData = new LinkedHashMap<>();

        Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);

        if (logger.isTraceEnabled()) {
            logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                idVal);
        }
        esTemplate.update(mapping, idVal, esFieldData);
    }

    /**
     * Main table (single table) complex-field update.
     * <p>
     * Appends the primary-key condition built from the DML row to the mapping SQL, runs the
     * query, and updates one ES document per returned row.
     *
     * @param config ES sync configuration
     * @param dml DML information
     * @param data single row of DML data
     * @param old single row of old data
     */
    private void mainTableUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old) {
        ESMapping mapping = config.getEsMapping();
        String sql = mapping.getSql();
        String condition = ESSyncUtil.pkConditionSql(mapping, data);
        sql = ESSyncUtil.appendCondition(sql, condition);
        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
        if (logger.isTraceEnabled()) {
            logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                config.getDestination(),
                dml.getTable(),
                mapping.get_index(),
                sql.replace("\n", " "));
        }
        Util.sqlRS(ds, sql, rs -> {
            try {
                while (rs.next()) {
                    Map<String, Object> esFieldData = new LinkedHashMap<>();
                    Object idVal = esTemplate.getESDataFromRS(mapping, rs, old, esFieldData);

                    if (logger.isTraceEnabled()) {
                        logger.trace(
                            "Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                            config.getDestination(),
                            dml.getTable(),
                            mapping.get_index(),
                            idVal);
                    }
                    esTemplate.update(mapping, idVal, esFieldData);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0;
        });
    }

    /**
     * Commit the current batch by delegating to {@code esTemplate.commit()}.
     */
    public void commit() {
        esTemplate.commit();
    }
}
