背景:
支持数据还原在异构数据源之间的ddl进行转换,达到ddl自动执行
方案
目标:
- 异构数据源之间ddl需要相关转换
- 库名 表名 列名需要做到映射转换
流程
一个完整的自动执行DDL 数据还原流程
MappingAndDdlConventFlatMap设计
何时存在此算子:
- 存在nameMapping设置
- ddl自动执行 且 source以及sink是异构数据源
a和b是或条件
nameMapping转换
- 如果存在NameMaping进行映射,则先进行名称转换
- 如果需要ddl转换且数据是ddlRowData 则
- 通过source对应的ddlConvent转为中间数据
- 中间数据进行NameMapping映射转换
- 中间数据通过sink对应的ddlCOnvent进行转换为sql
- 将sql替换掉rowData里的源sql
DdlConvent设计
ddlConvent设计
DdlConvent
DdlConvent类作为顶层接口提供了三个基础接口
//ddlRowData转为中间数据
Ddldata rowConventToDdlData(RowData row);
//中间数据转为sql
String ddlDataConventToSql(Ddldata ddldata);
//convent对应的数据源类型
String getDataSource();
BaseCalciteDdlConvent
�基于calcite的基础实现
protected final TypeConvent typeConvent;
// 解析配置
protected final SqlParser.Config sqlParseConfig;
public BaseCalciteDdlConvent(TypeConvent typeConvent, SqlParser.Config sqlParseConfig) {
this.typeConvent = typeConvent;
this.sqlParseConfig = sqlParseConfig;
}
@Override
public String ddlDataConventToSql(Ddldata ddldata) {
switch (ddldata.getType()) {
case CREATE_SCHEMA:
return doParseCreateSchemaDataToSql(ddldata);
case ALTER_TABLE:
return doParseAlterTableDataToSql(ddldata);
case CREATE_TABLE:
return doParseCreateTableDataToSql(ddldata);
default:
throw new UnsupportedOperationException("not support " + ddldata.getType());
}
}
@Override
public Ddldata rowConventToDdlData(RowData row) {
String sql = getSql(row);
// 创建解析器 这儿后续需要替换为flinkxSqlParse
SqlParser parser = SqlParser.create(sql, sqlParseConfig);
SqlNode sqlNode;
try {
// 解析sql
sqlNode = parser.parseStmt();
} catch (SqlParseException e) {
throw new RuntimeException(e);
}
switch (sqlNode.getKind().toString()) {
case "CREATE_TABLE":
return doParseCreateTableSqlToData(sqlNode, row);
case "CREATE_SCHEMA":
return doParseCreateSchemaSqlToData(sqlNode, row);
case "ALTER_TABLE":
return doParseAlterTableSqlToData(sqlNode, row);
default:
//todo 异常如何管理
throw new RuntimeException("not support parse sql type " + sqlNode.getKind());
}
}
protected Ddldata doParseCreateTableSqlToData(SqlNode sql, RowData row) {
throw new UnsupportedOperationException("not support CreateTable ");
}
protected Ddldata doParseAlterTableSqlToData(SqlNode sql, RowData row) {
throw new UnsupportedOperationException("not support AlterTable ");
}
protected Ddldata doParseCreateSchemaSqlToData(SqlNode sql, RowData row) {
throw new UnsupportedOperationException("not support CreateSchema");
}
protected String doParseAlterTableDataToSql(Ddldata ddldata) {
throw new UnsupportedOperationException("not support " + ddldata.getType());
}
protected String doParseCreateSchemaDataToSql(Ddldata ddldata) {
throw new UnsupportedOperationException("not support " + ddldata.getType());
}
protected String doParseCreateTableDataToSql(Ddldata ddldata) {
throw new UnsupportedOperationException("not support " + ddldata.getType());
}
private String getSql(RowData row) {
DdlRowData ddlRowData = (DdlRowData) row;
String sql = "";
for (int i = 0; i < ddlRowData.getHeaders().length; i++) {
if (ddlRowData.getHeaders()[i].equals("sql")) {
sql = ddlRowData.getInfo(i);
}
}
return sql;
}
TypeConvent
类型转换,因为异构数据源之间除了Sql格式需要转换以外,还需要类型的转换,因此类型转换单独抽取出来作为ddlConvent的一部分
public interface TypeConvent {
//数据源类型转为内部类型
Type conventInternal(String name);
//内部类型转为数据源类型
String conventExternal(Type type);
}
�将数据源类型转为中间类型,然后再将中间类型转为对应的数据源类型。
中间类型支持
- 每种数据源需要按照FlinkSqlParserImpl方式 实现 SqlAbstractParserImpl 类进行解析
中间数据设计
中间数据设计按照create alter等不同语法都有不同的ddlData。如create table语句就会转为一个createTable的ddlData
package com.dtstack.chunjun.cdc.ddl.entity;
import com.dtstack.chunjun.cdc.EventType;
import javax.naming.OperationNotSupportedException;
import java.util.List;
public class CreateTableData extends DdlData implements Identity, ColumnData {
private String schema;
private String table;
private final List<ColumnEntity> columnList;
private final List<IndexEntity> indexList;
private final List<PartitionEntity> partitionList;
public CreateTableData(
EventType type,
String sql,
List<ColumnEntity> columnList,
String schema,
String table) {
super(type, sql);
this.columnList = columnList;
this.schema = schema;
this.table = table;
}
@Override
public List<ColumnEntity> getColumnEntity() {
return columnList;
}
@Override
public String getDataBase() {
throw new UnsupportedOperationException();
}
@Override
public String getSchema() {
return schema;
}
@Override
public String getTable() {
return table;
}
@Override
public void setDataBase(String table) {
}
@Override
public void setSchema(String schema) {
this.schema = schema;
}
@Override
public void setTable(String table) {
this.table = table;
}
}
加载DdlConvent
spi方式加载插件
解析sql
通过calcite工具进行解析sql,但是calcite仅支持解析如下标准格式
createTableStatement:
CREATE TABLE [ IF NOT EXISTS ] name
[ '(' tableElement [, tableElement ]* ')' ]
[ AS query ]
示例:
CREATE TABLE `t123e` (
`column_1` int DEFAULT NULL,
`column_2` varchar(255) DEFAULT NULL
)
Mysql语法:
CREATE TABLE `test120` (
`column_1` int NOT NULL,
`column_2` int DEFAULT '123',
PRIMARY KEY (`column_1`),
UNIQUE KEY `test120_column_1_uindex` (`column_1`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
对于Mysql语法是无法解析的,解决方案:
- 每个数据源继承 SqlAbstractParserImpl 完成对应数据源的解析 熟悉calcite解析逻辑,相关开发需要熟悉calcite 以及 解析,容易出现单点问题。
如何将中间数据转为sql
calcite源码里 每种类型的SqlNode转为sql的时候也是字符串拼接的方式转为sql的