备注:该代码基于canal1.1.5
client-adapter:
resources下es7配置文件:
添加自定义配置:
indexSuffixfield: created_at #db table 创建时间
indexSuffixPattern: yyyyMM # 时间需要对应ES索引后缀格式
示例:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: payments
  _id: _id
  upsert: true
  sql: "select t.payment_id as _id, t.payment_id, t.created_at,t.deleted_at,t.root_mch_id,t.agent_mch_id,t.bank_id,t.root_bank_id,t.cup_org_id,t.pay_balance_acct_id,t.amount,t.out_order_no,t.remark,t.fee,t.is_refunded,t.refunded_amount,t.extra,t.metadata,t.req_id,t.currency,t.recv_balance_acct_id,t.payment_status,t.finished_at,t.sent_at,t.version,t.reason,t.batch_payment_id from payments t"
  etlCondition: "where t.created_at>={}"
  commitBatch: 3000
  indexSuffixfield: created_at
  indexSuffixPattern: yyyyMM
创建自定义索引扩展帮助类:
package com.alibaba.otter.canal.client.adapter.es.core.support;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
 * Helper that redirects an {@link ESSyncConfig.ESMapping} to a time-partitioned index.
 * <p>
 * The index name is rebuilt as {@code <indexPrefix>-<suffix>}, where the suffix is derived from the
 * value of the configured {@code indexSuffixfield} (for example {@code created_at}) and formatted
 * according to {@code indexSuffixPattern} (for example {@code yyyyMM}).
 * <p>
 * NOTE(review): every call mutates the {@code _index} of the mapping object it is given; if that
 * object is shared between threads, callers must serialise access — confirm against the adapter's
 * threading model.
 */
public class ESMapppingPlus {

    private static final Logger logger = LoggerFactory.getLogger(ESMapppingPlus.class);

    /** Number of decimal digits of a present-day epoch-millisecond timestamp. */
    private static final int EPOCH_MILLIS_LENGTH = 13;

    /**
     * Rebuilds {@code esMapping._index} from the row data.
     * <ul>
     * <li>Empty or {@code null} {@code esFieldData}: nothing is changed.</li>
     * <li>No {@code indexSuffixfield} configured: the index is reset to the bare prefix.</li>
     * <li>Value containing {@code '-'}: treated as a date string such as {@code 2021-05-01 12:00:00}.</li>
     * <li>13-character value: treated as an epoch-millisecond timestamp.</li>
     * </ul>
     *
     * @param esMapping   mapping whose {@code _index} is rewritten in place
     * @param esFieldData row values keyed by field name; must contain the suffix field when non-empty
     * @throws RuntimeException if the suffix field is missing/empty, the pattern is not configured,
     *                          or the value is in neither supported format
     */
    public static void rebuildIndex(ESSyncConfig.ESMapping esMapping, Map<String, Object> esFieldData) {
        if (CollectionUtils.isEmpty(esFieldData)) {
            return;
        }
        String indexSuffixfield = esMapping.getIndexSuffixfield();
        if (StringUtils.isEmpty(indexSuffixfield)) {
            // No suffix field configured: use the index without a suffix
            esMapping.set_index(esMapping.getIndexPrefix());
            return;
        }
        // A key that is present but mapped to null must be reported like a missing key, not crash with an NPE
        Object rawValue = esFieldData.get(indexSuffixfield);
        String indexSuffix = rawValue == null ? null : rawValue.toString();
        if (StringUtils.isEmpty(indexSuffix)) {
            // The suffix field was not found in the row data
            logger.error("esMapping.indexSuffixfield not matched to dml field, index:{} ,indexSuffixfield:{}, sql:{}, esFieldData:{}",
                esMapping.getIndexPrefix(), indexSuffixfield, esMapping.getSql(), JSONObject.toJSONString(esFieldData));
            throw new RuntimeException("esMapping.indexSuffixfield not matched to dml field");
        }
        String indexSuffixPattern = esMapping.getIndexSuffixPattern();
        if (StringUtils.isEmpty(indexSuffixPattern)) {
            // Fail with a clear message instead of an NPE inside switch / SimpleDateFormat
            logger.error("esMapping.indexSuffixPattern can't null, index:{}",
                esMapping.getIndexPrefix());
            throw new RuntimeException("esMapping.indexSuffixPattern can't null");
        }
        String formattedSuffix;
        if (indexSuffix.indexOf('-') != -1) {
            // Date-string format
            formattedSuffix = datePattern(indexSuffix, indexSuffixPattern);
        } else if (indexSuffix.length() == EPOCH_MILLIS_LENGTH) {
            // Epoch-millisecond format
            formattedSuffix = formatEpochMillis(indexSuffix, indexSuffixPattern);
        } else {
            logger.error("esMapping.indexSuffixfield value is neither a date string nor epoch millis, index:{}, indexSuffixfield:{}, value:{}",
                esMapping.getIndexPrefix(), indexSuffixfield, indexSuffix);
            throw new RuntimeException("esMapping.indexSuffixfield value is neither a date string nor epoch millis: "
                + indexSuffix);
        }
        esMapping.set_index(esMapping.getIndexPrefix() + "-" + formattedSuffix);
    }

    /**
     * Rebuilds the index of the mapping held by {@code config}.
     *
     * @param config      sync configuration whose mapping is rewritten in place
     * @param esFieldData row values keyed by field name
     * @see #rebuildIndex(ESSyncConfig.ESMapping, Map)
     */
    public static void rebuildIndex(ESSyncConfig config, Map<String, Object> esFieldData) {
        rebuildIndex(config.getEsMapping(), esFieldData);
    }

