MaxCompute
The MaxCompute endpoints are listed at https://help.aliyun.com/document_detail/34951.html. Here we use the public (Internet) endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
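Before wiring the full pipeline, it can help to confirm that the endpoint and credentials work by reading the MaxCompute table through Alink's OdpsCatalog and printing it. This is a minimal sketch; the AccessKeyID, AccessKeySecret, Project, and table name are placeholders matching the full example further below.

```java
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;

public class CheckOdps {
    public static void main(String[] args) throws Exception {
        // Same placeholders as the full example below; the endpoint is the public one listed above.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default", "0.36.4-public",
            "AccessKeyID", "AccessKeySecret",
            "Project",
            "http://service.cn-beijing.maxcompute.aliyun.com/api",
            "Project"
        );

        // Read the table through the catalog and print it to verify connectivity and credentials.
        new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(odpsCatalog, new ObjectPath("Project", "test_alink_iris"), new Params())
            )
            .print();
    }
}
```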
Redis
Redis documentation: https://help.aliyun.com/product/26340.html
- Create an instance
- Here we create a pay-as-you-go instance with local disks
- Create the Redis instance in the same network segment (VPC) as the fully managed Flink workspace
- Obtain the VPC (private network) connection address; a quick connectivity check is sketched after this list
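To verify that the instance is reachable from the VPC before running the job, a simple ping with the Jedis client is enough. This is a minimal sketch; the host name and password are hypothetical placeholders for the VPC address and password obtained from the console.

```java
import redis.clients.jedis.Jedis;

public class CheckRedis {
    public static void main(String[] args) {
        // Hypothetical VPC address; replace with the private endpoint obtained above.
        try (Jedis jedis = new Jedis("r-xxxxxxxx.redis.rds.aliyuncs.com", 6379)) {
            jedis.auth("PASSWORD");
            System.out.println(jedis.ping()); // expect "PONG"
        }
    }
}
```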
The full example below reads the iris table from MaxCompute through OdpsCatalog and writes it to Redis with RedisSinkBatchOp:

```java
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkRedis();
    }

    public static void runSinkRedis() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret, and Project. Set RunningProject to the same value as Project.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "Project"
        );

        // schema: sepal_length double, sepal_width double, petal_length double, petal_width double, category string
        CatalogSourceBatchOp source = new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in Project
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        RedisSinkBatchOp sink = new RedisSinkBatchOp()
            .setRedisIPs("ip:port")
            .setRedisPassword("PASSWORD")
            .setClusterMode(true)
            .setDatabaseIndex(0)
            .setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width")
            .setValueCols("category")
            .setPluginVersion("2.9.0");

        source.link(sink);

        BatchOperator.execute();
    }
}
```
- Features
- Cluster mode is supported
- The database index can be selected
- Data is currently stored as byte[] (see the inspection sketch below); this will be extended in future versions
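Since the sink writes raw byte[] keys and values, the result can be inspected with a plain Redis client. This is a minimal sketch using Jedis against a single node (a cluster deployment would need JedisCluster instead); the host and password are hypothetical placeholders, and because the byte contents are Alink's internal serialization, only sizes are printed.

```java
import redis.clients.jedis.Jedis;

import java.util.Set;

public class InspectRedisSink {
    public static void main(String[] args) {
        // Hypothetical VPC address and password; use the same database index as the sink.
        try (Jedis jedis = new Jedis("r-xxxxxxxx.redis.rds.aliyuncs.com", 6379)) {
            jedis.auth("PASSWORD");
            jedis.select(0);

            // Keys and values are stored as byte[], so use the binary API.
            // KEYS * is fine for this small demo table; avoid it on large databases.
            Set<byte[]> keys = jedis.keys("*".getBytes());
            System.out.println("rows written by the sink: " + keys.size());

            for (byte[] key : keys) {
                byte[] value = jedis.get(key);
                System.out.println(key.length + " key bytes -> "
                        + (value == null ? 0 : value.length) + " value bytes");
            }
        }
    }
}
```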