编写代码

新建Maven项目

  1. Intellij 中点击菜单 File -> New -> Project,弹出如下界面

image.png

  1. 点击 Next, 有如下界面,填写好项目名字,这里叫 AlinkMavenExample, 点击 Finish,完成项目创建

image.png

修改 pom.xml

这里依赖 Flink 1.13 版本。写文章时,Flink 全托管支持的最新版本为 vvr-4.0.12-flink-1.13。不要使用 Flink 1.11 及以下版本,否则会在启动作业时超时报错,参考:https://help.aliyun.com/document_detail/169596.html

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>AlinkMavenExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.13_2.11</artifactId>
            <version>0.1-patches-flink-1.13-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 直接加上 shade 过程,这样可以使用生成的jar包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>post-shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

编写 main 函数

这里以读取 http csv 数据源并写入系统 oss 为例:

  1. 在 org.example 中新建 Java 类 HelloAlink

image.png

  2. 编写代码

```java
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOssSink();
    }

    public static void runOssSink() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );
        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            // 修改 xxx 为新建的 oss bucket
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);
        source.link(sink);
        BatchOperator.execute();
    }
}
```

  3. 编译为 shade jar
  2. 点击 maven package<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183204-9cbaa6d4-e90f-4039-8f85-c2b1a1982e57.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=457&id=ua676cffe&margin=%5Bobject%20Object%5D&name=image.png&originHeight=914&originWidth=958&originalType=binary&ratio=1&rotation=0&showTitle=false&size=114576&status=done&style=none&taskId=u7d468c84-32f0-4477-acff-0a86956c286&title=&width=479)<br />在 target 目录中找到生成的 shade 包<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183455-9537cb06-72e6-4415-a6f1-c486b5ae3052.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=365&id=u15c73ad7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=730&originWidth=1154&originalType=binary&ratio=1&rotation=0&showTitle=false&size=100542&status=done&style=none&taskId=u13869d54-57d3-4634-8229-bd2cdb3ab0f&title=&width=577)
  3. <a name="g1j5k"></a>
  4. # 上传 jar 包
  5. 将上一步生成的jar,作为资源上传到 Flink 全托管集群上<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183938-60ad7def-7c08-4e89-8034-de6e3325fd42.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=506&id=uc31cb68b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1012&originWidth=3452&originalType=binary&ratio=1&rotation=0&showTitle=false&size=237250&status=done&style=none&taskId=uad035ac0-b3ee-4160-9d51-dfbd2c46c73&title=&width=1726)
  6. <a name="CRpD0"></a>
  7. # 作业开发
  8. <a name="cVT1N"></a>
  9. ## 新建作业
  10. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183885-48942d29-5395-4658-8cbb-87ddcdc29136.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=190&id=u3c718cfb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=380&originWidth=1078&originalType=binary&ratio=1&rotation=0&showTitle=false&size=105756&status=done&style=none&taskId=u75114ee3-30e4-4923-ad58-8487e93f18e&title=&width=539)
  11. <a name="JybWO"></a>
  12. ## 输入作业信息
  13. 这里名字定为 hello_alink , 文件类型为 流作业 / JAR ,部署目标为默认,存储位置为默认<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440184292-cdd2c9ea-04b9-4765-a3c3-1d227dd34a46.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=499&id=u9bb80cdc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=998&originWidth=1208&originalType=binary&ratio=1&rotation=0&showTitle=false&size=254430&status=done&style=none&taskId=u9a6f95ed-6e0b-48af-a32a-ea0c65dde8d&title=&width=604)
  14. <a name="SDvkS"></a>
  15. ## 配置作业
  16. 将图中红框框选的内容,填入合适的内容,这里注意几项
  17. 1. Entry Point Class org.example.HelloAlink,参考代码中 main 函数的位置
  18. 1. 并行度这里我们设置为 1 因为这里是一个很小的任务,为了省资源,填写最小值
  19. 1. 引擎版本选择 vvr-4.0.12-flink-1.13 ,原因可以参考 "修改 pom.xml" 章节
  20. 1. Flink 重启策略配置选择 No Restarts ,因为我们提交的为一个批任务,如果一直重启的话,会造成任务失败,不结束
  21. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440184913-dbd64ea1-5889-4eec-b7f7-aebaeb2dcf67.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=940&id=u3ead41e1&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1880&originWidth=3042&originalType=binary&ratio=1&rotation=0&showTitle=false&size=439661&status=done&style=none&taskId=u5fced43b-6535-48ba-812e-c6f7a47b7a8&title=&width=1521)
  22. <a name="eoeaz"></a>
  23. ## 上线
  24. 点击如图按钮即可<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185231-e0cc4795-a07a-4740-b2d9-6efef5780b9f.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=213&id=u71935787&margin=%5Bobject%20Object%5D&name=image.png&originHeight=426&originWidth=3446&originalType=binary&ratio=1&rotation=0&showTitle=false&size=365686&status=done&style=none&taskId=u953950a2-3beb-4802-840c-285f93854d7&title=&width=1723)
  25. <a name="LwtzL"></a>
  26. # 作业运维
  27. <a name="C5Qcb"></a>
  28. ## 启动作业
  29. 点击下图中红框中按钮即可启动作业<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185267-099c1ceb-5cb7-4280-a9db-edc94c7c41fe.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=301&id=uf0fe9f16&margin=%5Bobject%20Object%5D&name=image.png&originHeight=602&originWidth=3448&originalType=binary&ratio=1&rotation=0&showTitle=false&size=197757&status=done&style=none&taskId=ud43c40d8-39a0-44ec-9c8f-2cf2f7c19ea&title=&width=1724)
  30. <a name="yWdoF"></a>
  31. ## 日志查看
  32. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185906-42adbb2f-9f22-421f-893d-ca40e1f36ce3.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=356&id=u631ec81f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=712&originWidth=3454&originalType=binary&ratio=1&rotation=0&showTitle=false&size=278830&status=done&style=none&taskId=uf7765fc4-3890-4481-985f-d2a37308bc4&title=&width=1727)
  33. <a name="x33XG"></a>
  34. # 结果查看
  35. 当前任务为将iris数据写入新建的 oss bucket 路径中,所以当任务完成是,我们可以使用 oss 客户端查看是否写入数据<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440186073-e683f6bd-2ad1-4fd4-841f-f0ff9fab4d27.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=144&id=u853361ea&margin=%5Bobject%20Object%5D&name=image.png&originHeight=288&originWidth=2448&originalType=binary&ratio=1&rotation=0&showTitle=false&size=194936&status=done&style=none&taskId=u9504e982-215d-4fb4-8431-ad325370250&title=&width=1224)
  36. <a name="kY6bd"></a>
  37. # Alink实例
  38. <a name="YShpn"></a>
  39. ## 读 Http 写入到 自定义 Oss 中
  1. 选择 Oss Bucket