    /**
     * Formats an epoch-millisecond string with the given date pattern.
     *
     * @throws IllegalArgumentException if the value is not a number or the pattern is invalid
     */
    private static String formatEpochMillis(String epochMillis, String pattern) {
        final long millis;
        try {
            millis = Long.parseLong(epochMillis);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("index suffix value is not an epoch-millisecond timestamp: "
                + epochMillis, e);
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        // format() needs a Date (or Number); handing it the raw String always throws.
        return new SimpleDateFormat(pattern).format(new Date(millis));
    }

    /**
     * Cuts the suffix out of a date string that starts with {@code yyyy-MM-dd}.
     * Unknown patterns fall back to {@code yyyyMM}.
     */
    private static String datePattern(String date, String pattern) {
        switch (pattern) {
            case "yyyy":
                return leading(date, 4);
            case "yyyyMMdd":
                return leading(date, 10).replace("-", "");
            case "yyyy-MM":
                return leading(date, 7);
            case "yyyy-MM-dd":
                return leading(date, 10);
            case "yyyyMM":
            default:
                return leading(date, 7).replace("-", "");
        }
    }

    /** Returns the first {@code length} characters, rejecting values too short to contain them. */
    private static String leading(String date, int length) {
        if (date.length() < length) {
            throw new IllegalArgumentException("index suffix value '" + date
                + "' is shorter than the " + length + " characters required by the pattern");
        }
        return date.substring(0, length);
    }
}
ES7xTemplate.java添加
ESMapppingPlus.rebuildIndex(mapping, esFieldData);
或者添加:
ESMapppingPlus.rebuildIndex(config, esFieldData);
修改后的完整代码:
package com.alibaba.otter.canal.client.adapter.es7x.support;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESBulkResponse;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESDeleteRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESIndexRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESUpdateRequest;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection.ESSearchRequest;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;
public class ES7xTemplate implements ESTemplate {
// Logger for this template; note it is created for ESTemplate.class, not ES7xTemplate.class.
private static final Logger logger = LoggerFactory
.getLogger(ESTemplate.class);
// Number of pending bulk actions at which commitBulk() flushes the batch.
private static final int MAX_BATCH_SIZE = 1000;
// Connection used to create the ES requests issued by this template.
private ESConnection esConnection;
// Bulk request that accumulates pending actions until commit().
private ESBulkRequest esBulkRequest;
// Local cache of ES field types, filled and read by getEsType().
private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
/**
 * Creates a template bound to {@code esConnection} and opens its first bulk request.
 *
 * @param esConnection connection used for every request created by this template
 */
public ES7xTemplate(ESConnection esConnection){
    this.esBulkRequest = esConnection.new ES7xBulkRequest();
    this.esConnection = esConnection;
}
/**
 * @return the bulk request currently collecting pending actions
 */
public ESBulkRequest getBulk() {
    return this.esBulkRequest;
}
/**
 * Resets the bulk request held by this template.
 */
public void resetBulkRequestBuilder() {
    ESBulkRequest current = this.esBulkRequest;
    current.resetBulk();
}
/**
 * Queues an insert for one row and flushes the bulk request when it is large enough.
 * <p>
 * The target index is first recomputed from {@code esFieldData}. With an {@code _id} mapping the
 * row becomes an upsert or an index request addressed by {@code pkVal}; without one, documents
 * whose pk field equals {@code pkVal} are searched and every hit is updated.
 *
 * @param mapping     mapping configuration; its {@code _index} may be rewritten by the rebuild
 * @param pkVal       id / pk value of the row
 * @param esFieldData values to write; a {@code $parent_routing} entry is removed and used as routing
 */
@Override
public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);

    if (mapping.get_id() == null) {
        // No _id configured: find the documents by pk and update each of them
        ESSearchRequest search = this.esConnection.new ESSearchRequest(mapping.get_index())
            .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
            .size(10000);
        SearchResponse found = search.getResponse();
        for (SearchHit hit : found.getHits()) {
            ESUpdateRequest hitUpdate = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                hit.getId()).setDoc(esFieldData);
            getBulk().add(hitUpdate);
            commitBulk();
        }
        return;
    }

    String routing = (String) esFieldData.remove("$parent_routing");
    boolean hasRouting = StringUtils.isNotEmpty(routing);
    if (mapping.isUpsert()) {
        ESUpdateRequest upsert = esConnection.new ES7xUpdateRequest(mapping.get_index(),
            pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
        if (hasRouting) {
            upsert.setRouting(routing);
        }
        getBulk().add(upsert);
    } else {
        ESIndexRequest index = esConnection.new ES7xIndexRequest(mapping.get_index(), pkVal.toString())
            .setSource(esFieldData);
        if (hasRouting) {
            index.setRouting(routing);
        }
        getBulk().add(index);
    }
    commitBulk();
}
/**
 * Queues an update for one row and flushes the bulk request when it is large enough.
 * <p>
 * NOTE(review): {@code ESMapppingPlus.rebuildIndex} throws when {@code esFieldData} is non-empty
 * but lacks the configured suffix field; confirm that callers always include that field here.
 *
 * @param mapping     mapping configuration
 * @param pkVal       id / pk value of the row
 * @param esFieldData values to update; keys are passed through {@code Util.cleanColumn}
 */
@Override
public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);

    // Copy into an insertion-ordered map with cleaned keys
    Map<String, Object> cleaned = new LinkedHashMap<>(esFieldData.size());
    for (Map.Entry<String, Object> entry : esFieldData.entrySet()) {
        cleaned.put(Util.cleanColumn(entry.getKey()), entry.getValue());
    }

    append4Update(mapping, pkVal, cleaned);
    commitBulk();
}
/**
 * Updates every document whose source row matches {@code paramsTmp}.
 * <p>
 * The mapping sql is wrapped as a sub-select and filtered by the given column values; for each
 * returned row the document id is read and an update with {@code esFieldData} is queued.
 * Nothing happens when {@code paramsTmp} is empty.
 *
 * @param config      sync configuration (data source key and mapping)
 * @param paramsTmp   column name to value pairs used as equality conditions
 * @param esFieldData values to write to each matching document
 * @throws RuntimeException wrapping any failure while iterating the result set
 */
@Override
public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(config, esFieldData);
    if (paramsTmp.isEmpty()) {
        return;
    }
    ESMapping mapping = config.getEsMapping();
    // The ES bool query that used to be assembled here was never used, so it is no longer built.

    // Query by sql and update in batches
    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
    List<Object> values = new ArrayList<>(paramsTmp.size());
    paramsTmp.forEach((fieldName, value) -> {
        sql.append("_v.").append(fieldName).append("=? AND ");
        values.add(value);
    });
    // TODO wrapping the sql from outside forces a full table scan; push the where condition inside
    // Drop the trailing "AND " left by the loop above
    sql.setLength(sql.length() - "AND ".length());

    Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
        int count = 0;
        try {
            while (rs.next()) {
                Object idVal = getIdValFromRS(mapping, rs);
                append4Update(mapping, idVal, esFieldData);
                commitBulk();
                count++;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return count;
    });
    if (logger.isTraceEnabled()) {
        logger.trace("Update ES by query affected {} records", syncCount);
    }
}
/**
 * Queues the removal of one row and flushes the bulk request when it is large enough.
 * <p>
 * With an {@code _id} mapping a delete request addressed by {@code pkVal} is queued. Without one,
 * documents whose pk field equals {@code pkVal} are searched and each hit is updated with
 * {@code esFieldData} instead of being deleted.
 *
 * @param mapping     mapping configuration
 * @param pkVal       id / pk value of the row
 * @param esFieldData values applied to the hits in the no-{@code _id} case
 */
@Override
public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);

    if (mapping.get_id() == null) {
        ESSearchRequest search = this.esConnection.new ESSearchRequest(mapping.get_index())
            .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
            .size(10000);
        SearchResponse found = search.getResponse();
        for (SearchHit hit : found.getHits()) {
            ESUpdateRequest hitUpdate = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                hit.getId()).setDoc(esFieldData);
            getBulk().add(hitUpdate);
            commitBulk();
        }
        return;
    }

    ESDeleteRequest removal = this.esConnection.new ES7xDeleteRequest(mapping.get_index(),
        pkVal.toString());
    getBulk().add(removal);
    commitBulk();
}
/**
 * Sends the pending bulk actions, if any, hands failures to the response's own failure
 * processing, and then resets the bulk request.
 */
@Override
public void commit() {
    if (getBulk().numberOfActions() <= 0) {
        return;
    }
    ESBulkResponse bulkResult = getBulk().bulk();
    if (bulkResult.hasFailures()) {
        bulkResult.processFailBulkResponse("ES sync commit error ");
    }
    resetBulkRequestBuilder();
}
/**
 * Reads one column from the result set and converts it to the value stored in ES.
 *
 * @param mapping    mapping configuration
 * @param resultSet  result set positioned on the row to read
 * @param fieldName  ES field name (cleaned with {@code Util.cleanColumn})
 * @param columnName result set column name (cleaned with {@code Util.cleanColumn})
 * @return the converted value
 * @throws SQLException if the column cannot be read
 */
