1 介绍

官网:http://calcite.apache.org/
Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。
Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。
Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。

2 架构与解析步骤

一般来说Calcite解析SQL有以下几步:

  • Parser. 此步中Calcite通过Java CC将SQL解析成未经校验的AST
  • Validate. 该步骤主要作用是校证Parser步骤中的AST是否合法,如验证SQL scheme、字段、函数等是否存在; SQL语句是否合法等. 此步完成之后就生成了RelNode树(关于RelNode树, 请参考下文)
  • Optimize. 该步骤主要的作用优化RelNode树, 并将其转化成物理执行计划。主要涉及SQL规则优化如:基于规则优化(RBO)及基于代价(CBO)优化; Optimze 这一步原则上来说是可选的, 通过Validate后的RelNode树已经可以直接转化物理执行计划,但现代的SQL解析器基本上都包括有这一步,目的是优化SQL执行计划。此步得到的结果为物理执行计划。
  • Execute,即执行阶段。此阶段主要做的是:将物理执行计划转化成可在特定的平台执行的程序。如Hive与Flink都在在此阶段将物理执行计划CodeGen生成相应的可执行代码。

    2.1 查询优化

    1. INSERT INTO tmp_node
    2. SELECT s1.id1, s1.id2, s2.val1
    3. FROM source1 as s1 INNER JOIN source2 AS s2
    4. ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;

    2.2 Parser解析

    1. LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
    2. LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
    3. LogicalFilter(condition=[AND(>($2, 5), =($8, 3))])
    4. LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER])
    5. LogicalTableScan(table=[[SOURCE1]])
    6. LogicalTableScan(table=[[SOURCE2]])

    2.3 Optimize优化

    谓词下推,投影下推,关系代数定律优化
    1. LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
    2. LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
    3. LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])
    4. LogicalFilter(condition=[=($4, 3)])
    5. LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
    6. LogicalTableScan(table=[[SOURCE1]])
    7. LogicalFilter(condition=[>($3,5)])
    8. LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
    9. LogicalTableScan(table=[[SOURCE2]])

    3 LogicalTableScan查询

    如上,节点树中的最后节点均为LogicalTableScan,假设我们不参与(LogicalTableScan)Calcite的查询过程,即不做SQL解析,不做优化,只要把它接入进来,实际Calcite是可以工作的,无非就是可能会有扫全表、数据全部加载到内存里等问题,所以实际中我们可能会参与全部(Translatable)或部分工作(FilterableTable),覆盖Calcite的一些执行计划或过滤条件,让它能更高效的工作。
    值得一提的是,Calcite支持异构数据源查询,比如数据存在es和mysql,可以通过写sql join之类的操作,让calcite分别先从不同的数据源查询数据,然后再在内存里进行合并计算;另外,它本身提供了许多优化规则,也支持我们自定义优化规则,来优化整个查询。

    3.1 ScannableTable

    a simple implementation of Table, using the ScannableTable interface, that enumerates all rows directly
    这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。
    使用方式:实现Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。

    3.2 FilterableTable

    a more advanced implementation that implements FilterableTable, and can filter out rows according to simple predicates
    初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。
    使用方式:实现Enumerable scan(DataContext root, List filters )
    如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理List filters ,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。

    3.3 TranslatableTable

    advanced implementation of Table, using TranslatableTable, that translates to relational operators using planner rules.
    高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(es\mysql\drudi..)来写不同的语法查询。
    当使用ScannableTable的时候,我们只需要实现函数Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数Enumerable scan(DataContext root, List filters );参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。

    4 自定义数据源表接入demo

    如果你的数据源不在官方的支持列表中,或者官方的支持不能满足你的需求,那么则需要自己实现源接入。

    4.1 准备工作

    4.1.1 maven引入

    1. <!--calcite核心包-->
    2. <dependency>
    3. <groupId>org.apache.calcite</groupId>
    4. <artifactId>calcite-core</artifactId>
    5. <version>1.19.0</version>
    6. </dependency>
    7. <!--项目用-->
    8. <dependency>
    9. <groupId>com.alibaba</groupId>
    10. <artifactId>fastjson</artifactId>
    11. <version>1.2.54</version>
    12. </dependency>
    13. <!--项目用-->
    14. <dependency>
    15. <groupId>com.google.guava</groupId>
    16. <artifactId>guava</artifactId>
    17. <version>16.0.1</version>
    18. </dependency>

    4.1.2 开发流程

    calcite中,引入一个数据库通常是通过注册一个SchemaFactory接口实现类来实现。SchemaFactory中只有一个方法,就是生成SchemaSchema最重要的功能是获取所有TableTable有两个功能,一个是获取所有字段的类型,另一个是得到Enumerable迭代器用来读取数据。

    4.1.3 配置信息

    如果将你的数据源引入calcite,一般情况下是使用一个配置文件,以下是配置文件的demo。
    1. {
    2. "version": "1.0",
    3. "defaultSchema": "TEST",
    4. "schemas": [
    5. {
    6. "name": "TEST",
    7. "type": "custom",
    8. "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
    9. "operand": {
    10. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8",
    11. "jdbcDriver":"com.mysql.cj.jdbc.Driver",
    12. "jdbcUser":"test",
    13. "jdbcPassword":"test"
    14. }
    15. }
    16. ]
    17. }

    4.2 CSV表demo

    这里我们先生成一个CSV文件,后边的操作就是通过在calcite中调用SQL访问CSV中的数据。
    TEST01.csv
    1. ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR
    2. 0,first,second
    3. 1,hello,world
    CsvSchemaFactory类
    1. package com.calcite.csv;
    2. import org.apache.calcite.schema.Schema;
    3. import org.apache.calcite.schema.SchemaFactory;
    4. import org.apache.calcite.schema.SchemaPlus;
    5. import java.util.Map;
    6. public class CsvSchemaFactory implements SchemaFactory {
    7. /**
    8. * parentSchema 他的父节点,一般为root
    9. * name 数据库的名字,它在model中定义的
    10. * operand 也是在mode中定义的,是Map类型,用于传入自定义参数。
    11. * */
    12. @Override
    13. public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
    14. return new CsvSchema(String.valueOf(operand.get("dataFile")));
    15. }
    16. }
    CsvSchema类
    1. package com.calcite.csv;
    2. import com.google.common.collect.ImmutableMap;
    3. import com.google.common.io.Resources;
    4. import org.apache.calcite.schema.Table;
    5. import org.apache.calcite.schema.impl.AbstractSchema;
    6. import org.apache.calcite.util.Source;
    7. import org.apache.calcite.util.Sources;
    8. import java.net.URL;
    9. import java.util.Map;
    10. public class CsvSchema extends AbstractSchema {
    11. private Map<String, Table> tableMap;
    12. private String dataFile;
    13. public CsvSchema(String dataFile) {
    14. this.dataFile = dataFile;
    15. }
    16. @Override
    17. protected Map<String, Table> getTableMap() {
    18. URL url = Resources.getResource(dataFile);
    19. Source source = Sources.of(url);
    20. if (tableMap == null) {
    21. final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
    22. builder.put(this.dataFile.split("\\.")[0],new CsvTable(source));
    23. // 一个数据库有多个表名,这里初始化,大小写要注意了,TEST01是表名。
    24. tableMap = builder.build();
    25. }
    26. return tableMap;
    27. }
    28. }
    CsvTable类
    1. package com.calcite.csv;
    2. import com.google.common.collect.Lists;
    3. import org.apache.calcite.DataContext;
    4. import org.apache.calcite.adapter.java.JavaTypeFactory;
    5. import org.apache.calcite.linq4j.AbstractEnumerable;
    6. import org.apache.calcite.linq4j.Enumerable;
    7. import org.apache.calcite.linq4j.Enumerator;
    8. import org.apache.calcite.rel.type.RelDataType;
    9. import org.apache.calcite.rel.type.RelDataTypeFactory;
    10. import org.apache.calcite.schema.ScannableTable;
    11. import org.apache.calcite.schema.impl.AbstractTable;
    12. import org.apache.calcite.sql.type.SqlTypeName;
    13. import org.apache.calcite.util.Pair;
    14. import org.apache.calcite.util.Source;
    15. import java.io.*;
    16. import java.util.List;
    17. public class CsvTable extends AbstractTable implements ScannableTable {
    18. private Source source;
    19. public CsvTable(Source source) {
    20. this.source = source;
    21. }
    22. /**
    23. * 获取字段类型
    24. */
    25. @Override
    26. public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
    27. JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;
    28. List<String> names = Lists.newLinkedList();
    29. List<RelDataType> types = Lists.newLinkedList();
    30. try {
    31. BufferedReader reader = new BufferedReader(new FileReader(source.file()));
    32. String line = reader.readLine();
    33. List<String> lines = Lists.newArrayList(line.split(","));
    34. lines.forEach(column -> {
    35. String name = column.split(":")[0];
    36. String type = column.split(":")[1];
    37. names.add(name);
    38. types.add(typeFactory.createSqlType(SqlTypeName.get(type)));
    39. });
    40. } catch (FileNotFoundException e) {
    41. e.printStackTrace();
    42. } catch (IOException e) {
    43. e.printStackTrace();
    44. }
    45. return typeFactory.createStructType(Pair.zip(names, types));
    46. }
    47. @Override
    48. public Enumerable<Object[]> scan(DataContext dataContext) {
    49. return new AbstractEnumerable<Object[]>() {
    50. @Override
    51. public Enumerator<Object[]> enumerator() {
    52. return new CsvEnumerator<>(source);
    53. }
    54. };
    55. }
    56. }
    CsvEnumerator类
    1. package com.calcite.csv;
    2. import org.apache.calcite.linq4j.Enumerator;
    3. import org.apache.calcite.util.Source;
    4. import java.io.BufferedReader;
    5. import java.io.IOException;
    6. public class CsvEnumerator <E> implements Enumerator<E> {
    7. private E current;
    8. private BufferedReader br;
    9. public CsvEnumerator(Source source) {
    10. try {
    11. this.br = new BufferedReader(source.reader());
    12. this.br.readLine();
    13. } catch (IOException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. @Override
    18. public E current() {
    19. return current;
    20. }
    21. @Override
    22. public boolean moveNext() {
    23. try {
    24. String line = br.readLine();
    25. if(line == null){
    26. return false;
    27. }
    28. current = (E)line.split(","); // 如果是多列,这里要多个值
    29. } catch (IOException e) {
    30. e.printStackTrace();
    31. return false;
    32. }
    33. return true;
    34. }
    35. /**
    36. * 出现异常走这里
    37. * */
    38. @Override
    39. public void reset() {
    40. System.out.println("报错了兄弟,不支持此操作");
    41. }
    42. /**
    43. * InputStream流在这里关闭
    44. * */
    45. @Override
    46. public void close() {
    47. }
    48. }
    model.json
    1. {
    2. "version": "1.0",
    3. "defaultSchema": "TEST_CSV",
    4. "schemas": [
    5. {
    6. "name": "TEST_CSV",
    7. "type": "custom",
    8. "factory": "com.calcite.csv.CsvSchemaFactory",
    9. "operand": {
    10. "dataFile": "TEST01.csv"
    11. }
    12. }
    13. ]
    14. }
    Main方法调用 ``` package com.calcite; import com.alibaba.fastjson.JSON; import com.calcite.util.ReourceUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.sql.; import java.util.List; import java.util.Map; public class Client { /*
    • 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
    • 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
    • operand 动态参数,ScheamFactory的create方法会接收到这里的数据 */ public static void main(String[] args) { try {
      1. // 用文件的方式
      2. //URL url = Client.class.getResource("/model.json");
      3. //String str = URLDecoder.decode(url.toString(), "UTF-8");
      4. //Properties info = new Properties();
      5. //info.put("model", str.replace("file:", ""));
      6. //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
      7. // 字符串方式
      8. String model = ReourceUtil.getResourceAsString("model.json");
      9. Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
      10. Statement statement = connection.createStatement();
      11. test1(statement);
      } catch (Exception e) {
      1. e.printStackTrace();
      } }
  1. /**
  2. * CSV文件读取
  3. * @param statement
  4. * @throws Exception
  5. */
  6. public static void test1(Statement statement) throws Exception {
  7. ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
  8. System.out.println(JSON.toJSONString(getData(resultSet)));
  9. }
  10. public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
  11. List<Map<String,Object>> list = Lists.newArrayList();
  12. ResultSetMetaData metaData = resultSet.getMetaData();
  13. int columnSize = metaData.getColumnCount();
  14. while (resultSet.next()) {
  15. Map<String, Object> map = Maps.newLinkedHashMap();
  16. for (int i = 1; i < columnSize + 1; i++) {
  17. map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
  18. }
  19. list.add(map);
  20. }
  21. return list;
  22. }

}

  1. <a name="h3_16"></a>
  2. ### 4.3 内存数据源与CSV数据源关联查询demo
  3. 在4.2的演示中,我们能够使用SQL查询CSV文件中的数据。接下来,我们再定义一种内存数据源,主要作用是演示两种数据源间的关联查询。<br />**MemSchemaFactory类**

package com.calcite.memory; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaFactory; import org.apache.calcite.schema.SchemaPlus; import java.util.Map; public class MemSchemaFactory implements SchemaFactory { @Override public Schema create(SchemaPlus schemaPlus, String s, Map map) { return new MemSchema(map); } }

  1. **MemSchema类**

package com.calcite.memory; import com.google.common.collect.ImmutableMap; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import java.util.Map; public class MemSchema extends AbstractSchema { private Map map; private Map tableMap; public MemSchema(Map map) { this.map = map; } @Override protected Map getTableMap() { if (tableMap == null) { final ImmutableMap.Builder builder = ImmutableMap.builder(); map.forEach((key, value) -> { builder.put(key, new MemTable(value)); }); tableMap = builder.build(); } return tableMap; } }

  1. **MemTable类**

package com.calcite.memory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.parser.Feature; import com.google.common.collect.Lists; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Pair; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.List; import java.util.Map; public class MemTable extends AbstractTable implements ScannableTable { private List> list = Lists.newLinkedList(); public MemTable(Object list) { if (list instanceof List) { ((List)list).forEach(o -> { this.list.add( JSON.parseObject(JSON.toJSONString(o), new TypeReference>() {}, Feature.OrderedField)); }); } } @Override public Enumerable scan(DataContext dataContext) { return new AbstractEnumerable() { @Override public Enumerator enumerator() { return new MemEnumerator(list); } }; } @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory; List names = Lists.newLinkedList(); List types = Lists.newLinkedList(); if (list.size() != 0) { list.get(0).forEach((key, value) -> { names.add(key); types.add(typeFactory.createSqlType(SqlTypeName.get(“VARCHAR”))); }); } return typeFactory.createStructType(Pair.zip(names, types)); } }

  1. **MemEnumerator类**

package com.calcite.memory; import com.google.common.collect.Lists; import org.apache.calcite.linq4j.Enumerator; import java.util.List; import java.util.Map; public class MemEnumerator implements Enumerator { private List> list = Lists.newLinkedList(); private int index = -1; private E e; public MemEnumerator(List> list) { this.list = list; } @Override public E current() { return e; } @Override public boolean moveNext() { if (index+1 >= list.size()){ return false; }else { e = (E)list.get(index+1).values().toArray(); index++; return true; } } @Override public void reset() { index = -1; e = null; } @Override public void close() { } }

  1. **model.json**

{ “version”: “1.0”, “defaultSchema”: “TEST_CSV”, “schemas”: [ { “name”: “TEST_CSV”, “type”: “custom”, “factory”: “com.calcite.csv.CsvSchemaFactory”, “operand”: { “dataFile”: “TEST01.csv” } }, { “name”: “TEST_MEM”, “type”: “custom”, “factory”: “com.calcite.memory.MemSchemaFactory”, “operand”: { “MEM_TABLE_1”: [ { “ID”: 0, “MEM_STR”: “str0” }, { “ID”: 1, “MEM_STR”: “str1” }, { “ID”: 2, “MEM_STR”: “str2” } ] } } ] }

  1. **Main方法调用**

package com.calcite; import com.alibaba.fastjson.JSON; import com.calcite.util.ReourceUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.sql.; import java.util.List; import java.util.Map; public class Client { /*

  1. * 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
  2. * 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
  3. * operand 动态参数,ScheamFactorycreate方法会接收到这里的数据
  4. */
  5. public static void main(String[] args) {
  6. try {
  7. // 用文件的方式
  8. //URL url = Client.class.getResource("/model.json");
  9. //String str = URLDecoder.decode(url.toString(), "UTF-8");
  10. //Properties info = new Properties();
  11. //info.put("model", str.replace("file:", ""));
  12. //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
  13. // 字符串方式
  14. String model = ReourceUtil.getResourceAsString("model.json");
  15. Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
  16. Statement statement = connection.createStatement();
  17. test2(statement);
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. /**
  23. * CSV文件读取
  24. * @param statement
  25. * @throws Exception
  26. */
  27. public static void test1(Statement statement) throws Exception {
  28. ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
  29. System.out.println(JSON.toJSONString(getData(resultSet)));
  30. }
  31. /**
  32. * CSV文件与内存文件关联读取
  33. * @param statement
  34. * @throws Exception
  35. */
  36. public static void test2(Statement statement) throws Exception {
  37. ResultSet resultSet1 = statement.executeQuery("select csv1.id as cid,csv1.name1 as cname ,mem1.id as mid,mem1.mem_str as mstr from test_csv.TEST01 as csv1 left join test_mem.mem_table_1 as mem1 on csv1.id = mem1.id");
  38. System.out.println(JSON.toJSONString(getData(resultSet1)));
  39. }
  40. public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
  41. List<Map<String,Object>> list = Lists.newArrayList();
  42. ResultSetMetaData metaData = resultSet.getMetaData();
  43. int columnSize = metaData.getColumnCount();
  44. while (resultSet.next()) {
  45. Map<String, Object> map = Maps.newLinkedHashMap();
  46. for (int i = 1; i < columnSize + 1; i++) {
  47. map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
  48. }
  49. list.add(map);
  50. }
  51. return list;
  52. }

} ```

小结

calcite对于没有高并发低延时的多数据源间数据有着天然的优势。但需要注意的是,如果一个表中数据量特别大,大到读取速度很慢或内存无法容纳,那么务必在操作该表数据时加入尽可能多的筛选条件,如果自定义实现LogicalTableScan,最好也是实现FilterableTable,从而减少calcite在内存中操作数据行的量。
参考:

转自:http://blog.gavinzh.com/2019/06/29/calcite-learn/