背景:

支持数据还原在异构数据源之间的ddl进行转换,达到ddl自动执行

方案

目标:

  • 异构数据源之间ddl需要相关转换
  • 库名 表名 列名需要做到映射转换

流程

一个完整的自动执行DDL 数据还原流程
image.png

MappingAndDdlConventFlatMap设计

何时存在此算子:

  1. 存在nameMapping设置
  2. ddl自动执行 且 source以及sink是异构数据源

a和b是或条件

image.png

nameMapping转换
image.png

  1. 如果存在NameMaping进行映射,则先进行名称转换
  2. 如果需要ddl转换且数据是ddlRowData 则
    1. 通过source对应的ddlConvent转为中间数据
    2. 中间数据进行NameMapping映射转换
    3. 中间数据通过sink对应的ddlCOnvent进行转换为sql
    4. 将sql替换掉rowData里的源sql

image.png

DdlConvent设计

ddlConvent设计

image.png

DdlConvent

DdlConvent类作为顶层接口提供了三个基础接口

  1. //ddlRowData转为中间数据
  2. Ddldata rowConventToDdlData(RowData row);
  3. //中间数据转为sql
  4. String ddlDataConventToSql(Ddldata ddldata);
  5. //convent对应的数据源类型
  6. String getDataSource();

BaseCalciteDdlConvent

�基于calcite的基础实现

  1. protected final TypeConvent typeConvent;
  2. // 解析配置
  3. protected final SqlParser.Config sqlParseConfig;
  4. public BaseCalciteDdlConvent(TypeConvent typeConvent, SqlParser.Config sqlParseConfig) {
  5. this.typeConvent = typeConvent;
  6. this.sqlParseConfig = sqlParseConfig;
  7. }
  8. @Override
  9. public String ddlDataConventToSql(Ddldata ddldata) {
  10. switch (ddldata.getType()) {
  11. case CREATE_SCHEMA:
  12. return doParseCreateSchemaDataToSql(ddldata);
  13. case ALTER_TABLE:
  14. return doParseAlterTableDataToSql(ddldata);
  15. case CREATE_TABLE:
  16. return doParseCreateTableDataToSql(ddldata);
  17. default:
  18. throw new UnsupportedOperationException("not support " + ddldata.getType());
  19. }
  20. }
  21. @Override
  22. public Ddldata rowConventToDdlData(RowData row) {
  23. String sql = getSql(row);
  24. // 创建解析器 这儿后续需要替换为flinkxSqlParse
  25. SqlParser parser = SqlParser.create(sql, sqlParseConfig);
  26. SqlNode sqlNode;
  27. try {
  28. // 解析sql
  29. sqlNode = parser.parseStmt();
  30. } catch (SqlParseException e) {
  31. throw new RuntimeException(e);
  32. }
  33. switch (sqlNode.getKind().toString()) {
  34. case "CREATE_TABLE":
  35. return doParseCreateTableSqlToData(sqlNode, row);
  36. case "CREATE_SCHEMA":
  37. return doParseCreateSchemaSqlToData(sqlNode, row);
  38. case "ALTER_TABLE":
  39. return doParseAlterTableSqlToData(sqlNode, row);
  40. default:
  41. //todo 异常如何管理
  42. throw new RuntimeException("not support parse sql type " + sqlNode.getKind());
  43. }
  44. }
  45. protected Ddldata doParseCreateTableSqlToData(SqlNode sql, RowData row) {
  46. throw new UnsupportedOperationException("not support CreateTable ");
  47. }
  48. protected Ddldata doParseAlterTableSqlToData(SqlNode sql, RowData row) {
  49. throw new UnsupportedOperationException("not support AlterTable ");
  50. }
  51. protected Ddldata doParseCreateSchemaSqlToData(SqlNode sql, RowData row) {
  52. throw new UnsupportedOperationException("not support CreateSchema");
  53. }
  54. protected String doParseAlterTableDataToSql(Ddldata ddldata) {
  55. throw new UnsupportedOperationException("not support " + ddldata.getType());
  56. }
  57. protected String doParseCreateSchemaDataToSql(Ddldata ddldata) {
  58. throw new UnsupportedOperationException("not support " + ddldata.getType());
  59. }
  60. protected String doParseCreateTableDataToSql(Ddldata ddldata) {
  61. throw new UnsupportedOperationException("not support " + ddldata.getType());
  62. }
  63. private String getSql(RowData row) {
  64. DdlRowData ddlRowData = (DdlRowData) row;
  65. String sql = "";
  66. for (int i = 0; i < ddlRowData.getHeaders().length; i++) {
  67. if (ddlRowData.getHeaders()[i].equals("sql")) {
  68. sql = ddlRowData.getInfo(i);
  69. }
  70. }
  71. return sql;
  72. }