@Override
public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
                           String columnName) throws SQLException {
    final String field = Util.cleanColumn(fieldName);
    final String column = Util.cleanColumn(columnName);
    final String esType = getEsType(mapping, field);

    Object value = resultSet.getObject(column);
    // A Boolean read from JDBC is re-read as a byte unless the ES field type is boolean
    if (value instanceof Boolean && !"boolean".equals(esType)) {
        value = resultSet.getByte(column);
    }

    // Object-typed fields get their own conversion
    if (!mapping.getObjFields().containsKey(field)) {
        return ESSyncUtil.typeConvert(value, esType);
    }
    return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(field));
}
/**
 * Fills {@code esFieldData} with every selected field of the current result set row.
 * <p>
 * The {@code _id} field and the skipped fields are left out of the map. Parent/child relation
 * data is appended at the end.
 *
 * @param mapping     mapping configuration
 * @param resultSet   result set positioned on the row to read
 * @param esFieldData output map, keyed by cleaned field name
 * @return the value of the id field ({@code _id}, or {@code pk} when no {@code _id} is mapped),
 *         or {@code null} if it is not among the selected fields
 * @throws SQLException if a column cannot be read
 */
@Override
public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
                              Map<String, Object> esFieldData) throws SQLException {
    // rebuildIndex returns immediately when esFieldData is still empty
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    SchemaItem schemaItem = mapping.getSchemaItem();
    String idFieldName = mapping.get_id() != null ? mapping.get_id() : mapping.getPk();

    Object idValue = null;
    for (FieldItem selected : schemaItem.getSelectFields().values()) {
        Object value = getValFromRS(mapping, resultSet, selected.getFieldName(), selected.getFieldName());
        if (selected.getFieldName().equals(idFieldName)) {
            idValue = value;
        }
        if (selected.getFieldName().equals(mapping.get_id())
            || mapping.getSkips().contains(selected.getFieldName())) {
            continue;
        }
        esFieldData.put(Util.cleanColumn(selected.getFieldName()), value);
    }

    // Add the parent/child document relation data
    putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
    return idValue;
}
/**
 * Reads the id value of the current result set row.
 * <p>
 * Selected fields are read in order until the id field is reached.
 *
 * @param mapping   mapping configuration
 * @param resultSet result set positioned on the row to read
 * @return the value of the id field ({@code _id}, or {@code pk} when no {@code _id} is mapped),
 *         or {@code null} if it is not among the selected fields
 * @throws SQLException if a column cannot be read
 */
@Override
public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
    SchemaItem schemaItem = mapping.getSchemaItem();
    String idFieldName = mapping.get_id() != null ? mapping.get_id() : mapping.getPk();

    for (FieldItem selected : schemaItem.getSelectFields().values()) {
        Object value = getValFromRS(mapping, resultSet, selected.getFieldName(), selected.getFieldName());
        if (selected.getFieldName().equals(idFieldName)) {
            return value;
        }
    }
    return null;
}
/**
 * Fills {@code esFieldData} with the selected fields whose source columns appear in
 * {@code dmlOld}, reading the values from the current result set row.
 * <p>
 * Skipped fields are left out. Parent/child relation data is appended at the end.
 *
 * @param mapping     mapping configuration
 * @param resultSet   result set positioned on the row to read
 * @param dmlOld      old column values of the dml; only its keys are consulted
 * @param esFieldData output map, keyed by cleaned field name
 * @return the value of the id field, or {@code null} if it is not among the selected fields
 * @throws SQLException if a column cannot be read
 */
@Override
public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
                              Map<String, Object> esFieldData) throws SQLException {
    // rebuildIndex returns immediately when esFieldData is still empty
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    SchemaItem schemaItem = mapping.getSchemaItem();
    String idFieldName = mapping.get_id() != null ? mapping.get_id() : mapping.getPk();

    Object idValue = null;
    for (FieldItem selected : schemaItem.getSelectFields().values()) {
        if (selected.getFieldName().equals(idFieldName)) {
            idValue = getValFromRS(mapping, resultSet, selected.getFieldName(), selected.getFieldName());
        }
        // The field is written once, on the first of its columns found in dmlOld
        for (ColumnItem column : selected.getColumnItems()) {
            if (!dmlOld.containsKey(column.getColumnName())
                || mapping.getSkips().contains(selected.getFieldName())) {
                continue;
            }
            esFieldData.put(Util.cleanColumn(selected.getFieldName()),
                getValFromRS(mapping, resultSet, selected.getFieldName(), selected.getFieldName()));
            break;
        }
    }

    // Add the parent/child document relation data
    putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
    return idValue;
}
/**
 * Takes one column value out of the dml row and converts it to the value stored in ES.
 *
 * @param mapping    mapping configuration
 * @param dmlData    dml row, keyed by column name
 * @param fieldName  ES field name
 * @param columnName column to read from {@code dmlData}
 * @return the converted value
 */
@Override
public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
    final String esType = getEsType(mapping, fieldName);

    Object value = dmlData.get(columnName);
    // A Byte becomes true/false when the ES field type is boolean
    if (value instanceof Byte && "boolean".equals(esType)) {
        value = ((Byte) value).intValue() != 0;
    }

    // Object-typed fields get their own conversion
    if (!mapping.getObjFields().containsKey(fieldName)) {
        return ESSyncUtil.typeConvert(value, esType);
    }
    return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
}
/**
 * Fills {@code esFieldData} with every selected field, taking the values from the dml row.
 * <p>
 * Each field is read from the first of its source columns. The {@code _id} field and the skipped
 * fields are left out of the map. Parent/child relation data is appended at the end.
 *
 * @param mapping     mapping configuration
 * @param dmlData     dml row, keyed by column name
 * @param esFieldData output map, keyed by cleaned field name
 * @return the value of the id field, or {@code null} if it is not among the selected fields
 */
@Override
public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
                                   Map<String, Object> esFieldData) {
    // rebuildIndex returns immediately when esFieldData is still empty
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    SchemaItem schemaItem = mapping.getSchemaItem();
    String idFieldName = mapping.get_id() != null ? mapping.get_id() : mapping.getPk();

    Object idValue = null;
    for (FieldItem selected : schemaItem.getSelectFields().values()) {
        String sourceColumn = selected.getColumnItems().iterator().next().getColumnName();
        Object value = getValFromData(mapping, dmlData, selected.getFieldName(), sourceColumn);
        if (selected.getFieldName().equals(idFieldName)) {
            idValue = value;
        }
        if (selected.getFieldName().equals(mapping.get_id())
            || mapping.getSkips().contains(selected.getFieldName())) {
            continue;
        }
        esFieldData.put(Util.cleanColumn(selected.getFieldName()), value);
    }

    // Add the parent/child document relation data
    putRelationData(mapping, schemaItem, dmlData, esFieldData);
    return idValue;
}
/**
 * Fills {@code esFieldData} with the selected fields whose first source column appears in
 * {@code dmlOld}, taking the new values from {@code dmlData}.
 * <p>
 * Skipped fields are left out. Relation data is appended at the end and is computed from
 * {@code dmlOld}.
 *
 * @param mapping     mapping configuration
 * @param dmlData     new dml row, keyed by column name
 * @param dmlOld      old column values of the dml
 * @param esFieldData output map, keyed by cleaned field name
 * @return the value of the id field, or {@code null} if it is not among the selected fields
 */
