Elasticsearch source design documentation
Background and Motivation
- The Flink connector for Elasticsearch only supports a sink today; it does not support a source, yet some companies have real scenarios that require reading from Elasticsearch.
 - We propose a thorough Elasticsearch source connector implementation in Flink.
 
Scope
- We'll design and develop an Elasticsearch source connector with the following features:
 - Elasticsearch as a bounded source
 - Support for multiple Elasticsearch versions (5/6/7)
 - Support for FilterPushDown, ProjectPushDown, and LimitPushDown to optimize query performance
 
 
Overall Design
- As in the Hadoop ecosystem, we split the logical data into InputSplits and use an InputFormat to read the data.
 - Implement DynamicTableSource to support SQL; a sketch follows this list.
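A minimal sketch of the SQL side follows. The class name and constructor wiring are illustrative assumptions, not the final design; the fixed points are Flink's ScanTableSource interface and InputFormatProvider, which let the planner run the InputFormat described below:

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

/** Illustrative sketch: exposes the ElasticsearchRowDataInputFormat (below) to the SQL planner. */
public class ElasticsearchDynamicTableSource implements ScanTableSource {

    // the bounded InputFormat described in the ElasticsearchRowDataInputFormat section
    private final InputFormat<RowData, ?> inputFormat;

    public ElasticsearchDynamicTableSource(InputFormat<RowData, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // a bounded snapshot read only emits inserts
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new ElasticsearchDynamicTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "Elasticsearch source";
    }
}
```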
 
ElasticsearchInputSplit
- We split an Elasticsearch type (in recent versions this is effectively the index, because every index has exactly one default type, `_doc`) into multiple ElasticsearchInputSplits. One split corresponds to one shard; see the sketch below.
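A hedged sketch of the split class (field names are assumptions): it extends Flink's LocatableInputSplit so that the LocatableInputSplitAssigner used later can prefer hosts that actually hold the shard.

```java
import org.apache.flink.core.io.LocatableInputSplit;

public class ElasticsearchInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    /** The index to read. */
    private final String index;
    /** The type; in recent versions every index has the single default type "_doc". */
    private final String type;
    /** The shard this split is responsible for. */
    private final int shard;

    public ElasticsearchInputSplit(int splitNumber, String[] hostnames, String index, String type, int shard) {
        super(splitNumber, hostnames);
        this.index = index;
        this.type = type;
        this.shard = shard;
    }

    public String getIndex() {
        return index;
    }

    public String getType() {
        return type;
    }

    public int getShard() {
        return shard;
    }
}
```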

 
ElasticsearchRowDataInputFormat
```java
public class ElasticsearchRowDataInputFormat<C extends AutoCloseable>
        extends RichInputFormat<RowData, ElasticsearchInputSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataInputFormat.class);

    // for parsing JSON data
    private final DeserializationSchema<RowData> deserializationSchema;
    // for FilterPushDown
    private final QueryBuilder queryBuilder;
    private final String index;
    // for ProjectPushDown: the fields to fetch from _source
    private String[] fields;
    private String currentScrollWindowId;
    private SearchHit[] currentScrollWindowHits = null;
    private int nextRecordIndex = 0;
    private transient long currentReadCount = 0L;
    // for LimitPushDown: limits the amount of data to fetch
    private final long limit;
    private final int scrollTimeout;
    private final int scrollSize;

    // ---------------- User-facing API and configuration ----------------

    /**
     * The config map that contains configuration for the bulk flushing behaviours.
     *
     * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations,
     * this config map would also contain Elasticsearch-shipped configuration, and therefore
     * would also be forwarded when creating the Elasticsearch client.
     */
    private final Map<String, String> userConfig;

    // ---------------- Internals for the Flink Elasticsearch source ----------------

    /** Call bridge for version-specific behavior. */
    private final ElasticsearchApiCallBridge<C> callBridge;

    /** Elasticsearch client created using the call bridge. */
    private transient C client;

    public ElasticsearchRowDataInputFormat(
            ElasticsearchApiCallBridge<C> callBridge,
            Map<String, String> userConfig,
            DeserializationSchema<RowData> deserializationSchema,
            QueryBuilder queryBuilder,
            String index,
            long limit) {
        this.deserializationSchema = deserializationSchema;
        this.queryBuilder = queryBuilder;
        this.index = index;
        this.limit = limit;
        this.callBridge = checkNotNull(callBridge);
        checkNotNull(userConfig);
        // copy config so we can remove entries without side-effects
        this.userConfig = new HashMap<>(userConfig);
        // todo
    }

    @Override
    public void configure(Configuration parameters) {
        client = callBridge.createClient(userConfig);
        callBridge.verifyClientConnection(client);
        // todo
    }

    @Override
    public void open(ElasticsearchInputSplit split) throws IOException {
        LOG.info("Opening split: {}", split);
        // note: request construction below uses the ES 5 builder API;
        // see "How to construct a SearchRequest" further down
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(split.getType())
                .setSearchType(QUERY_THEN_FETCH)
                .setScroll(new TimeValue(scrollTimeout))
                .setFetchSource(fields, null)
                .setQuery(queryBuilder)
                .setPreference("_shards:" + split.getShard())
                .setSize(scrollSize);
        SearchRequest searchRequest = searchRequestBuilder.request();
        SearchResponse searchResponse = null;
        try {
            searchResponse = callBridge.search(client, searchRequest);
        } catch (IOException e) {
            LOG.error("Search has error: {}", e.getMessage());
        }
        if (searchResponse != null) {
            currentScrollWindowId = searchResponse.getScrollId();
            currentScrollWindowHits = searchResponse.getHits().getHits();
            nextRecordIndex = 0;
        }
    }

    @Override
    public ElasticsearchInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        return callBridge.createInputSplitsInternal(index, client, minNumSplits);
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(ElasticsearchInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        if (limit > 0 && currentReadCount >= limit) {
            return true;
        }
        if (currentScrollWindowHits.length > 0 && nextRecordIndex > currentScrollWindowHits.length - 1) {
            // the current window is exhausted; fetch the next one via callBridge.scroll (elided)
            fetchNextScrollWindow();
        }
        return currentScrollWindowHits.length == 0;
    }

    @Override
    public RowData nextRecord(RowData reuse) throws IOException {
        if (reachedEnd()) {
            LOG.warn("Already reached the end of the split.");
        }
        SearchHit hit = currentScrollWindowHits[nextRecordIndex];
        nextRecordIndex += 1;
        currentReadCount++;
        LOG.debug("Yielding new record for hit: {}", hit);
        // parseSearchHit applies the DeserializationSchema to the hit's _source (elided)
        return parseSearchHit(hit);
    }
}
```
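For completeness, a hypothetical way to run this InputFormat as a bounded job; `inputFormat` and `rowDataTypeInfo` are assumed to be constructed elsewhere:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// explicit type information is needed because RowData cannot be extracted automatically
DataStream<RowData> stream = env.createInput(inputFormat, rowDataTypeInfo);
stream.print();
env.execute("elasticsearch-source-smoke-test");
```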

Adapt to different Elasticsearch versions
- In order to adapt to different Elasticsearch versions (5 uses TransportClient; 6 and 7 use RestHighLevelClient), we add the following methods to ElasticsearchApiCallBridge:
 
```java
interface ElasticsearchApiCallBridge<C extends AutoCloseable> {

    // resolves the search shards of the index via the version-specific client
    ElasticsearchInputSplit[] createInputSplitsInternal(String index, C client, int minNumSplits);

    SearchResponse search(C client, SearchRequest searchRequest) throws IOException;

    SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;

    void close(C client) throws IOException;
}
```
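`createInputSplitsInternal` can be backed by the cluster's search-shards API. A hedged sketch for the Elasticsearch 5 TransportClient (the hostname mapping and type resolution are elided for brevity; "_doc" is assumed):

```java
@Override
public ElasticsearchInputSplit[] createInputSplitsInternal(String index, TransportClient client, int minNumSplits) {
    ClusterSearchShardsResponse response = client.admin().cluster()
            .searchShards(new ClusterSearchShardsRequest(index))
            .actionGet();
    ClusterSearchShardsGroup[] groups = response.getGroups();
    ElasticsearchInputSplit[] splits = new ElasticsearchInputSplit[groups.length];
    for (int i = 0; i < groups.length; i++) {
        // hostnames are left empty for brevity; a full implementation would map each
        // shard's currentNodeId() to a DiscoveryNode host via response.getNodes()
        splits[i] = new ElasticsearchInputSplit(i, new String[0], index, "_doc", groups[i].getShardId().getId());
    }
    return splits;
}
```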
- Take `search` as an example. For Elasticsearch5ApiCallBridge:

```java
// for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
    // the TransportClient executes the SearchRequest and blocks for the response
    return client.search(searchRequest).actionGet();
}
```
- For Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge:

```java
// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
    // note: newer RestHighLevelClient versions take an additional RequestOptions argument
    return client.search(searchRequest);
}
```
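The `scroll` call follows the same pattern; a hedged sketch for the REST-based bridges:

```java
// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse scroll(RestHighLevelClient client, SearchScrollRequest searchScrollRequest) throws IOException {
    // note: newer RestHighLevelClient versions expose this as scroll(request, RequestOptions)
    return client.searchScroll(searchScrollRequest);
}
```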
How to construct a SearchRequest:

```java
// builds the request via SearchRequestBuilder using the ES 5 TransportClient API
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indices)
        .setTypes(type)
        .setSearchType(QUERY_THEN_FETCH)
        .setScroll(new TimeValue(scrollTimeout.toMillis()))
        .setFetchSource(fields.toArray(new String[0]), null)
        .setQuery(boolQueryBuilder)
        .setPreference("_shards:" + shard)
        .addSort("_doc", SortOrder.ASC)
        .setSize(scrollSize);
SearchRequest searchRequest = searchRequestBuilder.request();
```
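The RestHighLevelClient has no SearchRequestBuilder, so for ES 6/7 the equivalent request would be assembled with a SearchSourceBuilder. A hedged sketch using the same variables as above:

```java
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .fetchSource(fields.toArray(new String[0]), null)
        .query(boolQueryBuilder)
        .sort("_doc", SortOrder.ASC)
        .size(scrollSize);
SearchRequest searchRequest = new SearchRequest(indices)
        .types(type)
        .searchType(SearchType.QUERY_THEN_FETCH)
        .preference("_shards:" + shard)
        .scroll(new TimeValue(scrollTimeout.toMillis()))
        .source(sourceBuilder);
```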
DDL and configuration
- We follow FLIP-122 (New Connector Property Keys for New Factory).
- DDL for the Elasticsearch DynamicTableSource:

```sql
CREATE TABLE es_table (
    ...
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9092',
    'index' = 'MyIndex',
    'document-type' = 'MyType',
    'scan.xx' = 'xx'
);
```
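A hedged sketch of the corresponding FLIP-122 style factory; option handling beyond the keys shown in the DDL, and the `buildInputFormat` helper, are illustrative assumptions:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class ElasticsearchDynamicTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts").stringType().noDefaultValue();
    public static final ConfigOption<String> INDEX =
            ConfigOptions.key("index").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // matches 'connector' = 'elasticsearch-7' in the DDL above
        return "elasticsearch-7";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTS);
        options.add(INDEX);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new ElasticsearchDynamicTableSource(buildInputFormat(options.get(HOSTS), options.get(INDEX)));
    }

    // hypothetical helper: creates the ElasticsearchRowDataInputFormat from the options
    private InputFormat<RowData, ?> buildInputFormat(String hosts, String index) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```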
ElasticsearchTableSource

FilterPushDown, ProjectPushDown, and LimitPushDown
FilterPushDown:
- Extract the List<Expression> of predicates that can be pushed down to Elasticsearch and convert it into an Elasticsearch BoolQueryBuilder:
 
```java
@Override
public TableSource<RowData> applyPredicate(List<Expression> predicates) {
    BoolQueryBuilder builder = new BoolQueryBuilder();
    // try to convert Flink filter expressions to Elasticsearch QueryBuilders
    List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
    for (Expression toConvert : predicates) {
        if (toElasticSearchQueryBuilder(toConvert, builder) == null) {
            unsupportedExpressions.add(toConvert);
        }
    }
    // update the list of Flink expressions to the unsupported ones,
    // which the planner will then evaluate itself
    predicates.clear();
    predicates.addAll(unsupportedExpressions);
    // create and return a new ElasticsearchTableSource with the converted query
    return new ElasticsearchTableSource(
            schema, hosts, index, deserializer, sourceOptions, projectedFields, builder);
}

private boolean isValid(BinaryComparison comp) {
    return (comp.left() instanceof Literal && comp.right() instanceof Attribute)
            || (comp.left() instanceof Attribute && comp.right() instanceof Literal);
}

/** Converts a Flink Expression to an Elasticsearch QueryBuilder. */
@Nullable
private QueryBuilder toElasticSearchQueryBuilder(Expression exp, BoolQueryBuilder builder) {
    if (exp instanceof Not) {
        QueryBuilder c = toElasticSearchQueryBuilder(((Not) exp).child(), builder);
        if (c == null) {
            return null;
        } else {
            return builder.mustNot(c);
        }
    } else if (exp instanceof BinaryComparison) {
        BinaryComparison binComp = (BinaryComparison) exp;
        if (!isValid(binComp)) {
            // unsupported literal type
            LOG.debug("Unsupported predicate [{}] cannot be pushed to ElasticsearchTableSource.", exp);
            return null;
        }
        boolean onRight = literalOnRight(binComp);
        String columnName = getColumnName(binComp);
        Object value = getLiteral(binComp);
        if (exp instanceof EqualTo) {
            return builder.must(new TermQueryBuilder(columnName, value));
        } else if (exp instanceof NotEqualTo) {
            return builder.mustNot(new TermQueryBuilder(columnName, value));
        } else if (exp instanceof GreaterThan) {
            // if the literal is on the left, the comparison is mirrored
            return onRight
                    ? builder.must(greaterThan(columnName, value))
                    : builder.must(lessThan(columnName, value));
        } else if (exp instanceof GreaterThanOrEqual) {
            return onRight
                    ? builder.must(greaterThanOrEqual(columnName, value))
                    : builder.must(lessThanOrEqual(columnName, value));
        } else if (exp instanceof LessThan) {
            return onRight
                    ? builder.must(lessThan(columnName, value))
                    : builder.must(greaterThan(columnName, value));
        } else if (exp instanceof LessThanOrEqual) {
            return onRight
                    ? builder.must(lessThanOrEqual(columnName, value))
                    : builder.must(greaterThanOrEqual(columnName, value));
        } else {
            // unsupported predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
    } else if (exp instanceof BinaryExpression) {
        if (exp instanceof And) {
            // the planner hands predicates over in CNF, so conjuncts arrive as
            // separate list entries and an AND should not appear here
            LOG.debug("All of the predicates should be in CNF. Found an AND expression: {}.", exp);
            return null;
        } else if (exp instanceof Or) {
            QueryBuilder c1 = toElasticSearchQueryBuilder(((Or) exp).left(), builder);
            QueryBuilder c2 = toElasticSearchQueryBuilder(((Or) exp).right(), builder);
            if (c1 == null || c2 == null) {
                return null;
            } else {
                return builder.should(c1).should(c2);
            }
        } else {
            // unsupported predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
    } else if (exp instanceof UnaryExpression) {
        String columnName = ((Attribute) ((UnaryExpression) exp).child()).name();
        if (exp instanceof IsNull) {
            // IS NULL matches documents where the field does not exist
            return builder.mustNot(new ExistsQueryBuilder(columnName));
        } else if (exp instanceof IsNotNull) {
            return builder.must(new ExistsQueryBuilder(columnName));
        }
    }
    return null;
}

private QueryBuilder greaterThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gt(value);
}

private QueryBuilder greaterThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gte(value);
}

private QueryBuilder lessThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lt(value);
}

private QueryBuilder lessThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lte(value);
}

private Object getLiteral(BinaryComparison comp) {
    return literalOnRight(comp)
            ? ((Literal) comp.right()).value()
            : ((Literal) comp.left()).value();
}

private String getColumnName(BinaryComparison comp) {
    return literalOnRight(comp)
            ? ((Attribute) comp.left()).name()
            : ((Attribute) comp.right()).name();
}

private boolean literalOnRight(BinaryComparison comp) {
    if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
        return false;
    } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
        return true;
    } else {
        throw new RuntimeException("Invalid binary comparison.");
    }
}

@Override
public boolean isFilterPushedDown() {
    return queryBuilder != null;
}
```
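For example, the conjuncts of `WHERE b > 10 AND c = 'x'` arrive as two CNF entries and would be translated to the equivalent of (illustrative field names):

```java
BoolQueryBuilder builder = new BoolQueryBuilder()
        .must(new RangeQueryBuilder("b").gt(10))
        .must(new TermQueryBuilder("c", "x"));
```

ProjectPushDown:
- On the same legacy TableSource API this corresponds to ProjectableTableSource. A hedged sketch, mirroring the constructor used in applyPredicate above:

```java
@Override
public TableSource<RowData> projectFields(int[] fields) {
    // the projected field indices are later translated into the _source
    // filtering (setFetchSource) of the search request
    return new ElasticsearchTableSource(
            schema, hosts, index, deserializer, sourceOptions, fields, queryBuilder);
}
```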
LimitPushDown:
- Simply pass the limit through to ElasticsearchRowDataInputFormat, whose reachedEnd() stops the scan once currentReadCount reaches it; see the sketch below.
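A hedged sketch of the table-source side, assuming the legacy LimitableTableSource interface (the constructor shape is illustrative):

```java
@Override
public boolean isLimitPushedDown() {
    return limit > 0;
}

@Override
public TableSource<RowData> applyLimit(long limit) {
    // the limit is handed to ElasticsearchRowDataInputFormat; its reachedEnd()
    // stops reading once currentReadCount >= limit
    return new ElasticsearchTableSource(
            schema, hosts, index, deserializer, sourceOptions, projectedFields, queryBuilder, limit);
}
```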
 
Test Plan
- After discussion, we will implement the connector and add unit and integration tests.
 - We have already completed an initial simple implementation for Elasticsearch 7.
 
Future Plan
- Support an Elasticsearch translog-based connector, analogous to the MySQL binlog / HBase WAL connectors.
 