TypeConvent

类型转换,因为异构数据源之间除了Sql格式需要转换以外,还需要类型的转换,因此类型转换单独抽取出来作为ddlConvent的一部分

  1. public interface TypeConvent {
  2. //数据源类型转为内部类型
  3. Type conventInternal(String name);
  4. //内部类型转为数据源类型
  5. String conventExternal(Type type);
  6. }

�将数据源类型转为中间类型,然后再将中间类型转为对应的数据源类型。

中间类型支持

image.png

  • 每种数据源需要按照FlinkSqlParserImpl方式 实现 SqlAbstractParserImpl 类进行解析

中间数据设计

中间数据设计按照create alter等不同语法都有不同的ddlData。如create table语句就会转为一个createTable的ddlData

  1. package com.dtstack.chunjun.cdc.ddl.entity;
  2. import com.dtstack.chunjun.cdc.EventType;
  3. import javax.naming.OperationNotSupportedException;
  4. import java.util.List;
  5. public class CreateTableData extends DdlData implements Identity, ColumnData {
  6. private String schema;
  7. private String table;
  8. private final List<ColumnEntity> columnList;
  9. private final List<IndexEntity> indexList;
  10. private final List<PartitionEntity> partitionList;
  11. public CreateTableData(
  12. EventType type,
  13. String sql,
  14. List<ColumnEntity> columnList,
  15. String schema,
  16. String table) {
  17. super(type, sql);
  18. this.columnList = columnList;
  19. this.schema = schema;
  20. this.table = table;
  21. }
  22. @Override
  23. public List<ColumnEntity> getColumnEntity() {
  24. return columnList;
  25. }
  26. @Override
  27. public String getDataBase() {
  28. throw new UnsupportedOperationException();
  29. }
  30. @Override
  31. public String getSchema() {
  32. return schema;
  33. }
  34. @Override
  35. public String getTable() {
  36. return table;
  37. }
  38. @Override
  39. public void setDataBase(String table) {
  40. }
  41. @Override
  42. public void setSchema(String schema) {
  43. this.schema = schema;
  44. }
  45. @Override
  46. public void setTable(String table) {
  47. this.table = table;
  48. }
  49. }

加载DdlConvent

spi方式加载插件
image.png

解析sql

通过calcite工具进行解析sql,但是calcite仅支持解析如下标准格式

  1. createTableStatement:
  2. CREATE TABLE [ IF NOT EXISTS ] name
  3. [ '(' tableElement [, tableElement ]* ')' ]
  4. [ AS query ]

示例:

  1. CREATE TABLE `t123e` (
  2. `column_1` int DEFAULT NULL,
  3. `column_2` varchar(255) DEFAULT NULL
  4. )

Mysql语法:

  1. CREATE TABLE `test120` (
  2. `column_1` int NOT NULL,
  3. `column_2` int DEFAULT '123',
  4. PRIMARY KEY (`column_1`),
  5. UNIQUE KEY `test120_column_1_uindex` (`column_1`)
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

对于Mysql语法是无法解析的,解决方案:

  • 每个数据源继承 SqlAbstractParserImpl 完成对应数据源的解析 熟悉calcite解析逻辑,相关开发需要熟悉calcite 以及 解析,容易出现单点问题。

如何将中间数据转为sql

image.png
calcite源码里 每种类型的SqlNode转为sql的时候也是字符串拼接的方式转为sql的