@Override
public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
                                   Map<String, Object> esFieldData) {
    // rebuildIndex returns immediately when esFieldData is still empty
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    SchemaItem schemaItem = mapping.getSchemaItem();
    String idFieldName = mapping.get_id() != null ? mapping.get_id() : mapping.getPk();

    Object idValue = null;
    for (FieldItem selected : schemaItem.getSelectFields().values()) {
        String sourceColumn = selected.getColumnItems().iterator().next().getColumnName();
        if (selected.getFieldName().equals(idFieldName)) {
            idValue = getValFromData(mapping, dmlData, selected.getFieldName(), sourceColumn);
        }
        boolean changed = dmlOld.containsKey(sourceColumn);
        if (changed && !mapping.getSkips().contains(selected.getFieldName())) {
            esFieldData.put(Util.cleanColumn(selected.getFieldName()),
                getValFromData(mapping, dmlData, selected.getFieldName(), sourceColumn));
        }
    }

    // Add the parent/child document relation data
    putRelationData(mapping, schemaItem, dmlOld, esFieldData);
    return idValue;
}
/**
 * Commits the batch once the number of pending actions has reached {@code MAX_BATCH_SIZE}.
 */
private void commitBulk() {
    boolean batchFull = getBulk().numberOfActions() >= MAX_BATCH_SIZE;
    if (batchFull) {
        commit();
    }
}
/**
 * Adds update requests for one row to the bulk request without committing it.
 * <p>
 * With an {@code _id} mapping a single update (an upsert when the mapping says so) addressed by
 * {@code pkVal} is added; otherwise documents whose pk field equals {@code pkVal} are searched
 * and one update per hit is added.
 *
 * @param mapping     mapping configuration
 * @param pkVal       id / pk value of the row
 * @param esFieldData values to write; a {@code $parent_routing} entry is removed and used as routing
 */
private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);

    if (mapping.get_id() == null) {
        ESSearchRequest search = this.esConnection.new ESSearchRequest(mapping.get_index())
            .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
            .size(10000);
        SearchResponse found = search.getResponse();
        for (SearchHit hit : found.getHits()) {
            ESUpdateRequest hitUpdate = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                hit.getId()).setDoc(esFieldData);
            getBulk().add(hitUpdate);
        }
        return;
    }

    String routing = (String) esFieldData.remove("$parent_routing");
    if (mapping.isUpsert()) {
        ESUpdateRequest upsert = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
            pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
        if (StringUtils.isNotEmpty(routing)) {
            upsert.setRouting(routing);
        }
        getBulk().add(upsert);
    } else {
        ESUpdateRequest plainUpdate = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
            pkVal.toString()).setDoc(esFieldData);
        if (StringUtils.isNotEmpty(routing)) {
            plainUpdate.setRouting(routing);
        }
        getBulk().add(plainUpdate);
    }
}
/**
 * Looks up the type of a property in the es mapping.
 * <p>
 * Results are cached per {@code "<_index>-<_type>"}. On a cache miss the mapping metadata is
 * fetched by {@code indexPrefix} rather than by the current {@code _index}.
 *
 * @param mapping   mapping configuration
 * @param fieldName property name
 * @return the type, {@code "object"} for properties that have nested properties, or {@code null}
 *         when the property is not in the mapping
 * @throws IllegalArgumentException if no mapping metadata is found
 */
@SuppressWarnings("unchecked")
private String getEsType(ESMapping mapping, String fieldName) {
    String cacheKey = mapping.get_index() + "-" + mapping.get_type();
    Map<String, String> cached = esFieldTypes.get(cacheKey);
    if (cached != null) {
        return cached.get(fieldName);
    }

    // Fetched via the index prefix (template) instead of the concrete index name
    MappingMetaData metaData = esConnection.getMapping(mapping.getIndexPrefix());
    if (metaData == null) {
        throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
    }

    Map<String, String> fieldTypes = new LinkedHashMap<>();
    Map<String, Object> properties = (Map<String, Object>) metaData.getSourceAsMap().get("properties");
    for (Map.Entry<String, Object> property : properties.entrySet()) {
        Map<String, Object> definition = (Map<String, Object>) property.getValue();
        String type = definition.containsKey("properties") ? "object" : (String) definition.get("type");
        fieldTypes.put(property.getKey(), type);
    }
    esFieldTypes.put(cacheKey, fieldTypes);
    return fieldTypes.get(fieldName);
}
/**
 * Adds the parent/child relation entries to {@code esFieldData}, reading parent values from the
 * current result set row.
 * <p>
 * For every configured relation a map with {@code name} (and {@code parent}, when a parent value
 * exists) is stored under the relation field. A non-null parent value is also stored under
 * {@code $parent_routing}.
 *
 * @param mapping     mapping configuration
 * @param schemaItem  parsed sql schema used to resolve the parent field
 * @param resultSet   result set positioned on the row to read
 * @param esFieldData map that receives the relation entries
 * @throws RuntimeException wrapping a {@link SQLException} raised while reading the parent value
 */
private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
                                   Map<String, Object> esFieldData) {
    ESMapppingPlus.rebuildIndex(mapping, esFieldData);
    if (mapping.getRelations().isEmpty()) {
        return;
    }

    mapping.getRelations().forEach((relationField, relationMapping) -> {
        Map<String, Object> joinValue = new HashMap<>();
        joinValue.put("name", relationMapping.getName());

        if (StringUtils.isNotEmpty(relationMapping.getParent())) {
            FieldItem parentItem = schemaItem.getSelectFields().get(relationMapping.getParent());
            Object parentVal;
            try {
                parentVal = getValFromRS(mapping, resultSet, parentItem.getFieldName(), parentItem.getFieldName());
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
            if (parentVal != null) {
                joinValue.put("parent", parentVal.toString());
                esFieldData.put("$parent_routing", parentVal.toString());
            }
        }

        esFieldData.put(relationField, joinValue);
    });
}
/**
 * Adds the parent/child relation entries to {@code esFieldData}, reading parent values from the
 * dml row.
 * <p>
 * For every configured relation a map with {@code name} (and {@code parent}, when a parent value
 * exists) is stored under the relation field. A non-null parent value is also stored under
 * {@code $parent_routing}.
 *
 * @param mapping     mapping configuration
 * @param schemaItem  parsed sql schema used to resolve the parent field and its first column
 * @param dmlData     dml row, keyed by column name
 * @param esFieldData map that receives the relation entries
 */
private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
                             Map<String, Object> esFieldData) {
    if (mapping.getRelations().isEmpty()) {
        return;
    }

    mapping.getRelations().forEach((relationField, relationMapping) -> {
        Map<String, Object> joinValue = new HashMap<>();
        joinValue.put("name", relationMapping.getName());

        if (StringUtils.isNotEmpty(relationMapping.getParent())) {
            FieldItem parentItem = schemaItem.getSelectFields().get(relationMapping.getParent());
            String parentColumn = parentItem.getColumnItems().iterator().next().getColumnName();
            Object parentVal = getValFromData(mapping, dmlData, parentItem.getFieldName(), parentColumn);
            if (parentVal != null) {
                joinValue.put("parent", parentVal.toString());
                esFieldData.put("$parent_routing", parentVal.toString());
            }
        }

        esFieldData.put(relationField, joinValue);
    });
}
}
修改ESSyncService.java文件:
同样添加:
ESMapppingPlus.rebuildIndex(config, data);
或者:
ESMapppingPlus.rebuildIndex(config, esFieldData);
完整修改如下:
package com.alibaba.otter.canal.client.adapter.es.core.service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESMapppingPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.TableItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.Util;
/**
* ES 同步 Service
*
* @author rewerma 2018-11-01
* @version 1.0.0
*/
public class ESSyncService {
// Logger for the sync service.
private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
// Template through which all ES reads and writes of this service are issued.
private ESTemplate esTemplate;
/**
 * Creates a sync service that writes through the given template.
 *
 * @param esTemplate template used for every ES operation
 */
public ESSyncService(final ESTemplate esTemplate){
    this.esTemplate = esTemplate;
}
/**
 * Applies one dml to every given sync configuration, in iteration order.
 * <p>
 * Does nothing when {@code esSyncConfigs} is {@code null}. Progress is logged at trace level and
 * the dml plus the affected indexes at debug level.
 *
 * @param esSyncConfigs configurations affected by the dml; may be {@code null}
 * @param dml           dml to apply
 */
public void sync(Collection<ESSyncConfig> esSyncConfigs, Dml dml) {
    final long startedAt = System.currentTimeMillis();
    if (esSyncConfigs == null) {
        return;
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: {}",
            dml.getDestination(),
            dml.getDatabase(),
            dml.getTable(),
            dml.getType(),
            esSyncConfigs.size());
    }

