Elasticsearch source design documentation

Background and Motivation

  • The Flink connector for Elasticsearch only supports a sink; it does not support a source, yet some companies genuinely have scenarios that require reading from Elasticsearch.
  • We propose a thorough Elasticsearch source connector implementation in Flink.

Scope

  • We’ll design and develop an Elasticsearch source connector with the following features:
    • Elasticsearch as a bounded source
    • Support for multiple Elasticsearch versions (5/6/7)
    • Support for FilterPushDown, ProjectPushDown, and LimitPushDown to optimize query performance

Overall Design

  • Following the Hadoop-ecosystem pattern, we split the logical data into InputSplits and read them with an InputFormat.
  • Implement DynamicTableSource to support SQL.

ElasticsearchInputSplit

  • We split an Elasticsearch type into multiple ElasticsearchInputSplits (in recent versions the type is effectively the index, because every index has exactly one default type, "_doc"). One split corresponds to one shard (see the sketch below).
(Figure 1)
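A minimal sketch of the split class, assuming it extends Flink's LocatableInputSplit so the assigner can exploit shard locality; field and accessor names are illustrative, not the final API:

```java
// Sketch only: assumes LocatableInputSplit so splits can carry preferred host locations.
public class ElasticsearchInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    /** The index to read from. */
    private final String index;

    /** The type within the index; recent versions only have the single default type "_doc". */
    private final String type;

    /** The shard this split reads; one split maps to exactly one shard. */
    private final int shard;

    public ElasticsearchInputSplit(int splitNumber, String[] hostnames, String index, String type, int shard) {
        super(splitNumber, hostnames);
        this.index = index;
        this.type = type;
        this.shard = shard;
    }

    public String getIndex() {
        return index;
    }

    public String getType() {
        return type;
    }

    public int getShard() {
        return shard;
    }
}
```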

ElasticsearchRowDataInputFormat

```java
public class ElasticsearchRowDataInputFormat<C extends AutoCloseable>
        extends RichInputFormat<RowData, ElasticsearchInputSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataInputFormat.class);

    // for parsing JSON data
    private final DeserializationSchema<RowData> deserializationSchema;

    // for FilterPushDown
    private final QueryBuilder queryBuilder;

    private final String index;

    // projected source fields (ProjectPushDown)
    private String[] fields;

    private SearchHit[] currentScrollWindowHits = new SearchHit[0];

    private String currentScrollWindowId;

    private int nextRecordIndex = 0;

    private transient long currentReadCount = 0L;

    // for LimitPushDown, to limit the amount of data to fetch
    private final long limit;

    // scroll keep-alive in milliseconds
    private long scrollTimeout;

    private int scrollSize;

    // ————————————————————————————————————
    //  User-facing API and configuration
    // ————————————————————————————————————

    /**
     * The config map that contains configuration for the bulk flushing behaviours.
     *
     * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations,
     * this config map would also contain Elasticsearch-shipped configuration, and therefore
     * this config map would also be forwarded when creating the Elasticsearch client.
     */
    private final Map<String, String> userConfig;

    // ————————————————————————————————————
    //  Internals for the Flink Elasticsearch source
    // ————————————————————————————————————

    /** Call bridge for version-specific implementations. */
    private final ElasticsearchApiCallBridge<C> callBridge;

    /** Elasticsearch client created using the call bridge. */
    private transient C client;

    public ElasticsearchRowDataInputFormat(
            ElasticsearchApiCallBridge<C> callBridge,
            Map<String, String> userConfig,
            DeserializationSchema<RowData> deserializationSchema,
            QueryBuilder queryBuilder,
            String index,
            long limit) {
        this.deserializationSchema = deserializationSchema;
        this.queryBuilder = queryBuilder;
        this.index = index;
        this.limit = limit;
        this.callBridge = checkNotNull(callBridge);
        checkNotNull(userConfig);
        // copy config so we can remove entries without side-effects
        this.userConfig = new HashMap<>(userConfig);
        // todo: wire scroll timeout, scroll size, and projected fields from the config
    }

    @Override
    public void configure(Configuration parameters) {
        client = callBridge.createClient(userConfig);
        callBridge.verifyClientConnection(client);
        // todo
    }

    @Override
    public void open(ElasticsearchInputSplit split) throws IOException {
        LOG.info("Opening split: {}", split);
        // build a version-neutral SearchRequest; see "How to construct a SearchRequest" below
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .fetchSource(fields, null)
                .query(queryBuilder)
                .sort("_doc", SortOrder.ASC)
                .size(scrollSize);
        SearchRequest searchRequest = new SearchRequest(index)
                .types(split.getType())
                .searchType(QUERY_THEN_FETCH)
                .scroll(new TimeValue(scrollTimeout))
                .preference("_shards:" + split.getShard())
                .source(sourceBuilder);
        SearchResponse searchResponse = null;
        try {
            searchResponse = callBridge.search(client, searchRequest);
        } catch (IOException e) {
            LOG.error("Search has error: {}", e.getMessage());
        }
        if (searchResponse != null) {
            currentScrollWindowId = searchResponse.getScrollId();
            currentScrollWindowHits = searchResponse.getHits().getHits();
            nextRecordIndex = 0;
        }
    }

    @Override
    public ElasticsearchInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        return callBridge.createInputSplitsInternal(index, client, minNumSplits);
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(ElasticsearchInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        if (limit > 0 && currentReadCount >= limit) {
            return true;
        }
        if (currentScrollWindowHits.length > 0 && nextRecordIndex > currentScrollWindowHits.length - 1) {
            fetchNextScrollWindow();
        }
        return currentScrollWindowHits.length == 0;
    }

    @Override
    public RowData nextRecord(RowData reuse) throws IOException {
        if (reachedEnd()) {
            LOG.warn("Already reached the end of the split.");
            return null;
        }
        SearchHit hit = currentScrollWindowHits[nextRecordIndex];
        nextRecordIndex += 1;
        currentReadCount++;
        LOG.debug("Yielding new record for hit: {}", hit);
        // parseSearchHit(...) deserializes the hit via deserializationSchema (not shown)
        return parseSearchHit(hit);
    }
}
```
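fetchNextScrollWindow() is referenced in reachedEnd() but not shown above; a minimal sketch of how it could advance the scroll through the call bridge (the exact shape is an assumption):

```java
// Sketch only: fetches the next scroll window via the version-specific call bridge.
private void fetchNextScrollWindow() throws IOException {
    SearchScrollRequest scrollRequest = new SearchScrollRequest(currentScrollWindowId);
    scrollRequest.scroll(new TimeValue(scrollTimeout));
    SearchResponse response = callBridge.scroll(client, scrollRequest);
    currentScrollWindowId = response.getScrollId();
    currentScrollWindowHits = response.getHits().getHits();
    nextRecordIndex = 0;
}
```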

(Figure 2)

Adapt to different es versions

  • To adapt to the different Elasticsearch versions (ES 5 uses TransportClient; ES 6 and 7 use RestHighLevelClient), we add the following methods to ElasticsearchApiCallBridge:

```java
interface ElasticsearchApiCallBridge<C extends AutoCloseable> {

    // computes the input splits (one per shard) via the version-specific search-shards API
    ElasticsearchInputSplit[] createInputSplitsInternal(String index, C client, int minNumSplits);

    SearchResponse search(C client, SearchRequest searchRequest) throws IOException;

    SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;

    void close(C client) throws IOException;
}
```
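For the first method, a hedged sketch of createInputSplitsInternal for the ES 5 TransportClient, built on the cluster search-shards API; hostname resolution is left out, and the ElasticsearchInputSplit constructor is the one sketched earlier:

```java
// Sketch only (ES 5 TransportClient flavor): one split per shard of the index.
@Override
public ElasticsearchInputSplit[] createInputSplitsInternal(String index, TransportClient client, int minNumSplits) {
    ClusterSearchShardsResponse response = client.admin().cluster()
            .searchShards(new ClusterSearchShardsRequest(index))
            .actionGet();
    ClusterSearchShardsGroup[] groups = response.getGroups();
    ElasticsearchInputSplit[] splits = new ElasticsearchInputSplit[groups.length];
    for (int i = 0; i < groups.length; i++) {
        int shardId = groups[i].getShardId().getId();
        // hostnames left empty here; they could be resolved from response.getNodes()
        splits[i] = new ElasticsearchInputSplit(i, new String[0], index, "_doc", shardId);
    }
    return splits;
}
```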

  • Take search as an example.
  • For Elasticsearch5ApiCallBridge:

```java
// for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
    // the TransportClient executes the request synchronously via actionGet()
    return client.search(searchRequest).actionGet();
}
```
  • For Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge:

```java
// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
    return client.search(searchRequest, RequestOptions.DEFAULT);
}
```
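The scroll method follows the same pattern for each client flavor; a hedged sketch (on older 6.x clients the REST method may still be named searchScroll):

```java
// for Elasticsearch5ApiCallBridge (TransportClient)
@Override
public SearchResponse scroll(TransportClient client, SearchScrollRequest searchScrollRequest) throws IOException {
    return client.searchScroll(searchScrollRequest).actionGet();
}

// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge (RestHighLevelClient)
@Override
public SearchResponse scroll(RestHighLevelClient client, SearchScrollRequest searchScrollRequest) throws IOException {
    return client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
}
```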

How to construct a SearchRequest

```java
// converts the query to a SearchRequestBuilder in ES 5 for the TransportClient API
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indices)
        .setTypes(type)
        .setSearchType(QUERY_THEN_FETCH)
        .setScroll(new TimeValue(scrollTimeout.toMillis()))
        .setFetchSource(fields.toArray(new String[0]), null)
        .setQuery(queryBuilder)
        .setPreference("_shards:" + shard)
        .addSort("_doc", SortOrder.ASC)
        .setSize(scrollSize);

SearchRequest searchRequest = searchRequestBuilder.request();
```

DDL and configuration

  • Following FLIP-122 (New Connector Property Keys for New Factory):
    • DDL for the Elasticsearch DynamicTableSource (a factory sketch follows the DDL):

```sql
CREATE TABLE es_table (
  ...
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'MyIndex',
  'document-type' = 'MyType',
  'scan.xx' = 'xx'
);
```
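A minimal sketch of how these property keys could be declared in a FLIP-122-style DynamicTableSourceFactory; the class name and option wiring are assumptions, and no scan.* option is declared since those keys are still open above:

```java
// Sketch only: factory wiring for the property keys shown in the DDL above.
public class ElasticsearchDynamicTableSourceFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts").stringType().noDefaultValue();
    public static final ConfigOption<String> INDEX =
            ConfigOptions.key("index").stringType().noDefaultValue();
    public static final ConfigOption<String> DOCUMENT_TYPE =
            ConfigOptions.key("document-type").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // matches 'connector' = 'elasticsearch-7'
        return "elasticsearch-7";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTS);
        options.add(INDEX);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DOCUMENT_TYPE);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        // wrap the InputFormat in the table source sketched in the next section
        return new ElasticsearchDynamicTableSource(createInputFormat(options));
    }

    // hypothetical helper: builds the InputFormat from the validated options
    private ElasticsearchRowDataInputFormat<RestHighLevelClient> createInputFormat(ReadableConfig options) {
        throw new UnsupportedOperationException("todo: wire options into the InputFormat");
    }
}
```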

ElasticSearchTableSource

(Figure 3)
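A hedged skeleton of the new-stack (FLIP-95) table source that wires the InputFormat into the planner; class and constructor names are illustrative. The push-down code below uses the legacy TableSource interfaces; in this stack the equivalent hooks are SupportsFilterPushDown, SupportsProjectionPushDown, and SupportsLimitPushDown.

```java
// Sketch only: class and constructor are illustrative, following the FLIP-95 interfaces.
public class ElasticsearchDynamicTableSource implements ScanTableSource {

    private final ElasticsearchRowDataInputFormat<RestHighLevelClient> inputFormat;

    public ElasticsearchDynamicTableSource(ElasticsearchRowDataInputFormat<RestHighLevelClient> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // a bounded scan only emits inserts
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // hand the planner an InputFormat-based bounded scan
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new ElasticsearchDynamicTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "Elasticsearch table source";
    }
}
```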

FilterPushDown, ProjectPushDown, and LimitPushDown

FilterPushDown:

  • Extract the List<Expression> of predicates that can be pushed down to Elasticsearch and convert them into an Elasticsearch BoolQueryBuilder.

```java
@Override
public TableSource<RowData> applyPredicate(List<Expression> predicates) {
    BoolQueryBuilder builder = new BoolQueryBuilder();
    // try to convert Flink filter expressions to Elasticsearch query builders
    List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
    for (Expression toConvert : predicates) {
        QueryBuilder converted = toElasticSearchQueryBuilder(toConvert);
        if (converted == null) {
            unsupportedExpressions.add(toConvert);
        } else {
            builder.must(converted);
        }
    }
    // update the list of Flink expressions to those we could not push down
    predicates.clear();
    predicates.addAll(unsupportedExpressions);
    // create and return a new ElasticsearchTableSource carrying the Elasticsearch query
    return new ElasticsearchTableSource(schema, hosts, index, deserializer, sourceOptions, projectedFields, builder);
}

private boolean isValid(BinaryComparison comp) {
    return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
            (comp.left() instanceof Attribute && comp.right() instanceof Literal);
}

/**
 * Converts a Flink Expression to an Elasticsearch QueryBuilder.
 * Returns null if the expression cannot be pushed down.
 */
@Nullable
private QueryBuilder toElasticSearchQueryBuilder(Expression exp) {
    if (exp instanceof Not) {
        QueryBuilder c = toElasticSearchQueryBuilder(((Not) exp).child());
        if (c == null) {
            return null;
        }
        return new BoolQueryBuilder().mustNot(c);
    } else if (exp instanceof BinaryComparison) {
        BinaryComparison binComp = (BinaryComparison) exp;
        if (!isValid(binComp)) {
            // unsupported literal type
            LOG.debug("Unsupported predicate [{}] cannot be pushed to ElasticsearchTableSource.", exp);
            return null;
        }
        boolean onRight = literalOnRight(binComp);
        String columnName = getColumnName(binComp);
        Object value = getLiteral(binComp);
        if (exp instanceof EqualTo) {
            return new TermQueryBuilder(columnName, value);
        } else if (exp instanceof NotEqualTo) {
            return new BoolQueryBuilder().mustNot(new TermQueryBuilder(columnName, value));
        } else if (exp instanceof GreaterThan) {
            // "a > lit" maps to gt; "lit > a" flips to lt
            return onRight ? greaterThan(columnName, value) : lessThan(columnName, value);
        } else if (exp instanceof GreaterThanOrEqual) {
            return onRight ? greaterThanOrEqual(columnName, value) : lessThanOrEqual(columnName, value);
        } else if (exp instanceof LessThan) {
            return onRight ? lessThan(columnName, value) : greaterThan(columnName, value);
        } else if (exp instanceof LessThanOrEqual) {
            return onRight ? lessThanOrEqual(columnName, value) : greaterThanOrEqual(columnName, value);
        } else {
            // unsupported predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
            return null;
        }
    } else if (exp instanceof BinaryExpression) {
        if (exp instanceof And) {
            // predicates arrive in CNF, so a nested AND is not expected here
            LOG.debug("All of the predicates should be in CNF. Found an AND expression: {}.", exp);
        } else if (exp instanceof Or) {
            QueryBuilder c1 = toElasticSearchQueryBuilder(((Or) exp).left());
            QueryBuilder c2 = toElasticSearchQueryBuilder(((Or) exp).right());
            if (c1 == null || c2 == null) {
                return null;
            }
            return new BoolQueryBuilder().should(c1).should(c2);
        } else {
            // unsupported predicate
            LOG.debug("Unsupported predicate [{}] cannot be pushed into ElasticsearchTableSource.", exp);
        }
    } else if (exp instanceof UnaryExpression) {
        String columnName = ((Attribute) ((UnaryExpression) exp).child()).name();
        if (exp instanceof IsNull) {
            // IS NULL means the field has no indexed value, i.e. it must NOT exist
            return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(columnName));
        } else if (exp instanceof IsNotNull) {
            return new ExistsQueryBuilder(columnName);
        }
    }
    return null;
}

private QueryBuilder greaterThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gt(value);
}

private QueryBuilder greaterThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).gte(value);
}

private QueryBuilder lessThan(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lt(value);
}

private QueryBuilder lessThanOrEqual(String columnName, Object value) {
    return new RangeQueryBuilder(columnName).lte(value);
}

private Object getLiteral(BinaryComparison comp) {
    if (literalOnRight(comp)) {
        return ((Literal) comp.right()).value();
    } else {
        return ((Literal) comp.left()).value();
    }
}

private String getColumnName(BinaryComparison comp) {
    if (literalOnRight(comp)) {
        return ((Attribute) comp.left()).name();
    } else {
        return ((Attribute) comp.right()).name();
    }
}

private boolean literalOnRight(BinaryComparison comp) {
    if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
        return false;
    } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
        return true;
    } else {
        throw new RuntimeException("Invalid binary comparison.");
    }
}

@Override
public boolean isFilterPushedDown() {
    return queryBuilder != null;
}
```
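ProjectPushDown:

  • A minimal sketch under the legacy ProjectableTableSource API, mirroring the constructor used in applyPredicate above; the mapping from field indices to source field names happens later in the InputFormat.

```java
// Sketch only: projection push-down records the projected field indices; the InputFormat
// later maps them to source field names for the fetch-source include list.
@Override
public TableSource<RowData> projectFields(int[] fields) {
    return new ElasticsearchTableSource(schema, hosts, index, deserializer, sourceOptions, fields, queryBuilder);
}
```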

LimitPushDown:

  • Simply pass the limit through to ElasticsearchRowDataInputFormat; its reachedEnd() stops reading once the limit is reached (see the sketch below).
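A minimal sketch under the legacy LimitableTableSource API, assuming a constructor overload (and a corresponding limit field) that carries the limit:

```java
// Sketch only: the limit is forwarded to ElasticsearchRowDataInputFormat, whose
// reachedEnd() stops once currentReadCount >= limit; the constructor overload is assumed.
@Override
public TableSource<RowData> applyLimit(long limit) {
    return new ElasticsearchTableSource(schema, hosts, index, deserializer, sourceOptions, projectedFields, queryBuilder, limit);
}

@Override
public boolean isLimitPushedDown() {
    // assumes a limit field set by the overloaded constructor
    return limit > 0;
}
```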

Test Plan

  • After discussion, we will implement it and add unit and integration tests.
  • We have already completed an initial simple implementation for Elasticsearch 7.

Future plan

  • Support an Elasticsearch translog-based connector, analogous to the MySQL binlog / HBase WAL connectors.