Elasticsearch source design documentation
Background and Motivation
- The Flink Elasticsearch connector only supports a sink, not a source, yet some companies do need to read data from Elasticsearch.
- We propose a thorough Elasticsearch source connector implementation for Flink.
Scope
- We will design and develop an ES source connector with the following features:
  - ES as a bounded source
  - support for multiple ES versions (5/6/7)
  - support for FilterPushDown, ProjectPushDown and LimitPushDown to optimize query performance
Overall Design
- Following the Hadoop ecosystem, we split the logical data into xxInputSplits and use an xxInputFormat to read them.
- Implement a DynamicTableSource to support SQL.
ElasticsearchInputSplit
- We split each ES type (in newer versions this is simply the index, because every index has only one type, `_doc` by default in ES 7) into ElasticsearchInputSplits. One split corresponds to one shard.
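A minimal sketch of the one-split-per-shard mapping, in plain Java so it runs without an Elasticsearch cluster. `ShardInfo`, `EsInputSplit` and `createSplits` are hypothetical stand-ins: a real implementation would obtain the shard list from the version-specific search-shards call and would make the split a Flink `LocatableInputSplit`, so that the hosts holding a shard act as locality hints for the split assigner.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardSplitSketch {

    /** Hypothetical: one shard id plus the nodes holding a copy of it (e.g. from search-shards). */
    record ShardInfo(int shardId, List<String> nodeHosts) {}

    /** Hypothetical split: one per shard; hostnames serve as locality hints. */
    record EsInputSplit(int splitNumber, String index, String type, int shard, List<String> hostnames) {
        /** The search preference that restricts a request to this split's shard. */
        String preference() {
            return "_shards:" + shard;
        }
    }

    static List<EsInputSplit> createSplits(String index, String type, List<ShardInfo> shards) {
        List<EsInputSplit> splits = new ArrayList<>();
        for (ShardInfo shard : shards) {
            // one split per shard, numbered in creation order
            splits.add(new EsInputSplit(splits.size(), index, type, shard.shardId(), shard.nodeHosts()));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<ShardInfo> shards = List.of(
                new ShardInfo(0, List.of("es-node-1", "es-node-2")),
                new ShardInfo(1, List.of("es-node-2")),
                new ShardInfo(2, List.of("es-node-3")));
        for (EsInputSplit split : createSplits("my_index", "_doc", shards)) {
            System.out.println("split " + split.splitNumber() + ": preference=" + split.preference()
                    + ", hosts=" + split.hostnames());
        }
    }
}
```

Because each split later searches with its own `_shards:<id>` preference, no two splits read the same shard.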
ElasticsearchRowDataInputFormat
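```java
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

public class ElasticsearchRowDataInputFormat<C extends AutoCloseable>
        extends RichInputFormat<RowData, ElasticsearchInputSplit>
        implements ResultTypeQueryable<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataInputFormat.class);

    // ------------------------------------------------------------------------
    //  User-facing API and configuration
    // ------------------------------------------------------------------------

    /**
     * The user config map used to create the Elasticsearch client.
     *
     * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this
     * map also contains Elasticsearch-shipped configuration, which is forwarded when creating the client.
     */
    private final Map<String, String> userConfig;

    // parses the JSON _source of each hit into RowData
    private final DeserializationSchema<RowData> deserializationSchema;
    // for FilterPushDown; QueryBuilder is not java.io.Serializable, so keep its JSON form (null = match all)
    private final String queryJson;
    // for ProjectPushDown (null = fetch the whole _source)
    private final String[] fieldNames;
    private final String index;
    // for LimitPushDown: max records this instance emits (<= 0 = no limit)
    private final long limit;
    private final long scrollTimeoutMillis;
    private final int scrollSize;

    // ------------------------------------------------------------------------
    //  Internals for the Flink Elasticsearch source
    // ------------------------------------------------------------------------

    /** Call bridge for the version-specific Elasticsearch client. */
    private final ElasticsearchApiCallBridge<C> callBridge;

    /** Elasticsearch client created using the call bridge. */
    private transient C client;

    private transient String currentScrollWindowId;
    private transient SearchHit[] currentScrollWindowHits;
    private transient int nextRecordIndex;
    // not reset per split: the limit applies to all splits read by this parallel instance
    private transient long currentReadCount;

    public ElasticsearchRowDataInputFormat(
            ElasticsearchApiCallBridge<C> callBridge,
            Map<String, String> userConfig,
            DeserializationSchema<RowData> deserializationSchema,
            QueryBuilder queryBuilder,
            String[] fieldNames,
            String index,
            long limit,
            long scrollTimeoutMillis,
            int scrollSize) {
        this.callBridge = checkNotNull(callBridge);
        // copy config so we can remove entries without side-effects
        this.userConfig = new HashMap<>(checkNotNull(userConfig));
        this.deserializationSchema = checkNotNull(deserializationSchema);
        // AbstractQueryBuilder#toString renders the query as JSON
        this.queryJson = queryBuilder == null ? null : queryBuilder.toString();
        this.fieldNames = fieldNames;
        this.index = checkNotNull(index);
        this.limit = limit;
        this.scrollTimeoutMillis = scrollTimeoutMillis;
        this.scrollSize = scrollSize;
    }

    @Override
    public void configure(Configuration parameters) {
        try {
            client = callBridge.createClient(userConfig);
            callBridge.verifyClientConnection(client);
        } catch (IOException e) {
            throw new RuntimeException("Could not connect to Elasticsearch.", e);
        }
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;
    }

    @Override
    public ElasticsearchInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        return callBridge.createInputSplitsInternal(index, client, minNumSplits);
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(ElasticsearchInputSplit[] inputSplits) {
        // requires ElasticsearchInputSplit to extend LocatableInputSplit (hosts = nodes holding the shard)
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public void open(ElasticsearchInputSplit split) throws IOException {
        LOG.info("Opening split: {}", split);
        SearchSourceBuilder source = new SearchSourceBuilder()
                .fetchSource(fieldNames, null)
                .sort("_doc") // index order is the cheapest sort for scrolling
                .size(scrollSize);
        if (queryJson != null) {
            source.query(QueryBuilders.wrapperQuery(queryJson));
        }
        SearchRequest searchRequest = new SearchRequest(index)
                .searchType(SearchType.QUERY_THEN_FETCH)
                .scroll(TimeValue.timeValueMillis(scrollTimeoutMillis))
                // getShard()/getType() are assumed accessors of ElasticsearchInputSplit
                .preference("_shards:" + split.getShard())
                .source(source);
        if (split.getType() != null) {
            searchRequest.types(split.getType()); // only ES 5/6 have mapping types
        }
        // let failures propagate instead of silently skipping this shard
        SearchResponse searchResponse = callBridge.search(client, searchRequest);
        currentScrollWindowId = searchResponse.getScrollId();
        currentScrollWindowHits = searchResponse.getHits().getHits();
        nextRecordIndex = 0;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        if (limit > 0 && currentReadCount >= limit) {
            return true;
        }
        if (currentScrollWindowHits.length > 0 && nextRecordIndex >= currentScrollWindowHits.length) {
            fetchNextScrollWindow();
        }
        return currentScrollWindowHits.length == 0;
    }

    @Override
    public RowData nextRecord(RowData reuse) throws IOException {
        if (reachedEnd()) {
            LOG.warn("Already reached the end of the split.");
            return null;
        }
        SearchHit hit = currentScrollWindowHits[nextRecordIndex++];
        currentReadCount++;
        LOG.debug("Yielding new record for hit: {}", hit);
        return deserializationSchema.deserialize(hit.getSourceAsString().getBytes(StandardCharsets.UTF_8));
    }

    private void fetchNextScrollWindow() throws IOException {
        SearchScrollRequest scrollRequest = new SearchScrollRequest(currentScrollWindowId)
                .scroll(TimeValue.timeValueMillis(scrollTimeoutMillis));
        SearchResponse searchResponse = callBridge.scroll(client, scrollRequest);
        currentScrollWindowId = searchResponse.getScrollId();
        currentScrollWindowHits = searchResponse.getHits().getHits();
        nextRecordIndex = 0;
    }

    @Override
    public void close() throws IOException {
        // per-split cleanup; the client is reused for the next split and the
        // scroll context expires on the server after the scroll timeout
        currentScrollWindowId = null;
        currentScrollWindowHits = null;
    }

    @Override
    public void closeInputFormat() throws IOException {
        if (client != null) {
            callBridge.close(client);
            client = null;
        }
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
```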
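The scroll bookkeeping in `reachedEnd()` and `nextRecord()` can be checked in isolation. The sketch below replays it against a hypothetical in-memory `FakeScroll` instead of an Elasticsearch client: the current window is consumed hit by hit, the next window is fetched only once it is exhausted, an empty window ends the split, and the pushed-down limit stops reading early. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollReaderSketch {

    /** Hypothetical in-memory stand-in for one shard's scroll cursor. */
    static class FakeScroll {
        private final List<String> docs;
        private final int windowSize;
        private int offset = 0;

        FakeScroll(List<String> docs, int windowSize) {
            this.docs = docs;
            this.windowSize = windowSize;
        }

        /** Returns the next window of hits; an empty window means the scroll is exhausted. */
        List<String> nextWindow() {
            int end = Math.min(offset + windowSize, docs.size());
            List<String> window = docs.subList(offset, end);
            offset = end;
            return window;
        }
    }

    private final FakeScroll scroll;
    private final long limit; // <= 0 means no limit was pushed down
    private List<String> window;
    private int nextRecordIndex = 0;
    private long readCount = 0;

    ScrollReaderSketch(FakeScroll scroll, long limit) {
        this.scroll = scroll;
        this.limit = limit;
        this.window = scroll.nextWindow(); // like open(): the initial search returns the first window
    }

    boolean reachedEnd() {
        if (limit > 0 && readCount >= limit) {
            return true;
        }
        if (!window.isEmpty() && nextRecordIndex >= window.size()) {
            window = scroll.nextWindow(); // like fetchNextScrollWindow()
            nextRecordIndex = 0;
        }
        return window.isEmpty();
    }

    String nextRecord() {
        if (reachedEnd()) {
            return null;
        }
        readCount++;
        return window.get(nextRecordIndex++);
    }

    static List<String> readAll(List<String> docs, int windowSize, long limit) {
        ScrollReaderSketch reader = new ScrollReaderSketch(new FakeScroll(docs, windowSize), limit);
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            out.add(reader.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> docs = List.of("d1", "d2", "d3", "d4", "d5");
        System.out.println(readAll(docs, 2, 0)); // [d1, d2, d3, d4, d5]
        System.out.println(readAll(docs, 2, 3)); // [d1, d2, d3]
    }
}
```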
Adapting to different ES versions
- To support different ES versions (TransportClient for ES 5, RestHighLevelClient for ES 6 and 7), we add the following methods to ElasticsearchApiCallBridge:
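```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import java.io.IOException;

public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends java.io.Serializable {
    // ... existing methods (createClient, verifyClientConnection, ...) stay unchanged
    // creates one split per shard via the version-specific search-shards API
    ElasticsearchInputSplit[] createInputSplitsInternal(String index, C client, int minNumSplits);
    SearchResponse search(C client, SearchRequest searchRequest) throws IOException;
    SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;
    void close(C client) throws IOException;
}
```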
- Take search as an example:
- for Elasticsearch5ApiCallBridge
```java
// for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
    // TransportClient#search is asynchronous and returns an ActionFuture; block for the response
    return client.search(searchRequest).actionGet();
}
```
- for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
```java
// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
    // the ES 7 client requires RequestOptions; ES 6 clients older than 6.4 only offer
    // search(searchRequest, Header...), i.e. client.search(searchRequest)
    return client.search(searchRequest, RequestOptions.DEFAULT);
}
```
How to construct the SearchRequest (ES 5, TransportClient API)
```java
// converts the pushed-down query, projection and split info into a SearchRequest
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indices)
        .setTypes(type)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setScroll(new TimeValue(scrollTimeout.toMillis()))   // scrollTimeout: java.time.Duration
        .setFetchSource(fields.toArray(new String[0]), null)  // ProjectPushDown
        .setQuery(boolQueryBuilder)                             // FilterPushDown
        .setPreference("_shards:" + shard)                      // read only this split's shard
        .addSort("_doc", SortOrder.ASC)
        .setSize(scrollSize);
SearchRequest searchRequest = searchRequestBuilder.request();
```
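For readers using the REST clients (ES 6/7), it may help to see roughly what one split sends over the wire. The sketch below only renders the REST request line and JSON body for a given shard; `describe` is a hypothetical helper, field names are not JSON-escaped, and the keep-alive `1m` and size `1000` are example values. `QUERY_THEN_FETCH` is the default search type, so it does not appear.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ScrollRequestSketch {

    /**
     * Renders the REST request line and JSON body that one split would send.
     * Hypothetical helper: field names are not JSON-escaped.
     */
    static String describe(String index, int shard, List<String> fields, String queryJson,
                           int scrollSize, String keepAlive) {
        String includes = fields.stream()
                .map(f -> "\"" + f + "\"")
                .collect(Collectors.joining(","));
        // scroll and preference are URL parameters; the rest goes into the body
        return "POST /" + index + "/_search?scroll=" + keepAlive
                + "&preference=_shards:" + shard + "\n"
                + "{\"size\":" + scrollSize
                + ",\"query\":" + queryJson
                + ",\"_source\":{\"includes\":[" + includes + "]}"
                + ",\"sort\":[\"_doc\"]}";
    }

    public static void main(String[] args) {
        String query = "{\"bool\":{\"filter\":[{\"term\":{\"id\":42}}]}}";
        System.out.println(describe("my_index", 2, List.of("id", "name"), query, 1000, "1m"));
    }
}
```

Subsequent windows are then requested with `POST /_search/scroll`, passing the `scroll_id` returned by the previous response.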
DDL and configuration
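- Option keys follow FLIP-122 (New Connector Property Keys for New Factory).
- DDL for the ES DynamicTableSource:
```sql
CREATE TABLE es_table (
  ...
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'myindex',          -- ES index names must be lowercase
  'document-type' = 'MyType',   -- only meaningful for ES 5/6, which still have mapping types
  'scan.xx' = 'xx'
);
```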
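The DDL leaves the format of 'hosts' implicit. A minimal validation sketch, assuming the source accepts the same semicolon-separated list of `scheme://host:port` URLs as Flink's Elasticsearch SQL sink; `HostsOptionSketch`, `Host` and `parseHosts` are hypothetical names, and rejecting entries without an explicit scheme and port is a design assumption.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class HostsOptionSketch {

    /** Parsed form of one entry of the 'hosts' option (hypothetical type). */
    record Host(String scheme, String hostname, int port) {}

    /** Splits the option on ';' and requires every entry to look like scheme://host:port. */
    static List<Host> parseHosts(String hostsOption) {
        List<Host> hosts = new ArrayList<>();
        for (String raw : hostsOption.split(";")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            URI uri = URI.create(entry);
            if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException(
                        "Could not parse host '" + entry + "'; expected e.g. 'http://localhost:9200'.");
            }
            hosts.add(new Host(uri.getScheme(), uri.getHost(), uri.getPort()));
        }
        return hosts;
    }

    public static void main(String[] args) {
        System.out.println(parseHosts("http://localhost:9200;https://es-2:9201"));
    }
}
```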
ElasticsearchTableSource
FilterPushDown, ProjectPushDown and LimitPushDown
FilterPushDown:
- From the List<Expression> of predicates, extract those that can be pushed down to ES and convert them into an ES BoolQueryBuilder.
```java
import org.apache.flink.table.expressions.*; // legacy planner expressions: Not, BinaryComparison, EqualTo, Literal, Attribute, ...
import org.apache.flink.table.sources.TableSource;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

// members of ElasticsearchTableSource (legacy FilterableTableSource API)

@Override
public TableSource applyPredicate(List<Expression> predicates) {
    BoolQueryBuilder builder = new BoolQueryBuilder();
    // try to convert Flink filter expressions to Elasticsearch queries
    List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
    for (Expression toConvert : predicates) {
        QueryBuilder query = toElasticSearchQueryBuilder(toConvert);
        if (query == null) {
            unsupportedExpressions.add(toConvert);
        } else {
            // every pushed predicate must match; filter context skips scoring
            builder.filter(query);
        }
    }
    // update list of Flink expressions to unsupported expressions
    predicates.clear();
    predicates.addAll(unsupportedExpressions);
    // create and return a new ElasticsearchTableSource with the pushed-down query
    return new ElasticsearchTableSource(schema, hosts, index, deserializer, sourceOptions, projectedFields, builder);
}

private boolean isValid(BinaryComparison comp) {
    return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
        (comp.left() instanceof Attribute && comp.right() instanceof Literal);
}

/**
 * Converts a Flink expression into a standalone Elasticsearch query,
 * or returns null if it cannot be pushed down.
 */
@Nullable
private QueryBuilder toElasticSearchQueryBuilder(Expression exp) {
    if (exp instanceof Not) {
        Expression child = ((Not) exp).child();
        // only NOT over a column-literal comparison; the exists clause drops NULLs as SQL does
        if (!(child instanceof BinaryComparison) || !isValid((BinaryComparison) child)) {
            return null;
        }
        QueryBuilder c = toElasticSearchQueryBuilder(child);
        if (c == null) {
            return null;
        }
        return new BoolQueryBuilder()
            .must(new ExistsQueryBuilder(getColumnName((BinaryComparison) child)))
            .mustNot(c);
    } else if (exp instanceof BinaryComparison) {
        BinaryComparison binComp = (BinaryComparison) exp;
        if (!isValid(binComp)) {
            // not a comparison between a column and a literal
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
        boolean onRight = literalOnRight(binComp);
        String columnName = getColumnName(binComp);
        Object value = getLiteral(binComp);
        if (exp instanceof EqualTo) {
            return new TermQueryBuilder(columnName, value);
        } else if (exp instanceof NotEqualTo) {
            // require the field to exist so that NULL values are filtered out, as in SQL
            return new BoolQueryBuilder()
                .must(new ExistsQueryBuilder(columnName))
                .mustNot(new TermQueryBuilder(columnName, value));
        } else if (exp instanceof GreaterThan) {
            return onRight ? greaterThan(columnName, value) : lessThan(columnName, value);
        } else if (exp instanceof GreaterThanOrEqual) {
            return onRight ? greaterThanOrEqual(columnName, value) : lessThanOrEqual(columnName, value);
        } else if (exp instanceof LessThan) {
            return onRight ? lessThan(columnName, value) : greaterThan(columnName, value);
        } else if (exp instanceof LessThanOrEqual) {
            return onRight ? lessThanOrEqual(columnName, value) : greaterThanOrEqual(columnName, value);
        } else {
            // Unsupported Predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
    } else if (exp instanceof BinaryExpression) {
        if (exp instanceof And) {
            LOG.debug("All of the predicates should be in CNF. Found an AND expression: {}.", exp);
            return null;
        } else if (exp instanceof Or) {
            QueryBuilder c1 = toElasticSearchQueryBuilder(((Or) exp).left());
            QueryBuilder c2 = toElasticSearchQueryBuilder(((Or) exp).right());
            if (c1 == null || c2 == null) {
                return null;
            }
            return new BoolQueryBuilder().should(c1).should(c2).minimumShouldMatch(1);
        } else {
            // Unsupported Predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
    } else if (exp instanceof UnaryExpression) {
        Expression child = ((UnaryExpression) exp).child();
        if (!(child instanceof Attribute)) {
            return null;
        }
        String columnName = ((Attribute) child).name();
        if (exp instanceof IsNull) {
            // a NULL column corresponds to a missing field in the document
            return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(columnName));
        } else if (exp instanceof IsNotNull) {
            return new ExistsQueryBuilder(columnName);
        }
    }
    return null;
}

private QueryBuilder greaterThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gt(value);
}

private QueryBuilder greaterThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gte(value);
}

private QueryBuilder lessThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lt(value);
}

private QueryBuilder lessThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lte(value);
}

private Object getLiteral(BinaryComparison comp) {
    if (literalOnRight(comp)) {
        return ((Literal) comp.right()).value();
    } else {
        return ((Literal) comp.left()).value();
    }
}

private String getColumnName(BinaryComparison comp) {
    if (literalOnRight(comp)) {
        return ((Attribute) comp.left()).name();
    } else {
        return ((Attribute) comp.right()).name();
    }
}

private boolean literalOnRight(BinaryComparison comp) {
    if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
        return false;
    } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
        return true;
    } else {
        throw new RuntimeException("Invalid binary comparison.");
    }
}

@Override
public boolean isFilterPushedDown() {
    return queryBuilder != null;
}
```
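To make the conversion rules easy to review without the planner classes, here is a self-contained sketch over a hypothetical mini expression tree (`Cmp`, `Not`, `Or`, `IsNull`, `IsNotNull`, `Opaque`) that renders ES query DSL as JSON strings. It applies these rules: a literal on the left flips the comparison, `<>` and `NOT` also require the field to exist so that NULLs are dropped as in SQL, an `OR` is pushed only if both sides are, and anything unconvertible stays in the list Flink evaluates. It is an illustration under those assumptions, not the connector's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class FilterPushDownSketch {

    // Hypothetical mini expression tree standing in for the planner's filter expressions.
    interface Expr {}
    enum Op { EQ, NE, GT, GE, LT, LE }
    /** "column op literal", or "literal op column" when literalOnRight is false. */
    record Cmp(Op op, String column, Object literal, boolean literalOnRight) implements Expr {}
    record Not(Expr child) implements Expr {}
    record Or(Expr left, Expr right) implements Expr {}
    record IsNull(String column) implements Expr {}
    record IsNotNull(String column) implements Expr {}
    /** Anything the converter does not understand, e.g. a UDF call. */
    record Opaque(String sql) implements Expr {}

    /** Query DSL for the pushed predicates plus the predicates Flink must still evaluate. */
    record Result(String query, List<Expr> remaining) {}

    static String lit(Object v) {
        return v instanceof String ? "\"" + v + "\"" : String.valueOf(v);
    }

    static String exists(String col) {
        return "{\"exists\":{\"field\":\"" + col + "\"}}";
    }

    static String term(String col, String v) {
        return "{\"term\":{\"" + col + "\":" + v + "}}";
    }

    static String range(String col, String bound, String v) {
        return "{\"range\":{\"" + col + "\":{\"" + bound + "\":" + v + "}}}";
    }

    /** "5 > col" is the same as "col < 5". */
    static Op flip(Op op) {
        return switch (op) {
            case GT -> Op.LT;
            case GE -> Op.LE;
            case LT -> Op.GT;
            case LE -> Op.GE;
            default -> op; // EQ and NE are symmetric
        };
    }

    /** Returns the ES query DSL for exp, or null if it cannot be pushed down. */
    static String toQuery(Expr exp) {
        if (exp instanceof Cmp cmp) {
            Op op = cmp.literalOnRight() ? cmp.op() : flip(cmp.op());
            String col = cmp.column();
            String v = lit(cmp.literal());
            return switch (op) {
                case EQ -> term(col, v);
                case NE -> "{\"bool\":{\"must\":[" + exists(col) + "],\"must_not\":[" + term(col, v) + "]}}";
                case GT -> range(col, "gt", v);
                case GE -> range(col, "gte", v);
                case LT -> range(col, "lt", v);
                case LE -> range(col, "lte", v);
            };
        }
        if (exp instanceof Not not && not.child() instanceof Cmp inner) {
            return "{\"bool\":{\"must\":[" + exists(inner.column()) + "],\"must_not\":[" + toQuery(inner) + "]}}";
        }
        if (exp instanceof Or or) {
            String left = toQuery(or.left());
            String right = toQuery(or.right());
            if (left == null || right == null) {
                return null; // an OR is only pushable if both sides are
            }
            return "{\"bool\":{\"should\":[" + left + "," + right + "],\"minimum_should_match\":1}}";
        }
        if (exp instanceof IsNull isNull) {
            return "{\"bool\":{\"must_not\":[" + exists(isNull.column()) + "]}}";
        }
        if (exp instanceof IsNotNull notNull) {
            return exists(notNull.column());
        }
        return null;
    }

    static Result pushDown(List<Expr> predicates) {
        StringJoiner filters = new StringJoiner(",");
        List<Expr> remaining = new ArrayList<>();
        for (Expr p : predicates) {
            String q = toQuery(p);
            if (q == null) {
                remaining.add(p);
            } else {
                filters.add(q);
            }
        }
        // all pushed predicates are ANDed in filter context (no scoring)
        return new Result("{\"bool\":{\"filter\":[" + filters + "]}}", remaining);
    }

    public static void main(String[] args) {
        Result r = pushDown(List.of(
                new Cmp(Op.GT, "age", 18, false), // 18 > age, i.e. age < 18
                new Or(new Cmp(Op.EQ, "city", "Berlin", true), new IsNull("city")),
                new Opaque("MY_UDF(name)")));
        System.out.println(r.query());
        System.out.println("left for Flink: " + r.remaining());
    }
}
```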
LimitPushDown:
- Simply pass the limit to ElasticsearchRowDataInputFormat, whose reachedEnd() stops reading once limit records have been emitted.
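Pushing the limit into each reader is safe but not sufficient on its own: with several parallel readers, each may emit up to `limit` rows, so the total can exceed it. A minimal sketch in plain Java, assuming the planner keeps its own final LIMIT over the union of all readers, and simplifying to one reader per split; `readSplit`, `query` and `Outcome` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitPushDownSketch {

    /** Rows fetched from ES across all readers, and the rows left after the final LIMIT. */
    record Outcome(int rowsFetched, List<String> result) {}

    /** One parallel reader: stops as soon as it has emitted `limit` rows (limit > 0 here). */
    static List<String> readSplit(List<String> shardDocs, long limit) {
        return shardDocs.stream().limit(limit).collect(Collectors.toList());
    }

    /** Several readers run with the pushed-down limit; a final LIMIT trims their union. */
    static Outcome query(List<List<String>> shards, long limit) {
        List<String> union = new ArrayList<>();
        for (List<String> shard : shards) {
            union.addAll(readSplit(shard, limit));
        }
        List<String> result = union.stream().limit(limit).collect(Collectors.toList());
        return new Outcome(union.size(), result);
    }

    public static void main(String[] args) {
        List<List<String>> shards = new ArrayList<>();
        for (int s = 0; s < 3; s++) {
            List<String> docs = new ArrayList<>();
            for (int d = 0; d < 1000; d++) {
                docs.add("shard" + s + "-doc" + d);
            }
            shards.add(docs);
        }
        Outcome outcome = query(shards, 5);
        // fetched 15 of 3000 rows, returned 5
        System.out.println("fetched " + outcome.rowsFetched() + " of 3000 rows, returned " + outcome.result().size());
    }
}
```

The saving comes from the readers: each stops after `limit` rows, so only about `parallelism × limit` rows leave Elasticsearch instead of the whole index.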
Test Plan
- After discussion, we will implement it and add unit and integration tests.
- We have already completed an initial, simple implementation for ES 7.
Future plan
- Support an ES translog connector, similar to the MySQL binlog and HBase WAL connectors.