    for (ESSyncConfig syncConfig : esSyncConfigs) {
        if (logger.isTraceEnabled()) {
            logger.trace("Prepared to sync index: {}, destination: {}",
                syncConfig.getEsMapping().get_index(),
                dml.getDestination());
        }
        this.sync(syncConfig, dml);
        if (logger.isTraceEnabled()) {
            logger.trace("Sync completed: {}, destination: {}",
                syncConfig.getEsMapping().get_index(),
                dml.getDestination());
        }
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",
            (System.currentTimeMillis() - startedAt),
            esSyncConfigs.size(),
            dml.getDestination());
    }
    if (logger.isDebugEnabled()) {
        // Space-terminated list of the index names
        StringBuilder indexNames = new StringBuilder();
        for (ESSyncConfig syncConfig : esSyncConfigs) {
            indexNames.append(syncConfig.getEsMapping().get_index()).append(" ");
        }
        logger.debug("DML: {} \nAffected indexes: {}",
            JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
            indexNames.toString());
    }
}
/**
 * Applies one dml to one sync configuration.
 * <p>
 * Mappings synchronised by timestamp are ignored, as are dml types other than INSERT, UPDATE and
 * DELETE (compared case-insensitively).
 *
 * @param config sync configuration
 * @param dml    dml to apply
 * @throws RuntimeException wrapping anything thrown while handling the dml
 */
public void sync(ESSyncConfig config, Dml dml) {
    try {
        // Mappings that are refreshed periodically by timestamp are not synced here
        if (config.getEsMapping().isSyncByTimestamp()) {
            return;
        }

        final long startedAt = System.currentTimeMillis();
        final String dmlType = dml.getType();
        if ("INSERT".equalsIgnoreCase(dmlType)) {
            insert(config, dml);
        } else if ("UPDATE".equalsIgnoreCase(dmlType)) {
            update(config, dml);
        } else if ("DELETE".equalsIgnoreCase(dmlType)) {
            delete(config, dml);
        } else {
            return;
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
                (System.currentTimeMillis() - startedAt),
                dml.getDestination(),
                config.getEsMapping().get_index());
        }
    } catch (Throwable e) {
        logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
        throw new RuntimeException(e);
    }
}
/**
* Handles an INSERT dml.
* <p>
* For every non-empty row the target index is rebuilt from the row data, then the row is routed
* to the single-table, main-table or joined-table handling depending on the schema.
*
* @param config es sync configuration
* @param dml dml data
*/
private void insert(ESSyncConfig config, Dml dml) {
List<Map<String, Object>> dataList = dml.getData();
if (dataList == null || dataList.isEmpty()) {
return;
}
SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
for (Map<String, Object> data : dataList) {
if (data == null || data.isEmpty()) {
continue;
}
// Point the mapping at the time-suffixed index derived from this row
ESMapppingPlus.rebuildIndex(config, data);
if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
// ------ single table & all fields are simple fields ------
singleTableSimpleFiledInsert(config, dml, data);
} else {
// ------ main table: query by sql to insert ------
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
mainTableInsert(config, dml, data);
}
// Handling for the joined (non-main) tables
for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
if (tableItem.isMain()) {
continue;
}
if (!tableItem.getTableName().equals(dml.getTable())) {
continue;
}
// Are the select fields that come from this joined table all simple fields?
boolean allFieldsSimple = true;
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
allFieldsSimple = false;
break;
}
}
// All selected fields are simple fields
if (allFieldsSimple) {
// Not a sub-query
if (!tableItem.isSubQuery()) {
// ------ joined table, simple fields: insert ------
Map<String, Object> esFieldData = new LinkedHashMap<>();
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
Object value = esTemplate.getValFromData(config.getEsMapping(),
data,
fieldItem.getFieldName(),
fieldItem.getColumn().getColumnName());
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
}
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
} else {
// ------ joined sub-query table, simple fields: insert ------
subTableSimpleFieldOperation(config, dml, data, null, tableItem);
}
} else {
// ------ joined table, complex fields: run the whole sql to update es ------
wholeSqlOperation(config, dml, data, null, tableItem);
}
}
}
}
}
/**
 * Handles an UPDATE dml.
 * <p>
 * Row {@code n} of {@code dml.getData()} is paired with row {@code n} of {@code dml.getOld()}.
 * Rows whose data or old image is missing or empty are skipped without disturbing the pairing of
 * the rows that follow. For every processed row the target index is rebuilt from the row data,
 * then the row is routed to the single-table, main-table or joined-table handling.
 *
 * @param config es sync configuration
 * @param dml    dml data
 */
