MaxCompute

MaxCompute EndPoint: see https://help.aliyun.com/document_detail/34951.html. This example uses the public network endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api

Redis

Redis documentation: https://help.aliyun.com/product/26340.html

  1. Create an instance


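  1. Create a pay-as-you-go local-disk instance.
  2. Create the Redis instance in the same network segment as the fully managed Flink service.
  3. Obtain the VPC (private network) address.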


  1. Add the internal network segment to the whitelist so that the Flink cluster can access the Redis instance.


  2. Code

```java
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkRedis();
    }

    public static void runSinkRedis() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret, and Project.
        // RunningProject (the last argument) takes the same value as Project.
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "Project"
        );

        // schema: sepal_length double, sepal_width double, petal_length double,
        //         petal_width double, category string
        CatalogSourceBatchOp source = new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in Project
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        // Write the four feature columns as the key and the category as the value.
        RedisSinkBatchOp sink = new RedisSinkBatchOp()
            .setRedisIPs("ip:port")
            .setRedisPassword("PASSWORD")
            .setClusterMode(true)
            .setDatabaseIndex(0)
            .setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width")
            .setValueCols("category")
            .setPluginVersion("2.9.0");

        source.link(sink);

        BatchOperator.execute();
    }
}
```
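If the job cannot connect to Redis, it may help to first confirm that the network segment and whitelist settings from the earlier steps actually allow a TCP connection from a machine in the same network as the Flink cluster. The following is a minimal sketch using only the Java standard library. The class name `RedisReachability`, the default host `localhost`, and the default port `6379` are illustrative assumptions and not part of Alink; replace them with the VPC address and port of your instance. It checks only that the port accepts connections, not that the password is valid.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Alink: checks plain TCP reachability only.
class RedisReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass the instance's VPC address and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6379;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

A result of `false` from inside the network segment usually suggests checking the whitelist entry or the address and port passed to `setRedisIPs` before looking at the Alink job itself.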

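  1. Features
  • Supports cluster mode
  • Supports selecting the database index
  • Values are currently stored as byte[]; this will be extended in future versions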