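# Writing the code
## Create a Maven project
- In IntelliJ IDEA, choose File -> New -> Project to open the New Project dialog.
- Click Next, enter the project name (AlinkMavenExample here), and click Finish to create the project.
## Modify pom.xml
This example depends on Flink 1.13. At the time of writing, the latest engine version supported by fully managed Flink is vvr-4.0.12-flink-1.13. Do not build against Flink 1.11 or earlier: the job will time out and fail at startup. See https://help.aliyun.com/document_detail/169596.html for details.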
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>AlinkMavenExample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.13_2.11</artifactId>
            <version>0.1-patches-flink-1.13-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Run the shade step during packaging so the generated jar can be used directly -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>post-shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
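The version constraint for this pom can be checked mechanically before a job is deployed. The following is a minimal sketch, not part of Alink or Flink: `EngineVersionCheck` and its methods are hypothetical names, and it assumes that engine version strings always end in `flink-<major>.<minor>`, as `vvr-4.0.12-flink-1.13` does, and that a match on major and minor version is what matters.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares an engine version string with the Flink version in pom.xml. */
final class EngineVersionCheck {
    // Matches the trailing "flink-<major>.<minor>" part, e.g. in "vvr-4.0.12-flink-1.13".
    private static final Pattern FLINK_PART = Pattern.compile("flink-(\\d+)\\.(\\d+)$");

    /** Returns {major, minor} of the Flink version embedded in the engine version string. */
    static int[] flinkVersionOf(String engineVersion) {
        Matcher m = FLINK_PART.matcher(engineVersion);
        if (!m.find()) {
            throw new IllegalArgumentException("No Flink version found in: " + engineVersion);
        }
        return new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))};
    }

    /** True when the engine's Flink major.minor equals that of the pom dependency version. */
    static boolean matchesPom(String engineVersion, String pomFlinkVersion) {
        int[] engine = flinkVersionOf(engineVersion);
        String[] pom = pomFlinkVersion.split("\\.");
        return engine[0] == Integer.parseInt(pom[0]) && engine[1] == Integer.parseInt(pom[1]);
    }

    public static void main(String[] args) {
        // Engine version and pom version taken from this section.
        System.out.println(matchesPom("vvr-4.0.12-flink-1.13", "1.13.0"));
    }
}
```

A check like this could run in a build script or a unit test so that a mismatch is caught before the job is submitted rather than as a startup timeout.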
## Write the main function
This example reads a CSV data source over HTTP and writes it to the system OSS:
- Create a Java class named HelloAlink in the org.example package.
- Write the code:

```java
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOssSink();
    }

    public static void runOssSink() throws Exception {
        // Read the iris data set over HTTP.
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            // Replace xxx with the name of your newly created OSS bucket.
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);

        source.link(sink);

        // Trigger execution of the batch job.
        BatchOperator.execute();
    }
}
```
## Build the shaded JAR
Run the package goal from the Maven tool window.
The shaded JAR is generated in the target directory.
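Because the pom sets `shadedArtifactAttached` to `true` and does not override the classifier, maven-shade-plugin writes the shaded artifact next to the regular jar with its default `shaded` classifier. The sketch below derives the expected file name and checks whether it exists; `ShadedJarName` is a hypothetical helper, and it assumes the build uses Maven's default `finalName` (`artifactId-version`) and is run from the project root.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: locates the shaded jar produced by `mvn package`. */
final class ShadedJarName {

    /** File name of the attached shaded artifact when the default "shaded" classifier is used. */
    static String shadedJarName(String artifactId, String version) {
        return artifactId + "-" + version + "-shaded.jar";
    }

    public static void main(String[] args) {
        // artifactId and version as declared in the pom.xml of this example.
        Path jar = Paths.get("target", shadedJarName("AlinkMavenExample", "1.0-SNAPSHOT"));
        System.out.println(jar + (Files.isRegularFile(jar) ? " exists" : " not found"));
    }
}
```

The jar with the `-shaded` suffix is the one that bundles Alink; the jar without the suffix contains only the project's own classes.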
<a name="g1j5k"></a>
# Upload the JAR
Upload the JAR generated in the previous step to the fully managed Flink cluster as a resource.
<a name="CRpD0"></a>
# Job development
<a name="cVT1N"></a>
## Create a job

<a name="JybWO"></a>
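## Enter the job information
Name the job hello_alink, set the file type to Stream job / JAR, and leave the deployment target and the storage location at their defaults.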
<a name="SDvkS"></a>
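## Configure the job
Fill in the fields highlighted by the red box on the configuration page. Pay attention to the following settings:
1. Set Entry Point Class to org.example.HelloAlink, the class that contains the main function in the code above.
1. Set the parallelism to 1. This is a very small job, so the minimum value is enough and saves resources.
1. Select vvr-4.0.12-flink-1.13 as the engine version; the reason is given in the pom.xml section above.
1. Set the Flink restart strategy to No Restarts. The submitted job is a batch job; if it were restarted over and over, a failing job would never terminate.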
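A wrong Entry Point Class only shows up when the job starts, so it can be worth checking the uploaded jar locally first. The following sketch uses only the JDK's `java.util.jar.JarFile`; `EntryPointCheck` is a hypothetical helper, and the jar path passed on the command line is whatever your build produced.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: checks that a jar contains the class configured as Entry Point Class. */
final class EntryPointCheck {

    /** Converts a fully qualified class name into its entry name inside a jar. */
    static String classEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains the given class. */
    static boolean jarContainsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(classEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: EntryPointCheck <jar path> <fully qualified class name>");
            return;
        }
        System.out.println(jarContainsClass(args[0], args[1]));
    }
}
```

For this tutorial the second argument would be `org.example.HelloAlink`.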

<a name="eoeaz"></a>
## Publish
Click the publish button to put the job online.
<a name="LwtzL"></a>
# Job operations
<a name="C5Qcb"></a>
## Start the job
Click the start button to launch the job.
<a name="yWdoF"></a>
## View the logs

<a name="x33XG"></a>
# Check the results
This job writes the iris data to a path in the newly created OSS bucket, so once the job has finished, use an OSS client to check that the data was written.
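Beyond checking that the file exists, the downloaded output can be validated against the schema used in the job. The sketch below is a hypothetical helper, not part of Alink: it assumes the sink file has been copied to the local disk, that fields are separated by commas, and that no field contains an embedded comma or quote.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical helper: sanity-checks a local copy of the iris CSV written by the job. */
final class IrisCsvCheck {
    private static final int EXPECTED_COLUMNS = 5; // four measurements plus category
    private static final int NUMERIC_COLUMNS = 4;  // the first four columns are doubles

    /** Returns the number of well-formed rows; throws on the first malformed row. */
    static int countValidRows(List<String> lines) {
        int rows = 0;
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = line.split(",", -1);
            if (fields.length != EXPECTED_COLUMNS) {
                throw new IllegalArgumentException(
                    "Expected " + EXPECTED_COLUMNS + " fields but got " + fields.length + ": " + line);
            }
            for (int i = 0; i < NUMERIC_COLUMNS; i++) {
                Double.parseDouble(fields[i].trim()); // throws NumberFormatException on bad input
            }
            rows++;
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: IrisCsvCheck <local copy of the sink file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(countValidRows(lines) + " valid rows");
    }
}
```

Comparing the row count with that of the source file is a quick way to confirm that nothing was dropped.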
<a name="kY6bd"></a>
# Alink examples
<a name="YShpn"></a>
## Read from HTTP and write to a custom OSS bucket
1. Choose an OSS bucket
For the mapping between OSS regions and endpoints, see [https://help.aliyun.com/document_detail/31837.html](https://help.aliyun.com/document_detail/31837.html). The bucket used here is in the same region as the fully managed Flink cluster, so the internal endpoint is used: [http://oss-cn-beijing-internal.aliyuncs.com/](http://oss-cn-beijing-internal.aliyuncs.com/)
2. The code is as follows:
```java
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOthersOssSink();
    }

    public static void runOthersOssSink() throws Exception {
        // Fill in EndPoint, Bucket, AccessKeyID and AccessKeySecret.
        OssFileSystem oss =
            new OssFileSystem(
                "3.4.1",
                "EndPoint",
                "Bucket",
                "AccessKeyID",
                "AccessKeySecret"
            );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", oss))
            .setOverwriteSink(true);

        source.link(sink);

        BatchOperator.execute();
    }
}
```
## Read from HTTP and write to MaxCompute
1. Choose a MaxCompute project
For MaxCompute endpoints, see https://help.aliyun.com/document_detail/34951.html. The public endpoint is used here: http://service.cn-beijing.maxcompute.aliyun.com/api
2. The code is as follows:

```java
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkOdps();
    }

    public static void runSinkOdps() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret and Project.
        // Set RunningProject to the same value as Project.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CatalogSinkBatchOp sink = new CatalogSinkBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in Project.
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        source.link(sink);

        BatchOperator.execute();
    }
}
```
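Both examples take an endpoint string that depends on the region. The sketch below builds these strings from a region ID, following the naming pattern of the two endpoints quoted in this section. `AliyunEndpoints` is a hypothetical helper; the assumption that every region follows the same pattern should be checked against the endpoint tables linked above.

```java
/** Hypothetical helper: builds endpoint strings following the pattern of the endpoints used above. */
final class AliyunEndpoints {

    /** OSS endpoint for a region ID such as "cn-beijing"; internal endpoints carry an "-internal" suffix. */
    static String oss(String regionId, boolean internal) {
        return "http://oss-" + regionId + (internal ? "-internal" : "") + ".aliyuncs.com";
    }

    /** Public MaxCompute endpoint for a region ID such as "cn-beijing". */
    static String maxComputePublic(String regionId) {
        return "http://service." + regionId + ".maxcompute.aliyun.com/api";
    }

    public static void main(String[] args) {
        System.out.println(oss("cn-beijing", true));
        System.out.println(maxComputePublic("cn-beijing"));
    }
}
```

The internal OSS endpoint only works when the bucket and the fully managed Flink cluster are in the same region; otherwise pass `false` to get the public endpoint.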