private void update(ESSyncConfig config, Dml dml) {
    List<Map<String, Object>> dataList = dml.getData();
    List<Map<String, Object>> oldList = dml.getOld();
    if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
        return;
    }
    SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
    int nextRow = 0;
    for (Map<String, Object> data : dataList) {
        // Advance the row index up front: incrementing only at the end of the body let a skipped
        // row shift every later row onto the wrong old image.
        int row = nextRow++;
        // A data row without a matching old image is skipped like a null old image
        Map<String, Object> old = row < oldList.size() ? oldList.get(row) : null;
        if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
            continue;
        }
        // Point the mapping at the time-suffixed index derived from this row
        ESMapppingPlus.rebuildIndex(config, data);
        if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
            // ------ single table & all fields are simple fields ------
            singleTableSimpleFiledUpdate(config, dml, data, old);
        } else {
            // ------ main table: query by sql to update ------
            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                ESMapping mapping = config.getEsMapping();
                String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
                FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);

                boolean idFieldSimple = true;
                if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
                    idFieldSimple = false;
                }

                boolean allUpdateFieldSimple = true;
                out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
                    for (ColumnItem columnItem : fieldItem.getColumnItems()) {
                        if (old.containsKey(columnItem.getColumnName())) {
                            if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
                                allUpdateFieldSimple = false;
                                break out;
                            }
                        }
                    }
                }

                // Primary-key updates are not supported!!
                // Check whether a foreign key was changed
                boolean fkChanged = false;
                for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
                    if (tableItem.isMain()) {
                        continue;
                    }
                    boolean changed = false;
                    for (List<FieldItem> fieldItems : tableItem.getRelationTableFields().values()) {
                        for (FieldItem fieldItem : fieldItems) {
                            if (old.containsKey(fieldItem.getColumn().getColumnName())) {
                                fkChanged = true;
                                changed = true;
                                break;
                            }
                        }
                    }
                    // If a foreign key changed, mark every select column of that table as changed
                    if (changed) {
                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                            fieldItem.getColumnItems()
                                .forEach(columnItem -> old.put(columnItem.getColumnName(), null));
                        }
                    }
                }

                // Are the primary key and all updated fields simple fields?
                if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
                    singleTableSimpleFiledUpdate(config, dml, data, old);
                } else {
                    mainTableUpdate(config, dml, data, old);
                }
            }

            // Handling for the joined (non-main) tables
            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
                if (tableItem.isMain()) {
                    continue;
                }
                if (!tableItem.getTableName().equals(dml.getTable())) {
                    continue;
                }

                // Are the select fields that come from this joined table all simple fields?
                boolean allFieldsSimple = true;
                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
                        allFieldsSimple = false;
                        break;
                    }
                }

                // All selected fields are simple fields
                if (allFieldsSimple) {
                    // Not a sub-query
                    if (!tableItem.isSubQuery()) {
                        // ------ joined table, simple fields: update ------
                        Map<String, Object> esFieldData = new LinkedHashMap<>();
                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                            if (old.containsKey(fieldItem.getColumn().getColumnName())) {
                                Object value = esTemplate.getValFromData(config.getEsMapping(),
                                    data,
                                    fieldItem.getFieldName(),
                                    fieldItem.getColumn().getColumnName());
                                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
                            }
                        }
                        joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                    } else {
                        // ------ joined sub-query table, simple fields: update ------
                        subTableSimpleFieldOperation(config, dml, data, old, tableItem);
                    }
                } else {
                    // ------ joined table, complex fields: run the whole sql to update es ------
                    wholeSqlOperation(config, dml, data, old, tableItem);
                }
            }
        }
    }
}
/**
* Handles a DELETE dml.
* <p>
* For every non-empty row the target index is rebuilt from the row data. A main-table row is
* deleted by id (or looked up by sql when the id field is computed); a joined-table row clears
* or refreshes the fields it contributed.
*
* @param config es sync configuration
* @param dml dml data
*/
private void delete(ESSyncConfig config, Dml dml) {
List<Map<String, Object>> dataList = dml.getData();
if (dataList == null || dataList.isEmpty()) {
return;
}
SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
for (Map<String, Object> data : dataList) {
if (data == null || data.isEmpty()) {
continue;
}
// Point the mapping at the time-suffixed index derived from this row
ESMapppingPlus.rebuildIndex(config, data);
ESMapping mapping = config.getEsMapping();
// ------ main table ------
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
if (mapping.get_id() != null) {
FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
// The primary key is a simple field
if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
Object idVal = esTemplate.getValFromData(mapping,
data,
idFieldItem.getFieldName(),
idFieldItem.getColumn().getColumnName());
if (logger.isTraceEnabled()) {
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
config.getDestination(),
dml.getTable(),
mapping.get_index(),
idVal);
}
esTemplate.delete(mapping, idVal, null);
} else {
// ------ primary key uses a function: query by sql to get the key to delete ------
// FIXME on delete the reverse sql lookup returns no rows, so the id field value cannot be obtained
mainTableDelete(config, dml, data);
}
} else {
FieldItem pkFieldItem = schemaItem.getIdFieldItem(mapping);
if (!pkFieldItem.isMethod() && !pkFieldItem.isBinaryOp()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
Object pkVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
if (logger.isTraceEnabled()) {
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
config.getDestination(),
dml.getTable(),
mapping.get_index(),
pkVal);
}
// Keep the pk out of the map and null out every other field before handing it to delete
esFieldData.remove(pkFieldItem.getFieldName());
esFieldData.keySet().forEach(key -> esFieldData.put(key, null));
esTemplate.delete(mapping, pkVal, esFieldData);
} else {
// ------ primary key uses a function: query by sql to get the key to delete ------
mainTableDelete(config, dml, data);
}
}
}
// Handling for the joined (non-main) tables
for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
if (tableItem.isMain()) {
continue;
}
if (!tableItem.getTableName().equals(dml.getTable())) {
continue;
}
// Are the select fields that come from this joined table all simple fields?
boolean allFieldsSimple = true;
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
allFieldsSimple = false;
break;
}
}
// All selected fields are simple fields
if (allFieldsSimple) {
// Not a sub-query
if (!tableItem.isSubQuery()) {
// ------ joined table, simple fields: set them to null ------
Map<String, Object> esFieldData = new LinkedHashMap<>();
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), null);
}
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
} else {
// ------ joined sub-query table, simple fields: update ------
subTableSimpleFieldOperation(config, dml, data, null, tableItem);
}
} else {
// ------ joined table, complex fields: run the whole sql to update es ------
wholeSqlOperation(config, dml, data, null, tableItem);
}
}
}
}
/**
 * Inserts one DML row of a single-table mapping whose fields are all simple columns.
 * <p>
 * The ES field data and the document id are taken from the DML row via
 * {@code esTemplate.getESDataFromDmlData}; this method issues no SQL of its own.
 *
 * @param config ES sync configuration
 * @param dml    DML event the row belongs to
 * @param data   column values of the single DML row
 */
private void singleTableSimpleFiledInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    final ESMapping esMapping = config.getEsMapping();
    final Map<String, Object> document = new LinkedHashMap<>();
    // The template receives the (initially empty) map and returns the value used as document id.
    final Object docId = esTemplate.getESDataFromDmlData(esMapping, data, document);

    if (logger.isTraceEnabled()) {
        logger.trace("Single table insert to es index, destination:{}, table: {}, index: {}, id: {}",
            config.getDestination(), dml.getTable(), esMapping.get_index(), docId);
    }

    esTemplate.insert(esMapping, docId, document);
}
/**
 * Inserts main-table (or single-table) data when the mapping contains complex fields.
 * <p>
 * The mapping SQL is re-run with the primary-key condition built from the DML row appended, and
 * every row of the result is inserted into ES.
 *
 * @param config ES sync configuration
 * @param dml    DML event the row belongs to
 * @param data   column values of the single DML row
 */
