导出到Redis (RedisRowSinkStreamOp)

Java 类名:com.alibaba.alink.operator.stream.sink.RedisRowSinkStreamOp

Python 类名:RedisRowSinkStreamOp

功能介绍

将一个流式数据,按行写到Redis里,键和值可以是多列。

在使用时,需要先下载插件,详情请看https://www.yuque.com/pinshu/alink_guide/czg4cx

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
pluginVersion 插件版本号 插件版本号 String
clusterMode Not available! Not available! Boolean false
databaseIndex Not available! Not available! Long
keyCols 多键值列 多键值列 String[] null
pipelineSize Not available! Not available! Integer 1
redisIPs Not available! Not available! String[]
redisPassword Not available! Not available! String
timeout Not available! Not available! Integer
valueCols 多数值列 多数值列 String[] null

代码示例

以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

Python 代码

  1. redisIP = "127.0.0.1:6379"
  2. df = pd.DataFrame([
  3. ["football", 1.0],
  4. ["football", 2.0],
  5. ["football", 3.0],
  6. ["basketball", 4.0],
  7. ["basketball", 5.0],
  8. ["tennis", 6.0],
  9. ["tennis", 7.0],
  10. ["pingpang", 8.0],
  11. ["pingpang", 9.0],
  12. ["baseball", 10.0]])
  13. streamData = StreamOperator.fromDataframe(df, schemaStr='id string,val double')
  14. streamData.link(RedisRowSinkStreamOp()\
  15. .setRedisIPs(redisIP)\
  16. .setKeyCols(["id"])\
  17. .setValueCols(["val"])\
  18. .setPluginVersion("2.9.0"))
  19. StreamOperator.execute()

Java 代码

  1. package com.alibaba.alink.operator.stream.sink;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.batch.BatchOperator;
  4. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  5. import com.alibaba.alink.testutil.AlinkTestBase;
  6. import org.junit.Test;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. public class RedisRowSinkStreamOpTest extends AlinkTestBase {
  10. @Test
  11. public void test() throws Exception {
  12. String redisIP = "*";
  13. int redisPort = 0;
  14. List <Row> df = Arrays.asList(
  15. Row.of("football", 1.0),
  16. Row.of("football", 2.0),
  17. Row.of("football", 3.0),
  18. Row.of("basketball", 4.0),
  19. Row.of("basketball", 5.0),
  20. Row.of("tennis", 6.0),
  21. Row.of("tennis", 7.0),
  22. Row.of("pingpang", 8.0),
  23. Row.of("pingpang", 9.0),
  24. Row.of("baseball", 10.0)
  25. );
  26. StreamOperator <?> data = new MemSourceStreamOp(df, "id string,val double");
  27. RedisRowSinkStreamOp sink = new RedisRowSinkStreamOp()
  28. .setRedisIPs(redisIP)
  29. .setKeyCols("id")
  30. .setValueCols("val")
  31. .setPluginVersion("2.9.0");
  32. data.link(sink);
  33. StreamOperator.execute();
  34. }
  35. }