背景:
支持数据还原在异构数据源之间的ddl进行转换,达到ddl自动执行
方案
目标:
- 异构数据源之间ddl需要相关转换
- 库名 表名 列名需要做到映射转换
流程
一个完整的自动执行DDL 数据还原流程
MappingAndDdlConventFlatMap设计
何时存在此算子:
- 存在nameMapping设置
- ddl自动执行 且 source以及sink是异构数据源
a和b是或条件

nameMapping转换
- 如果存在NameMaping进行映射,则先进行名称转换
- 如果需要ddl转换且数据是ddlRowData 则
- 通过source对应的ddlConvent转为中间数据
- 中间数据进行NameMapping映射转换
- 中间数据通过sink对应的ddlCOnvent进行转换为sql
- 将sql替换掉rowData里的源sql

DdlConvent设计
ddlConvent设计

DdlConvent
DdlConvent类作为顶层接口提供了三个基础接口
//ddlRowData转为中间数据Ddldata rowConventToDdlData(RowData row);//中间数据转为sqlString ddlDataConventToSql(Ddldata ddldata);//convent对应的数据源类型String getDataSource();
BaseCalciteDdlConvent
�基于calcite的基础实现
protected final TypeConvent typeConvent;// 解析配置protected final SqlParser.Config sqlParseConfig;public BaseCalciteDdlConvent(TypeConvent typeConvent, SqlParser.Config sqlParseConfig) {this.typeConvent = typeConvent;this.sqlParseConfig = sqlParseConfig;}@Overridepublic String ddlDataConventToSql(Ddldata ddldata) {switch (ddldata.getType()) {case CREATE_SCHEMA:return doParseCreateSchemaDataToSql(ddldata);case ALTER_TABLE:return doParseAlterTableDataToSql(ddldata);case CREATE_TABLE:return doParseCreateTableDataToSql(ddldata);default:throw new UnsupportedOperationException("not support " + ddldata.getType());}}@Overridepublic Ddldata rowConventToDdlData(RowData row) {String sql = getSql(row);// 创建解析器 这儿后续需要替换为flinkxSqlParseSqlParser parser = SqlParser.create(sql, sqlParseConfig);SqlNode sqlNode;try {// 解析sqlsqlNode = parser.parseStmt();} catch (SqlParseException e) {throw new RuntimeException(e);}switch (sqlNode.getKind().toString()) {case "CREATE_TABLE":return doParseCreateTableSqlToData(sqlNode, row);case "CREATE_SCHEMA":return doParseCreateSchemaSqlToData(sqlNode, row);case "ALTER_TABLE":return doParseAlterTableSqlToData(sqlNode, row);default://todo 异常如何管理throw new RuntimeException("not support parse sql type " + sqlNode.getKind());}}protected Ddldata doParseCreateTableSqlToData(SqlNode sql, RowData row) {throw new UnsupportedOperationException("not support CreateTable ");}protected Ddldata doParseAlterTableSqlToData(SqlNode sql, RowData row) {throw new UnsupportedOperationException("not support AlterTable ");}protected Ddldata doParseCreateSchemaSqlToData(SqlNode sql, RowData row) {throw new UnsupportedOperationException("not support CreateSchema");}protected String doParseAlterTableDataToSql(Ddldata ddldata) {throw new UnsupportedOperationException("not support " + ddldata.getType());}protected String doParseCreateSchemaDataToSql(Ddldata ddldata) {throw new UnsupportedOperationException("not support " + ddldata.getType());}protected String doParseCreateTableDataToSql(Ddldata ddldata) {throw new UnsupportedOperationException("not support " + ddldata.getType());}private String getSql(RowData row) {DdlRowData ddlRowData = (DdlRowData) row;String sql = "";for (int i = 0; i < ddlRowData.getHeaders().length; i++) {if (ddlRowData.getHeaders()[i].equals("sql")) {sql = ddlRowData.getInfo(i);}}return sql;}
TypeConvent
类型转换,因为异构数据源之间除了Sql格式需要转换以外,还需要类型的转换,因此类型转换单独抽取出来作为ddlConvent的一部分
public interface TypeConvent {//数据源类型转为内部类型Type conventInternal(String name);//内部类型转为数据源类型String conventExternal(Type type);}
�将数据源类型转为中间类型,然后再将中间类型转为对应的数据源类型。
中间类型支持

- 每种数据源需要按照FlinkSqlParserImpl方式 实现 SqlAbstractParserImpl 类进行解析
中间数据设计
中间数据设计按照create alter等不同语法都有不同的ddlData。如create table语句就会转为一个createTable的ddlData
package com.dtstack.chunjun.cdc.ddl.entity;import com.dtstack.chunjun.cdc.EventType;import javax.naming.OperationNotSupportedException;import java.util.List;public class CreateTableData extends DdlData implements Identity, ColumnData {private String schema;private String table;private final List<ColumnEntity> columnList;private final List<IndexEntity> indexList;private final List<PartitionEntity> partitionList;public CreateTableData(EventType type,String sql,List<ColumnEntity> columnList,String schema,String table) {super(type, sql);this.columnList = columnList;this.schema = schema;this.table = table;}@Overridepublic List<ColumnEntity> getColumnEntity() {return columnList;}@Overridepublic String getDataBase() {throw new UnsupportedOperationException();}@Overridepublic String getSchema() {return schema;}@Overridepublic String getTable() {return table;}@Overridepublic void setDataBase(String table) {}@Overridepublic void setSchema(String schema) {this.schema = schema;}@Overridepublic void setTable(String table) {this.table = table;}}
加载DdlConvent
spi方式加载插件
解析sql
通过calcite工具进行解析sql,但是calcite仅支持解析如下标准格式
createTableStatement:CREATE TABLE [ IF NOT EXISTS ] name[ '(' tableElement [, tableElement ]* ')' ][ AS query ]
示例:
CREATE TABLE `t123e` (`column_1` int DEFAULT NULL,`column_2` varchar(255) DEFAULT NULL)
Mysql语法:
CREATE TABLE `test120` (`column_1` int NOT NULL,`column_2` int DEFAULT '123',PRIMARY KEY (`column_1`),UNIQUE KEY `test120_column_1_uindex` (`column_1`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
对于Mysql语法是无法解析的,解决方案:
- 每个数据源继承 SqlAbstractParserImpl 完成对应数据源的解析 熟悉calcite解析逻辑,相关开发需要熟悉calcite 以及 解析,容易出现单点问题。
如何将中间数据转为sql

calcite源码里 每种类型的SqlNode转为sql的时候也是字符串拼接的方式转为sql的