private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    final ESMapping esMapping = config.getEsMapping();
    // Mapping SQL narrowed down to the row(s) identified by the DML data.
    final String querySql = ESSyncUtil.appendCondition(esMapping.getSql(),
        ESSyncUtil.pkConditionSql(esMapping, data));
    final DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

    if (logger.isTraceEnabled()) {
        logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(), dml.getTable(), esMapping.get_index(), querySql.replace("\n", " "));
    }

    Util.sqlRS(dataSource, querySql, resultSet -> {
        try {
            while (resultSet.next()) {
                final Map<String, Object> document = new LinkedHashMap<>();
                final Object docId = esTemplate.getESDataFromRS(esMapping, resultSet, document);
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                        config.getDestination(), dml.getTable(), esMapping.get_index(), docId);
                }
                esTemplate.insert(esMapping, docId, document);
            }
            return 0;
        } catch (Exception e) {
            // Surface any failure (SQL or ES) to the caller as an unchecked exception.
            throw new RuntimeException(e);
        }
    });
}
/**
 * Removes main-table data from ES when the document id cannot be read straight from the DML row
 * (the callers use this when the id/pk field is a function or a binary expression).
 * <p>
 * The mapping SQL is re-run with the primary-key condition built from the DML row appended, and
 * {@code esTemplate.delete} is called once per returned row with the id read from that row.
 * When the mapping declares a {@code pk}, the call also receives a map holding every ES field
 * derived from the DML row (except the pk) set to {@code null}; otherwise it receives {@code null}.
 * <p>
 * NOTE(review): a FIXME at one call site reports that this reverse query returns no rows for a
 * delete event, in which case nothing is sent to ES — confirm whether that still applies.
 *
 * @param config ES sync configuration
 * @param dml    DML event the row belongs to
 * @param data   column values of the single DML row
 */
private void mainTableDelete(ESSyncConfig config, Dml dml, Map<String, Object> data) {
    ESMapping mapping = config.getEsMapping();
    String sql = mapping.getSql();
    String condition = ESSyncUtil.pkConditionSql(mapping, data);
    sql = ESSyncUtil.appendCondition(sql, condition);
    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    if (logger.isTraceEnabled()) {
        logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.get_index(),
            sql.replace("\n", " "));
    }
    Util.sqlRS(ds, sql, rs -> {
        try {
            Map<String, Object> esFieldData = null;
            if (mapping.getPk() != null) {
                Map<String, Object> dmlFieldData = new LinkedHashMap<>();
                esTemplate.getESDataFromDmlData(mapping, data, dmlFieldData);
                dmlFieldData.remove(mapping.getPk());
                // Build the nulled-out map separately: inserting cleaned keys into the map being
                // iterated is a structural modification (ConcurrentModificationException) and
                // would also leave the raw key behind with its old value.
                esFieldData = new LinkedHashMap<>();
                for (String key : dmlFieldData.keySet()) {
                    esFieldData.put(Util.cleanColumn(key), null);
                }
            }
            while (rs.next()) {
                Object idVal = esTemplate.getIdValFromRS(mapping, rs);
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                        config.getDestination(),
                        dml.getTable(),
                        mapping.get_index(),
                        idVal);
                }
                esTemplate.delete(mapping, idVal, esFieldData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return 0;
    });
}
/**
 * Handles a change on a joined table (not a sub-query) whose related main-query fields are all
 * simple columns.
 * <p>
 * The query parameters are collected from the DML row for every relation field that is backed by
 * exactly one column, then {@code esTemplate.updateByQuery} is called with those parameters and
 * the supplied field data.
 *
 * @param config      ES sync configuration
 * @param dml         DML event the row belongs to
 * @param data        column values of the single DML row
 * @param tableItem   configuration of the table the DML event refers to
 * @param esFieldData ES field values to pass to the update-by-query call
 */
private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                           TableItem tableItem, Map<String, Object> esFieldData) {
    ESMapping mapping = config.getEsMapping();

    Map<String, Object> paramsTmp = new LinkedHashMap<>();
    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
        for (FieldItem fieldItem : entry.getValue()) {
            // Only fields backed by exactly one column are used as query parameters.
            if (fieldItem.getColumnItems().size() == 1) {
                Object value = esTemplate.getValFromData(mapping,
                    data,
                    fieldItem.getFieldName(),
                    entry.getKey().getColumn().getColumnName());

                String fieldName = fieldItem.getFieldName();
                // The field configured as document id is addressed as "_id".
                if (fieldName.equals(mapping.get_id())) {
                    fieldName = "_id";
                }
                paramsTmp.put(fieldName, value);
            }
        }
    }

    // The guard now matches the level actually logged (it previously checked isDebugEnabled()).
    if (logger.isTraceEnabled()) {
        logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.get_index());
    }
    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
}
/**
 * Handles a change on a joined sub-query table whose related main-query fields are all simple
 * columns.
 * <p>
 * The table's sub-query is rebuilt (select list, from, where, group by) with one
 * {@code column=?} condition per relation key, bound to the value taken from the DML row. For
 * every returned row the affected ES fields and the update-by-query parameters are read from the
 * result set and passed to {@code esTemplate.updateByQuery}.
 *
 * @param config    ES sync configuration
 * @param dml       DML event the row belongs to
 * @param data      column values of the single DML row
 * @param old       previous values of the changed columns; when {@code null} every related
 *                  select field is written
 * @param tableItem configuration of the table the DML event refers to
 */
private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                          Map<String, Object> old, TableItem tableItem) {
    ESMapping mapping = config.getEsMapping();

    MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());

    StringBuilder sql = new StringBuilder();
    sql.append("SELECT ")
        .append(SqlParser.parse4SQLSelectItem(queryBlock))
        .append(" FROM ")
        .append(SqlParser.parse4FromTableSource(queryBlock));

    String whereSql = SqlParser.parse4WhereItem(queryBlock);
    if (whereSql != null) {
        sql.append(" WHERE ").append(whereSql);
    } else {
        // Neutral predicate so the relation-key conditions can always be appended with AND.
        sql.append(" WHERE 1=1 ");
    }

    // One bound parameter per relation key, in iteration order of the key set.
    List<Object> values = new ArrayList<>();
    for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
        String columnName = fkFieldItem.getColumn().getColumnName();
        Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
        sql.append(" AND ").append(columnName).append("=? ");
        values.add(value);
    }

    String groupSql = SqlParser.parse4GroupBy(queryBlock);
    if (groupSql != null) {
        sql.append(groupSql);
    }

    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    if (logger.isTraceEnabled()) {
        logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.get_index(),
            sql.toString().replace("\n", " "));
    }
    Util.sqlRS(ds, sql.toString(), values, rs -> {
        try {
            while (rs.next()) {
                Map<String, Object> esFieldData = new LinkedHashMap<>();

                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                    if (old != null) {
                        // Update: write the field only if one of the source columns behind the
                        // matching sub-query field is present in the old values.
                        out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                            for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                if (fieldItem1.getFieldName().equals(columnItem0.getColumnName())) {
                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                        if (old.containsKey(columnItem.getColumnName())) {
                                            Object val = esTemplate.getValFromRS(mapping,
                                                rs,
                                                fieldItem.getFieldName(),
                                                fieldItem.getColumn().getColumnName());
                                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                            break out;
                                        }
                                    }
                                }
                            }
                        }
                    } else {
                        // No old values supplied: write every related select field.
                        Object val = esTemplate.getValFromRS(mapping,
                            rs,
                            fieldItem.getFieldName(),
                            fieldItem.getColumn().getColumnName());
                        esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                    }
                }

                Map<String, Object> paramsTmp = new LinkedHashMap<>();
                for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                    for (FieldItem fieldItem : entry.getValue()) {
                        // Only fields backed by exactly one column are used as query parameters.
                        if (fieldItem.getColumnItems().size() == 1) {
                            Object value = esTemplate.getValFromRS(mapping,
                                rs,
                                fieldItem.getFieldName(),
                                entry.getKey().getColumn().getColumnName());
                            String fieldName = fieldItem.getFieldName();
                            // The field configured as document id is addressed as "_id".
                            if (fieldName.equals(mapping.get_id())) {
                                fieldName = "_id";
                            }
                            paramsTmp.put(fieldName, value);
                        }
                    }
                }

                // The guard now matches the level actually logged (it previously checked isDebugEnabled()).
                if (logger.isTraceEnabled()) {
                    logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
                        config.getDestination(),
                        dml.getTable(),
                        mapping.get_index());
                }
                esTemplate.updateByQuery(config, paramsTmp, esFieldData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return 0;
    });
}
/**
 * Handles a change on a joined table (sub-query or not) when the related main-query fields are
 * complex: the whole mapping SQL is executed, restricted by the relation keys of the changed row,
 * and every returned row is pushed to ES through {@code esTemplate.updateByQuery}.
 *
 * @param config    ES sync configuration
 * @param dml       DML event the row belongs to
 * @param data      column values of the single DML row
 * @param old       previous values of the changed columns; when {@code null} every related
 *                  select field is written
 * @param tableItem configuration of the table the DML event refers to
 */
