编写代码
新建Maven项目
- IntelliJ 中点击菜单 File -> New -> Project,弹出如下界面
- 点击 Next, 有如下界面,填写好项目名字,这里叫 AlinkMavenExample, 点击 Finish,完成项目创建
修改 pom.xml
这里依赖 Flink 1.13 版本。写这篇文章时,Flink 全托管支持的最新版本为 vvr-4.0.12-flink-1.13;不要使用 Flink 1.11 及以下版本,否则会在启动作业时超时报错,参考 https://help.aliyun.com/document_detail/169596.html
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>AlinkMavenExample</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Alink core, built against Flink 1.13 / Scala 2.11. This is the only
     dependency that gets bundled into the shaded jar. -->
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.13_2.11</artifactId>
<version>0.1-patches-flink-1.13-SNAPSHOT</version>
</dependency>
<!-- Flink dependencies are scoped "provided": the fully-managed Flink
     runtime (vvr-4.0.12-flink-1.13) supplies them at execution time,
     so they must not be packaged into the shaded jar. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile Java sources at the Java 8 language level.
     NOTE(review): no <version> is pinned for this plugin or for
     maven-shade-plugin below; Maven will warn and resolution may vary
     between environments — consider pinning explicit versions. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<!-- Compile Scala before Java so mixed Scala/Java sources resolve. -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Run the shade step during "package" so the generated jar can be
     submitted to the cluster directly. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>post-shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<!-- Attach the shaded jar as an extra artifact (classifier "shaded")
     instead of replacing the main jar. -->
<shadedArtifactAttached>true</shadedArtifactAttached>
<transformers>
<!-- Merge META-INF/services files from all shaded dependencies so
     ServiceLoader-based lookups (e.g. Flink filesystems) still work. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
