MaxCompute
MaxCompute EndPoint documentation: https://help.aliyun.com/document_detail/34951.html. This example uses the public-network Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
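The public Endpoint above embeds the region ID (`cn-beijing`). As a rough sketch, assuming other regions follow the same `service.<region>.maxcompute.aliyun.com` pattern (check the EndPoint documentation linked above for your region), the URL can be assembled as follows. The class and method names are hypothetical and are not part of Alink or the MaxCompute SDK.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Alink or the MaxCompute SDK.
final class MaxComputeEndpoints {

    // Region IDs look like "cn-beijing"; anything else is rejected so a typo fails fast.
    private static final Pattern REGION_ID = Pattern.compile("[a-z]+(-[a-z0-9]+)+");

    // Assumption: the public endpoint follows the pattern seen for cn-beijing.
    static String publicEndpoint(String regionId) {
        if (regionId == null || !REGION_ID.matcher(regionId).matches()) {
            throw new IllegalArgumentException("Unexpected region ID: " + regionId);
        }
        return "http://service." + regionId + ".maxcompute.aliyun.com/api";
    }

    public static void main(String[] args) {
        // Prints the Endpoint used in this document.
        System.out.println(publicEndpoint("cn-beijing"));
    }
}
```

The resulting string is what would be passed as the `EndPoint` argument when constructing the `OdpsCatalog`.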
Redis
Redis documentation: https://help.aliyun.com/product/26340.html
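- Create an instance

- This example creates a pay-as-you-go local-disk instance
- Create the Redis instance in the same network segment as the fully managed Flink service
- Obtain the VPC (Virtual Private Cloud) address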

```java
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkRedis();
    }

    public static void runSinkRedis() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret, and Project.
        // Set RunningProject to the same value as Project.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "Project"
        );

        // schema: sepal_length double, sepal_width double, petal_length double, petal_width double, category string
        CatalogSourceBatchOp source = new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in Project
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        // Write the iris table to Redis: the four measurement columns form the key,
        // and the category column is the value.
        RedisSinkBatchOp sink = new RedisSinkBatchOp()
            .setRedisIPs("ip:port")
            .setRedisPassword("PASSWORD")
            .setClusterMode(true)
            .setDatabaseIndex(0)
            .setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width")
            .setValueCols("category")
            .setPluginVersion("2.9.0");

        source.link(sink);

        BatchOperator.execute();
    }
}
```
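- Features
- Supports cluster mode
- Supports selecting the database index
- Data is currently stored as byte[]; this will be extended in future versions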
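Because values are stored as `byte[]`, any consumer reading them back has to decode the raw bytes itself. The sketch below only illustrates what that round trip looks like for a string value, assuming UTF-8 text encoding. The source does not state which serialization the Redis sink actually uses, so treat the encoding as an assumption and the class name as hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; the Redis sink's real byte layout is not documented here.
final class RedisValueCodec {

    // Assumption: string values are encoded as UTF-8 bytes.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode raw bytes read from Redis back into a string, under the same assumption.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("Iris-setosa");
        System.out.println(stored.length + " bytes -> " + decode(stored));
    }
}
```

If the sink uses a different serialization, only the two codec methods would need to change; the point is that the reader must agree with the writer on how the bytes are interpreted.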