OSS RegionEndpoint对照表: [https://help.aliyun.com/document_detail/31837.html](https://help.aliyun.com/document_detail/31837.html),这里因为使用的是和 Flink 全托管集群相同 Region,所以 Endpoint 选择了内网Endpoint: [http://oss-cn-beijing-internal.aliyuncs.com/](http://oss-cn-beijing-internal.aliyuncs.com/)

  2. 代码如下:

```java
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOthersOssSink();
    }

    public static void runOthersOssSink() throws Exception {
        // 填写好 EndPoint,Bucket,AccessKeyID,AccessKeySecret
        OssFileSystem oss =
            new OssFileSystem(
                "3.4.1",
                "EndPoint",
                "Bucket",
                "AccessKeyID",
                "AccessKeySecret"
            );
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );
        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", oss))
            .setOverwriteSink(true);
        source.link(sink);
        BatchOperator.execute();
    }
}
```

读 Http 写入到 MaxCompute 中

  1. 选择 MaxCompute project

MaxCompute EndPoint: https://help.aliyun.com/document_detail/34951.html,这里选择的是外网Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api

  2. 代码如下:

```java
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkOdps();
    }

    public static void runSinkOdps() throws Exception {
        // 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );
        CatalogSinkBatchOp sink = new CatalogSinkBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // 填写好 Project
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );
        source.link(sink);
        BatchOperator.execute();
    }
}
```