private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
                               TableItem tableItem) {
    ESMapping mapping = config.getEsMapping();

    // Split off a trailing GROUP BY so the WHERE clause can be inserted before it.
    String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
    String sqlNoWhere = sqlSplit[0];

    String sqlGroupBy = "";
    if (sqlSplit.length > 1) {
        sqlGroupBy = "GROUP BY " + sqlSplit[1];
    }

    StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");
    for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
        String columnName = fkFieldItem.getColumn().getColumnName();
        Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
        ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
    }
    // Drops the last five characters — presumably the trailing " AND " left by
    // ESSyncUtil.appendCondition; TODO confirm, and note this assumes at least one relation key.
    int len = sql.length();
    sql.delete(len - 5, len);
    sql.append(sqlGroupBy);

    DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    if (logger.isTraceEnabled()) {
        logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(),
            dml.getTable(),
            mapping.get_index(),
            sql.toString().replace("\n", " "));
    }
    Util.sqlRS(ds, sql.toString(), rs -> {
        try {
            while (rs.next()) {
                Map<String, Object> esFieldData = new LinkedHashMap<>();
                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                    if (old != null) {
                        // Joined table is a sub-query: write the field if a source column behind
                        // the matching sub-query field is present in the old values.
                        out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
                            for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
                                if (fieldItem1.getFieldName().equals(columnItem0.getColumnName())) {
                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                        if (old.containsKey(columnItem.getColumnName())) {
                                            Object val = esTemplate.getValFromRS(mapping,
                                                rs,
                                                fieldItem.getFieldName(),
                                                fieldItem.getFieldName());
                                            // Key passed through Util.cleanColumn like every other
                                            // ES field written here (it was the raw field name).
                                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                            break out;
                                        }
                                    }
                                }
                            }
                        }
                        // Joined table is not a sub-query: write the field if one of its own
                        // columns is present in the old values.
                        for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
                            if (fieldItem1.equals(fieldItem)) {
                                for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
                                    if (old.containsKey(columnItem.getColumnName())) {
                                        Object val = esTemplate.getValFromRS(mapping,
                                            rs,
                                            fieldItem.getFieldName(),
                                            fieldItem.getFieldName());
                                        esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                        break;
                                    }
                                }
                            }
                        }
                    } else {
                        // No old values supplied: write every related select field.
                        Object val = esTemplate
                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
                        esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                    }
                }

                Map<String, Object> paramsTmp = new LinkedHashMap<>();
                for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                    for (FieldItem fieldItem : entry.getValue()) {
                        Object value = esTemplate
                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
                        String fieldName = fieldItem.getFieldName();
                        // The field configured as document id is addressed as "_id".
                        if (fieldName.equals(mapping.get_id())) {
                            fieldName = "_id";
                        }
                        paramsTmp.put(fieldName, value);
                    }
                }

                // The guard now matches the level actually logged (it previously checked isDebugEnabled()).
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
                        config.getDestination(),
                        dml.getTable(),
                        mapping.get_index());
                }
                esTemplate.updateByQuery(config, paramsTmp, esFieldData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return 0;
    });
}
/**
 * Updates one DML row of a single-table mapping whose fields are all simple columns.
 * <p>
 * The ES field data and the document id are taken from the DML row together with its old values
 * via {@code esTemplate.getESDataFromDmlData}; this method issues no SQL of its own.
 *
 * @param config ES sync configuration
 * @param dml    DML event the row belongs to
 * @param data   current column values of the single DML row
 * @param old    previous values of the changed columns of that row
 */
private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                          Map<String, Object> old) {
    final ESMapping esMapping = config.getEsMapping();
    final Map<String, Object> changedFields = new LinkedHashMap<>();
    // The template receives the (initially empty) map and returns the value used as document id.
    final Object docId = esTemplate.getESDataFromDmlData(esMapping, data, old, changedFields);

    if (logger.isTraceEnabled()) {
        logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
            config.getDestination(), dml.getTable(), esMapping.get_index(), docId);
    }

    esTemplate.update(esMapping, docId, changedFields);
}
/**
 * Updates main-table (or single-table) data when the mapping contains complex fields.
 * <p>
 * The mapping SQL is re-run with the primary-key condition built from the DML row appended, and
 * every row of the result is sent to ES as an update.
 *
 * @param config ES sync configuration
 * @param dml    DML event the row belongs to
 * @param data   current column values of the single DML row
 * @param old    previous values of the changed columns of that row
 */
private void mainTableUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old) {
    final ESMapping esMapping = config.getEsMapping();
    // Mapping SQL narrowed down to the row(s) identified by the DML data.
    final String querySql = ESSyncUtil.appendCondition(esMapping.getSql(),
        ESSyncUtil.pkConditionSql(esMapping, data));
    final DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

    if (logger.isTraceEnabled()) {
        logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
            config.getDestination(), dml.getTable(), esMapping.get_index(), querySql.replace("\n", " "));
    }

    Util.sqlRS(dataSource, querySql, resultSet -> {
        try {
            while (resultSet.next()) {
                final Map<String, Object> changedFields = new LinkedHashMap<>();
                final Object docId = esTemplate.getESDataFromRS(esMapping, resultSet, old, changedFields);
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                        config.getDestination(), dml.getTable(), esMapping.get_index(), docId);
                }
                esTemplate.update(esMapping, docId, changedFields);
            }
            return 0;
        } catch (Exception e) {
            // Surface any failure (SQL or ES) to the caller as an unchecked exception.
            throw new RuntimeException(e);
        }
    });
}
/**
 * Commits the current batch by delegating to {@code esTemplate.commit()}.
 */
public void commit() {
    this.esTemplate.commit();
}
}