MaxCompute

MaxCompute Endpoint 文档地址为 https://help.aliyun.com/document_detail/34951.html ,这里选择的是外网 Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api

MySQL

MySQL 文档地址为 https://help.aliyun.com/document_detail/95798.html

  1. 创建实例

image.png

  1. 这里创建的是非高可用实例,MySQL 版本选择的是 5.7;生产环境中可以考虑创建对应的高可用实例类型
  2. 将 MySQL 实例创建在与 Flink 全托管相同的网段内
  3. 创建用户
  4. 将内网网段加入白名单,使 Flink 集群可以访问到 MySQL 实例

image.png

实例

  1. 代码

```java
package org.example;

import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.MySqlCatalog; import com.alibaba.alink.common.io.catalog.OdpsCatalog; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp; import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp; import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject; import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

  1. public static void main(String[] args) throws Exception {
  2. runReadOdpsWriteMysql();
  3. }
  4. public static void runReadOdpsWriteMysql() throws Exception {
  5. // 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同
  6. OdpsCatalog odpsCatalog = new OdpsCatalog(
  7. "odps",
  8. "default",
  9. "0.36.4-public",
  10. "AccessKeyID",
  11. "AccessKeySecret",
  12. "Project",
  13. "EndPoint",
  14. "RunningProject"
  15. );
  16. // 填写好 DefaultDatabase,MySQL URL,PORT, USER, PASSWORD。
  17. MySqlCatalog mySqlCatalog = new MySqlCatalog(
  18. "odps2mysql",
  19. "DefaultDatabase",
  20. "5.1.27",
  21. "MySQL URL",
  22. "PORT",
  23. "USER",
  24. "PASSWORD"
  25. );
  26. CatalogSourceBatchOp odpsSourceBatchOp = new CatalogSourceBatchOp()
  27. .setCatalogObject(
  28. new CatalogObject(
  29. odpsCatalog,
  30. // 填写好 Project
  31. new ObjectPath("Project", "test_alink_iris")
  32. )
  33. );
  34. CatalogSinkBatchOp mysqlSinkBatchOp = new CatalogSinkBatchOp()
  35. .setCatalogObject(
  36. new CatalogObject(
  37. mySqlCatalog,
  38. new ObjectPath("DATABASE", "test_alink_iris"),
  39. new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
  40. )
  41. );
  42. odpsSourceBatchOp.link(mysqlSinkBatchOp);
  43. BatchOperator.execute();
  44. }

}
```