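Export to Redis (RedisStringSinkStreamOp)

Java class name: com.alibaba.alink.operator.stream.sink.RedisStringSinkStreamOp

Python class name: RedisStringSinkStreamOp

Description

Writes a data stream to Redis row by row, using a single String column as the key and a single String column as the value.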
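
As a rough mental model, the row-by-row write can be sketched with a plain dictionary standing in for Redis. This assumes the sink performs the equivalent of a Redis SET for each row, so a later row with the same key overwrites the earlier value; the description above does not confirm this. `write_rows` and `fake_redis` are hypothetical names used only for illustration and are not part of Alink.

```python
def write_rows(rows, store):
    """Write each (key, value) row into the store, one row at a time."""
    for key, value in rows:
        # Assumed SET-like semantics: a repeated key replaces the old value.
        store[key] = value
    return store


# A dict stands in for the Redis server in this sketch.
fake_redis = {}
rows = [("football", "1.0"), ("football", "2.0"), ("basketball", "3.0")]
write_rows(rows, fake_redis)
print(fake_redis)
```

Under that assumption, a stream in which every row carries the same key would leave a single entry in Redis, and with parallel execution the order in which rows arrive (and therefore which value survives) may not match the input order.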

Notes

Parameters

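| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| pluginVersion | Plugin version | Plugin version | String | | | |
| clusterMode | Not available! | Not available! | Boolean | | | false |
| databaseIndex | Not available! | Not available! | Long | | | |
| keyCol | Single key column | Single key column | String | | | null |
| pipelineSize | Not available! | Not available! | Integer | | | 1 |
| redisIPs | Not available! | Not available! | String[] | | | |
| redisPassword | Not available! | Not available! | String | | | |
| timeout | Not available! | Not available! | Integer | | | |
| valueCol | Single value column | Single value column | String | | | null |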
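
The Java example on this page passes the Redis address as a single "host:port" string. Assuming each `redisIPs` entry follows that form (the parameter table does not state it), a small check like the following could catch malformed addresses before they reach the operator. `parse_redis_address` is a hypothetical helper written for this sketch, not an Alink API.

```python
def parse_redis_address(address):
    """Split a "host:port" string into (host, port) and validate the port."""
    host, sep, port_text = address.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected 'host:port', got {address!r}")
    port = int(port_text)  # raises ValueError if the port is not a number
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return host, port


addresses = ["127.0.0.1:6379"]
parsed = [parse_redis_address(a) for a in addresses]
print(parsed)
```

A placeholder such as "*" is rejected with a `ValueError`, which makes it obvious that the address still needs to be filled in.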

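Code Examples

The following code is for illustration only. You may need to modify parts of it or set up the environment before it runs correctly.

Python code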

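    from pyalink.alink import *
    import pandas as pd

    redisIP = "*"   # placeholder: replace with the address of your Redis server
    redisPort = 0   # not used by this example

    df = pd.DataFrame([
        ["football", "1.0"],
        ["football", "2.0"],
        ["football", "3.0"]])

    # Both columns are declared as strings, matching the String key and value of the sink.
    streamData = StreamOperator.fromDataframe(df, schemaStr='id string,val string')

    # keyCol and valueCol each take a single column name.
    streamData.link(RedisStringSinkStreamOp()\
        .setRedisIPs(redisIP)\
        .setKeyCol("id")\
        .setValueCol("val")\
        .setPluginVersion("2.9.0"))

    # Start the streaming job.
    StreamOperator.execute()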

Java code

    package com.alibaba.alink.operator.batch.sink;

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import com.alibaba.alink.operator.stream.sink.RedisStringSinkStreamOp;
    import com.alibaba.alink.testutil.AlinkTestBase;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class RedisStringSinkStreamOpTest extends AlinkTestBase {

        @Test
        public void test() throws Exception {
            // Address of the Redis server in host:port form.
            String redisIP = "127.0.0.1:6379";
            int redisPort = 0; // not used by this example

            // In-memory rows: a String key column and a String value column.
            List<Row> df = Arrays.asList(
                Row.of("football", "1.0"),
                Row.of("football", "2.0")
            );
            StreamOperator<?> data = new MemSourceStreamOp(df, "id string,val string");

            RedisStringSinkStreamOp sink = new RedisStringSinkStreamOp()
                .setRedisIPs(redisIP)
                .setKeyCol("id")
                .setValueCol("val")
                .setPluginVersion("2.9.0");

            data.link(sink);

            // Start the streaming job.
            StreamOperator.execute();
        }
    }