Java 类名:com.alibaba.alink.operator.batch.dataproc.LookupRedisBatchOp
Python 类名:LookupRedisBatchOp

功能介绍

支持数据查找功能,支持多个key的查找,并将查找后的结果中的value列添加到待查询数据后面。
功能类似于 LookUpBatchOp ,不同的是被查找的数据存储在 Redis 中。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
outputSchemaStr Schema Schema。格式为”colname coltype[, colname2, coltype2[, …]]”,例如”f0 string, f1 bigint, f2 double” String
pluginVersion 插件版本号 插件版本号 String
selectedCols 选择的列名 计算列对应的列名列表 String[]
clusterMode Not available! Not available! Boolean false
databaseIndex Not available! Not available! Long
redisIP Not available! Not available! String
redisIPs Not available! Not available! String[]
redisPassword Not available! Not available! String
redisPort Not available! Not available! Integer 6379
reservedCols 算法保留列名 算法保留列 String[] null
timeout Not available! Not available! Integer

代码示例

以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

Python 代码

  1. df = pd.DataFrame([
  2. ["id001", 123, 45.6, "str"]
  3. ])
  4. inOp = BatchOperator.fromDataframe(df, schemaStr='id string, col0 bigint, col1 double, col2 string')
  5. redisIP = "*"
  6. redisPort = 26379
  7. RedisSinkBatchOp()\
  8. .setRedisIP(redisIP)\
  9. .setRedisPort(redisPort)\
  10. .setKeyCols(["id"])\
  11. .setPluginVersion("2.9.0")\
  12. .setValueCols(["col0", "col1", "col2"])\
  13. .linkFrom(inOp)
  14. BatchOperator.execute()
  15. df2 = pd.DataFrame([
  16. ["id001"]
  17. ])
  18. needToLookup = BatchOperator.fromDataframe(df2, schemaStr="id string")
  19. LookupRedisBatchOp()\
  20. .setRedisIP(redisIP)\
  21. .setRedisPort(redisPort)\
  22. .setPluginVersion("2.9.0")\
  23. .setSelectedCols(["id"])\
  24. .setOutputSchemaStr("col0 bigint, col1 double, col2 string")\
  25. .linkFrom(needToLookup)\
  26. .lazyPrint(10)
  27. BatchOperator.execute()

Java 代码

  1. import com.alibaba.alink.common.AlinkGlobalConfiguration;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.dataproc.LookupRedisBatchOp;
  4. import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;
  5. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  6. import com.alibaba.alink.testutil.AlinkTestBase;
  7. import org.junit.Test;
  8. import java.util.Collections;
  9. public class LookupRedisBatchOpTest extends AlinkTestBase {
  10. @Test
  11. public void map() throws Exception {
  12. String redisIP = "*";
  13. int redisPort = 26379;
  14. MemSourceBatchOp memSourceBatchOp = new MemSourceBatchOp(
  15. Collections.singletonList(Row.of("id001", 123L, 45.6, "str")),
  16. "id string, col0 bigint, col1 double, col2 string"
  17. );
  18. new RedisSinkBatchOp()
  19. .setRedisIP(redisIP)
  20. .setRedisPort(redisPort)
  21. .setKeyCols("id")
  22. .setPluginVersion("2.9.0")
  23. .setValueCols("col0", "col1", "col2")
  24. .linkFrom(memSourceBatchOp);
  25. BatchOperator.execute();
  26. MemSourceBatchOp needToLookup = new MemSourceBatchOp(
  27. Collections.singletonList(Row.of("id001")),
  28. "id string"
  29. );
  30. new LookupRedisBatchOp()
  31. .setRedisIP(redisIP)
  32. .setRedisPort(redisPort)
  33. .setPluginVersion("2.9.0")
  34. .setSelectedCols("id")
  35. .setOutputSchemaStr("col0 bigint, col1 double, col2 string")
  36. .linkFrom(needToLookup)
  37. .lazyPrint(10);
  38. BatchOperator.execute();
  39. }
  40. }

运行结果

| id | col0 | col1 | col2 |
|———-+———+————-+———|
| id001 | 123 | 45.6000 | str |