MaxCompute
MaxCompute EndPoint: https://help.aliyun.com/document_detail/34951.html,这里选择的是外网Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
MySQL
Mysql 文档地址为 https://help.aliyun.com/document_detail/95798.html
- 创建实例

- 这里创建非高可用的实例,且 MySQL 版本选择的 5.7,生产中可以考虑创建对应的高可用实例类型
- 将 MySQL 示例创建进和 Flink 全托管相同的网段内
- 创建用户
- 设置内网网段进白名单,使 Flink 集群可以访问到 MySQL 实例
实例
- 代码 ```java package org.example;
import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.table.catalog.ObjectPath;
import com.alibaba.alink.common.io.catalog.MySqlCatalog; import com.alibaba.alink.common.io.catalog.OdpsCatalog; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp; import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp; import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject; import com.alibaba.alink.params.shared.HasOverwriteSink;
public class HelloAlink {
public static void main(String[] args) throws Exception {runReadOdpsWriteMysql();}public static void runReadOdpsWriteMysql() throws Exception {// 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同OdpsCatalog odpsCatalog = new OdpsCatalog("odps","default","0.36.4-public","AccessKeyID","AccessKeySecret","Project","EndPoint","RunningProject");// 填写好 DefaultDatabase,MySQL URL,PORT, USER, PASSWORD。MySqlCatalog mySqlCatalog = new MySqlCatalog("odps2mysql","DefaultDatabase","5.1.27","MySQL URL","PORT","USER","PASSWORD");CatalogSourceBatchOp odpsSourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(new CatalogObject(odpsCatalog,// 填写好 Projectnew ObjectPath("Project", "test_alink_iris")));CatalogSinkBatchOp mysqlSinkBatchOp = new CatalogSinkBatchOp().setCatalogObject(new CatalogObject(mySqlCatalog,new ObjectPath("DATABASE", "test_alink_iris"),new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)));odpsSourceBatchOp.link(mysqlSinkBatchOp);BatchOperator.execute();}
} ```
