MaxCompute
MaxCompute Endpoint 文档地址为 https://help.aliyun.com/document_detail/34951.html,这里选择的是外网 Endpoint:http://service.cn-beijing.maxcompute.aliyun.com/api
MySQL
MySQL 文档地址为 https://help.aliyun.com/document_detail/95798.html
- 创建实例
- 这里创建的是非高可用实例,MySQL 版本选择的是 5.7;生产环境中可以考虑创建对应的高可用实例类型
- 将 MySQL 实例创建在与 Flink 全托管相同的网段内
- 创建用户
- 将内网网段加入白名单,使 Flink 集群可以访问到 MySQL 实例
实例
- 代码 ```java package org.example;
import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.table.catalog.ObjectPath;
import com.alibaba.alink.common.io.catalog.MySqlCatalog; import com.alibaba.alink.common.io.catalog.OdpsCatalog; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp; import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp; import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject; import com.alibaba.alink.params.shared.HasOverwriteSink;
public class HelloAlink {
/**
 * Program entry point: runs the MaxCompute (ODPS) to MySQL copy example.
 *
 * @param args command-line arguments; not read by this example
 * @throws Exception if the example job fails
 */
public static void main(final String[] args) throws Exception {
    runReadOdpsWriteMysql();
}
/**
 * Links a catalog source reading table {@code test_alink_iris} from a MaxCompute (ODPS) catalog
 * to a catalog sink writing table {@code test_alink_iris} in a MySQL catalog, then runs the
 * batch job.
 *
 * <p>Every string argument below other than the table name is a placeholder that must be
 * replaced with real connection settings before running.
 *
 * @throws Exception if building or executing the batch job fails
 */
public static void runReadOdpsWriteMysql() throws Exception {
    // Fill in EndPoint, AccessKeyID, AccessKeySecret and Project.
    // RunningProject should be set to the same value as Project.
    final OdpsCatalog sourceCatalog = new OdpsCatalog(
        "odps", "default", "0.36.4-public",
        "AccessKeyID", "AccessKeySecret",
        "Project", "EndPoint", "RunningProject");

    // Fill in DefaultDatabase, MySQL URL, PORT, USER and PASSWORD.
    final MySqlCatalog targetCatalog = new MySqlCatalog(
        "odps2mysql", "DefaultDatabase", "5.1.27",
        "MySQL URL", "PORT", "USER", "PASSWORD");

    // Source side: fill in Project as the first component of the table path.
    final ObjectPath sourceTable = new ObjectPath("Project", "test_alink_iris");
    final CatalogObject sourceObject = new CatalogObject(sourceCatalog, sourceTable);
    final CatalogSourceBatchOp reader = new CatalogSourceBatchOp().setCatalogObject(sourceObject);

    // Sink side: the OVERWRITE_SINK parameter is set to true for the target table.
    final ObjectPath targetTable = new ObjectPath("DATABASE", "test_alink_iris");
    final Params sinkParams = new Params().set(HasOverwriteSink.OVERWRITE_SINK, true);
    final CatalogObject targetObject = new CatalogObject(targetCatalog, targetTable, sinkParams);
    final CatalogSinkBatchOp writer = new CatalogSinkBatchOp().setCatalogObject(targetObject);

    // Wire the source into the sink, then run the batch job.
    reader.link(writer);
    BatchOperator.execute();
}
} ```