编写 main 函数
这里以读 http csv 数据源并写系统 oss 为例:
- 在 org.example 中新建 Java 类 HelloAlink
- 编写代码
```java
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOssSink();
    }

    /**
     * Reads the iris CSV data set over HTTP and writes it back out as a CSV
     * file into the system OSS bucket of the fully-managed Flink cluster.
     */
    public static void runOssSink() throws Exception {
        // Column schema of the iris data set.
        final String irisSchema =
            "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

        CsvSourceBatchOp httpSource = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(irisSchema);

        // Replace xxx with the name of the newly created OSS bucket.
        CsvSinkBatchOp ossSink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);

        httpSource.link(ossSink);
        BatchOperator.execute();
    }
}
```
3. 编译为 shade 的 jar 包
点击 maven 中 package<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183204-9cbaa6d4-e90f-4039-8f85-c2b1a1982e57.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=457&id=ua676cffe&margin=%5Bobject%20Object%5D&name=image.png&originHeight=914&originWidth=958&originalType=binary&ratio=1&rotation=0&showTitle=false&size=114576&status=done&style=none&taskId=u7d468c84-32f0-4477-acff-0a86956c286&title=&width=479)<br />在 target 目录中找到生成的 shade 包<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183455-9537cb06-72e6-4415-a6f1-c486b5ae3052.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=365&id=u15c73ad7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=730&originWidth=1154&originalType=binary&ratio=1&rotation=0&showTitle=false&size=100542&status=done&style=none&taskId=u13869d54-57d3-4634-8229-bd2cdb3ab0f&title=&width=577)
<a name="g1j5k"></a>
# 上传 jar 包
将上一步生成的jar,作为资源上传到 Flink 全托管集群上<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183938-60ad7def-7c08-4e89-8034-de6e3325fd42.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=506&id=uc31cb68b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1012&originWidth=3452&originalType=binary&ratio=1&rotation=0&showTitle=false&size=237250&status=done&style=none&taskId=uad035ac0-b3ee-4160-9d51-dfbd2c46c73&title=&width=1726)
<a name="CRpD0"></a>
# 作业开发
<a name="cVT1N"></a>
## 新建作业
![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440183885-48942d29-5395-4658-8cbb-87ddcdc29136.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=190&id=u3c718cfb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=380&originWidth=1078&originalType=binary&ratio=1&rotation=0&showTitle=false&size=105756&status=done&style=none&taskId=u75114ee3-30e4-4923-ad58-8487e93f18e&title=&width=539)
<a name="JybWO"></a>
## 输入作业信息
这里名字定为 hello_alink , 文件类型为 流作业 / JAR ,部署目标为默认,存储位置为默认<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440184292-cdd2c9ea-04b9-4765-a3c3-1d227dd34a46.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=499&id=u9bb80cdc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=998&originWidth=1208&originalType=binary&ratio=1&rotation=0&showTitle=false&size=254430&status=done&style=none&taskId=u9a6f95ed-6e0b-48af-a32a-ea0c65dde8d&title=&width=604)
<a name="SDvkS"></a>
## 配置作业
将图中红框框选的内容,填入合适的内容,这里注意几项
1. Entry Point Class 为 org.example.HelloAlink,参考代码中 main 函数的位置
1. 并行度这里我们设置为 1 , 因为这里是一个很小的任务,为了省资源,填写最小值
1. 引擎版本选择 vvr-4.0.12-flink-1.13 ,原因可以参考 "修改 pom.xml" 章节
1. Flink 重启策略配置选择 No Restarts ,因为我们提交的为一个批任务,如果一直重启的话,会造成任务失败,不结束
![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440184913-dbd64ea1-5889-4eec-b7f7-aebaeb2dcf67.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=940&id=u3ead41e1&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1880&originWidth=3042&originalType=binary&ratio=1&rotation=0&showTitle=false&size=439661&status=done&style=none&taskId=u5fced43b-6535-48ba-812e-c6f7a47b7a8&title=&width=1521)
<a name="eoeaz"></a>
## 上线
点击如图按钮即可<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185231-e0cc4795-a07a-4740-b2d9-6efef5780b9f.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=213&id=u71935787&margin=%5Bobject%20Object%5D&name=image.png&originHeight=426&originWidth=3446&originalType=binary&ratio=1&rotation=0&showTitle=false&size=365686&status=done&style=none&taskId=u953950a2-3beb-4802-840c-285f93854d7&title=&width=1723)
<a name="LwtzL"></a>
# 作业运维
<a name="C5Qcb"></a>
## 启动作业
点击下图中红框中按钮即可启动作业<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185267-099c1ceb-5cb7-4280-a9db-edc94c7c41fe.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=301&id=uf0fe9f16&margin=%5Bobject%20Object%5D&name=image.png&originHeight=602&originWidth=3448&originalType=binary&ratio=1&rotation=0&showTitle=false&size=197757&status=done&style=none&taskId=ud43c40d8-39a0-44ec-9c8f-2cf2f7c19ea&title=&width=1724)
<a name="yWdoF"></a>
## 日志查看
![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440185906-42adbb2f-9f22-421f-893d-ca40e1f36ce3.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=356&id=u631ec81f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=712&originWidth=3454&originalType=binary&ratio=1&rotation=0&showTitle=false&size=278830&status=done&style=none&taskId=uf7765fc4-3890-4481-985f-d2a37308bc4&title=&width=1727)
<a name="x33XG"></a>
# 结果查看
当前任务为将iris数据写入新建的 oss bucket 路径中,所以当任务完成时,我们可以使用 oss 客户端查看是否写入数据<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/286293/1648440186073-e683f6bd-2ad1-4fd4-841f-f0ff9fab4d27.png#clientId=u647a6cbd-0c0f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=144&id=u853361ea&margin=%5Bobject%20Object%5D&name=image.png&originHeight=288&originWidth=2448&originalType=binary&ratio=1&rotation=0&showTitle=false&size=194936&status=done&style=none&taskId=u9504e982-215d-4fb4-8431-ad325370250&title=&width=1224)
<a name="kY6bd"></a>
# Alink实例
<a name="YShpn"></a>
## 读 Http 写入到 自定义 Oss 中
1. 选择Oss Bucket
OSS Region和Endpoint对照表: [https://help.aliyun.com/document_detail/31837.html](https://help.aliyun.com/document_detail/31837.html),这里因为使用的是和 Flink 全托管集群相同 Region,所以 Endpoint 选择了内网Endpoint: [http://oss-cn-beijing-internal.aliyuncs.com/](http://oss-cn-beijing-internal.aliyuncs.com/)
2. 代码如下:
```java
package org.example;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOthersOssSink();
    }

    /**
     * Reads the iris CSV data set over HTTP and writes it into a user-owned
     * OSS bucket via an explicitly configured {@link OssFileSystem}.
     */
    public static void runOthersOssSink() throws Exception {
        // Fill in EndPoint, Bucket, AccessKeyID and AccessKeySecret for your bucket.
        OssFileSystem ossFileSystem = new OssFileSystem(
            "3.4.1",
            "EndPoint",
            "Bucket",
            "AccessKeyID",
            "AccessKeySecret"
        );

        // Column schema of the iris data set.
        final String irisSchema =
            "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

        CsvSourceBatchOp irisSource = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(irisSchema);

        CsvSinkBatchOp ossSink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", ossFileSystem))
            .setOverwriteSink(true);

        irisSource.link(ossSink);
        BatchOperator.execute();
    }
}
```
## 读 Http 写入到 MaxCompute 中
- 选择 MaxCompute project
MaxCompute EndPoint: https://help.aliyun.com/document_detail/34951.html,这里选择的是外网Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
- 代码如下:
```java
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;
public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkOdps();
    }

    /**
     * Reads the iris CSV data set over HTTP and writes it into the MaxCompute
     * (ODPS) table {@code test_alink_iris}, overwriting any existing data.
     */
    public static void runSinkOdps() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret and Project.
        // RunningProject should be the same as Project.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );

        // Column schema of the iris data set.
        final String irisSchema =
            "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

        CsvSourceBatchOp irisSource = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(irisSchema);

        // Target table under the catalog; fill in Project here as well.
        CatalogObject targetTable = new CatalogObject(
            odpsCatalog,
            new ObjectPath("Project", "test_alink_iris"),
            new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
        );

        CatalogSinkBatchOp odpsSink = new CatalogSinkBatchOp()
            .setCatalogObject(targetTable);

        irisSource.link(odpsSink);
        BatchOperator.execute();
    }
}
```