Java class name: com.alibaba.alink.operator.stream.dataproc.LookupRedisStreamOp
Python class name: LookupRedisStreamOp

Overview

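Looks up data by key. Lookups on multiple key columns are supported, and the value columns of each lookup result are appended to the row being queried.
The behavior is similar to LookupStreamOp, except that the data being looked up is stored in Redis.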
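
The append-on-lookup behavior described above can be modeled with a small in-memory sketch. This is not Alink code: the function `lookup_rows` is hypothetical, a plain dict stands in for Redis, and filling missing keys with `None` is an assumption rather than documented operator behavior.

```python
from typing import Dict, List, Sequence, Tuple

Row = Tuple[object, ...]


def lookup_rows(
    rows: Sequence[Row],
    columns: Sequence[str],
    key_cols: Sequence[str],
    store: Dict[Row, Row],
    num_value_cols: int,
) -> List[Row]:
    """Append the stored value columns to every input row (hypothetical helper).

    `store` maps a tuple of key values to a tuple of value columns and
    stands in for the Redis database in this sketch.
    """
    # Positions of the key columns inside each input row.
    key_idx = [columns.index(c) for c in key_cols]
    result = []
    for row in rows:
        key = tuple(row[i] for i in key_idx)
        values = store.get(key)
        if values is None:
            # Assumption: a key that is not found yields empty value columns.
            values = (None,) * num_value_cols
        result.append(tuple(row) + tuple(values))
    return result


store = {("id001",): (123, 45.6, "str")}
print(lookup_rows([("id001",)], ["id"], ["id"], store, 3))
```

The input row keeps its own columns and gains the looked-up ones, which matches the shape of the result table at the end of this page.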

Parameters

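| Name | Display name | Description | Type | Required? | Value range | Default |
|---|---|---|---|---|---|---|
| outputSchemaStr | Schema | Schema, in the format "colname coltype[, colname2 coltype2[, ...]]", for example "f0 string, f1 bigint, f2 double" | String | | | |
| pluginVersion | Plugin version | Plugin version | String | | | |
| selectedCols | Selected column names | Names of the columns used in the computation | String[] | | | |
| clusterMode | Not available! | Not available! | Boolean | | | false |
| databaseIndex | Not available! | Not available! | Long | | | |
| redisIP | Not available! | Not available! | String | | | |
| redisIPs | Not available! | Not available! | String[] | | | |
| redisPassword | Not available! | Not available! | String | | | |
| redisPort | Not available! | Not available! | Integer | | | 6379 |
| reservedCols | Reserved column names | Columns retained by the algorithm | String[] | | | null |
| timeout | Not available! | Not available! | Integer | | | |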
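
To make the outputSchemaStr format concrete, here is a minimal sketch that splits such a string into (name, type) pairs. The helper `parse_schema_str` is hypothetical and is not part of Alink; it only handles simple type names without embedded commas or spaces.

```python
from typing import List, Tuple


def parse_schema_str(schema_str: str) -> List[Tuple[str, str]]:
    """Split "name type, name type, ..." into (name, type) pairs (hypothetical helper)."""
    fields = []
    for part in schema_str.split(","):
        tokens = part.split()
        if len(tokens) != 2:
            # Each field must be exactly one column name followed by one type.
            raise ValueError("expected 'colname coltype', got: %r" % part)
        fields.append((tokens[0], tokens[1]))
    return fields


print(parse_schema_str("f0 string, f1 bigint, f2 double"))
# prints [('f0', 'string'), ('f1', 'bigint'), ('f2', 'double')]
```

In the examples on this page, the schema passed to setOutputSchemaStr lists the value columns in the same order as they were written with setValueCols.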

Code examples

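The following code is for illustration only. You may need to modify parts of it or configure your environment before it runs correctly.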

Python code

    import pandas as pd
    from pyalink.alink import *

    # One sample record that will be written to Redis.
    df = pd.DataFrame([
        ["id001", 123, 45.6, "str"]
    ])
    inOp = BatchOperator.fromDataframe(df, schemaStr='id string, col0 bigint, col1 double, col2 string')

    # "*" is a placeholder; replace it with the address of your Redis server.
    redisIP = "*"
    redisPort = 26379

    # Write the record to Redis: "id" is the key, the other columns are the value.
    RedisSinkBatchOp()\
        .setRedisIP(redisIP)\
        .setRedisPort(redisPort)\
        .setKeyCols(["id"])\
        .setPluginVersion("2.9.0")\
        .setValueCols(["col0", "col1", "col2"])\
        .linkFrom(inOp)
    BatchOperator.execute()

    # Stream of keys to look up.
    df2 = pd.DataFrame([
        ["id001"]
    ])
    needToLookup = StreamOperator.fromDataframe(df2, schemaStr="id string")

    # Look up each "id" in Redis and append the value columns to the row.
    LookupRedisStreamOp()\
        .setRedisIP(redisIP)\
        .setRedisPort(redisPort)\
        .setPluginVersion("2.9.0")\
        .setSelectedCols(["id"])\
        .setOutputSchemaStr("col0 bigint, col1 double, col2 string")\
        .linkFrom(needToLookup)\
        .print()
    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.common.AlinkGlobalConfiguration;
    import com.alibaba.alink.operator.batch.BatchOperator;
    import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
    import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.dataproc.LookupRedisStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import com.alibaba.alink.testutil.AlinkTestBase;
    import org.junit.Test;

    import java.util.Collections;

    public class LookupRedisStreamOpTest extends AlinkTestBase {

        @Test
        public void map() throws Exception {
            // "*" is a placeholder; replace it with the address of your Redis server.
            String redisIP = "*";
            int redisPort = 26379;

            // One sample record that will be written to Redis.
            MemSourceBatchOp memSourceBatchOp = new MemSourceBatchOp(
                Collections.singletonList(Row.of("id001", 123L, 45.6, "str")),
                "id string, col0 bigint, col1 double, col2 string"
            );

            // Write the record to Redis: "id" is the key, the other columns are the value.
            new RedisSinkBatchOp()
                .setRedisIP(redisIP)
                .setRedisPort(redisPort)
                .setKeyCols("id")
                .setPluginVersion("2.9.0")
                .setValueCols("col0", "col1", "col2")
                .linkFrom(memSourceBatchOp);
            BatchOperator.execute();

            // Stream of keys to look up.
            MemSourceStreamOp needToLookup = new MemSourceStreamOp(
                Collections.singletonList(Row.of("id001")),
                "id string"
            );

            // Look up each "id" in Redis and append the value columns to the row.
            new LookupRedisStreamOp()
                .setRedisIP(redisIP)
                .setRedisPort(redisPort)
                .setPluginVersion("2.9.0")
                .setSelectedCols("id")
                .setOutputSchemaStr("col0 bigint, col1 double, col2 string")
                .linkFrom(needToLookup)
                .print();
            StreamOperator.execute();
        }
    }

Output

| id | col0 | col1 | col2 |
|-------|------|---------|------|
| id001 | 123 | 45.6000 | str |