Java class name: com.alibaba.alink.operator.batch.recommendation.RecommendationRankingBatchOp
Python class name: RecommendationRankingBatchOp

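Overview

This component ranks the results of a recall stage and outputs the top K objects after ranking (K is set by the topN parameter). The ranking algorithm can be customized by building a PipelineModel; see the code examples for usage.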
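The ranking step described above can be pictured with a minimal plain-Python sketch. It is not Alink code: `rank_top_n`, `score_fn`, and the toy scores are hypothetical names and values chosen for illustration, and the real component obtains scores from the PipelineModel rather than from a lookup table.

```python
from typing import Callable, Dict, List, Tuple


def rank_top_n(
    candidates: List[int],
    score_fn: Callable[[int], float],
    top_n: int = 10,
) -> List[Tuple[int, float]]:
    """Score each recalled candidate and keep the top_n highest-scoring ones."""
    if top_n < 1:
        raise ValueError("top_n must be at least 1")
    scored = [(item, score_fn(item)) for item in candidates]
    # list.sort() is stable, so candidates with equal scores keep their recall order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]


# Hypothetical scores standing in for a trained model's predictions.
toy_scores: Dict[int, float] = {18: 0.9, 19: 0.2, 88: 0.7}
top_items = rank_top_n([18, 19, 88], toy_scores.__getitem__, top_n=2)
print(top_items)  # [(18, 0.9), (88, 0.7)]
```

Passing the scorer as a function mirrors the idea that the scoring model is pluggable while the sort-and-truncate step stays the same.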

Parameters

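| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| mTableCol | Not available! | Not available! | String | ✓ | Selected column must be of type [M_TABLE] | |
| modelFilePath | Model file path | Path of the model file | String | | | null |
| outputCol | Output result column | Name of the output result column; optional, defaults to null | String | | | null |
| rankingCol | Score column used for ranking | Score column used for ranking | String | | | null |
| reservedCols | Reserved columns | Columns reserved by the algorithm | String[] | | | null |
| topN | Top N items | Number of top items to select | Integer | | [1, +inf) | 10 |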
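The code examples on this page supply the candidate items as a JSON string with a `data` object and a `schema` string, which is then converted to an M_TABLE column. As a small sketch of how such a string could be assembled, the helper below (a hypothetical name, `make_item_mtable_json`, not part of Alink) reproduces that layout for a single INT column; it assumes the layout shown in the examples is all that is needed and does not cover other column types.

```python
import json
from typing import List


def make_item_mtable_json(item_ids: List[int], col_name: str = "iid") -> str:
    """Serialize candidate item ids into the JSON layout used by the examples' item-list column."""
    payload = {"data": {col_name: list(item_ids)}, "schema": f"{col_name} INT"}
    # Compact separators reproduce the whitespace-free form used in the examples.
    return json.dumps(payload, separators=(",", ":"))


ilist_json = make_item_mtable_json([18, 19, 88])
print(ilist_json)
# {"data":{"iid":[18,19,88]},"schema":"iid INT"}
```

The resulting string is still an ordinary string column; in the Python example it becomes an M_TABLE only after passing through `ToMTableBatchOp`.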

Code examples

Python code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    # Prediction input: one user row whose "ilist" column holds the recalled items as JSON.
    data = pd.DataFrame([["u6", "0.0 1.0", 0.0, 1.0, 1, "{\"data\":{\"iid\":[18,19,88]},\"schema\":\"iid INT\"}"]])
    predData = BatchOperator.fromDataframe(data, schemaStr='uid string, uf string, f0 double, f1 double, labels int, ilist string')
    # Convert the JSON string column into an M_TABLE column.
    predData = predData.link(ToMTableBatchOp().setSelectedCol("ilist"))

    # Training data: one (user, item) pair per row with a binary label.
    data = pd.DataFrame([
        ["u0", "1.0 1.0", 1.0, 1.0, 1, 18],
        ["u1", "1.0 1.0", 1.0, 1.0, 0, 19],
        ["u2", "1.0 0.0", 1.0, 0.0, 1, 88],
        ["u3", "1.0 0.0", 1.0, 0.0, 0, 18],
        ["u4", "0.0 1.0", 0.0, 1.0, 1, 88],
        ["u5", "0.0 1.0", 0.0, 1.0, 0, 19],
        ["u6", "0.0 1.0", 0.0, 1.0, 1, 88]])
    trainData = BatchOperator.fromDataframe(data, schemaStr='uid string, uf string, f0 double, f1 double, labels int, iid string')

    oneHotCols = ["uid", "f0", "f1", "iid"]
    multiHotCols = ["uf"]

    # Ranking pipeline: encode features, assemble them into one vector,
    # train logistic regression, then extract the probability of label 1 as "score".
    pipe = (
        Pipeline()
        .add(
            OneHotEncoder()
            .setSelectedCols(oneHotCols)
            .setOutputCols(["ovec"]))
        .add(
            MultiHotEncoder().setDelimiter(" ")
            .setSelectedCols(multiHotCols)
            .setOutputCols(["mvec"]))
        .add(
            VectorAssembler()
            .setSelectedCols(["ovec", "mvec"])
            .setOutputCol("vec"))
        .add(
            LogisticRegression()
            .setVectorCol("vec")
            .setLabelCol("labels")
            .setReservedCols(["uid", "iid"])
            .setPredictionDetailCol("detail")
            .setPredictionCol("pred"))
        .add(
            JsonValue()
            .setSelectedCol("detail")
            .setJsonPath(["$.1"])
            .setOutputCols(["score"]))
    )
    lrModel = pipe.fit(trainData)

    # Rank the items in "ilist" by "score" and keep the top 2.
    rank = (
        RecommendationRankingBatchOp()
        .setMTableCol("ilist")
        .setOutputCol("il")
        .setTopN(2)
        .setRankingCol("score")
        .setReservedCols(["uid", "labels"])
    )
    rank.linkFrom(lrModel.save(), predData).print()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.batch.BatchOperator;
    import com.alibaba.alink.operator.batch.recommendation.RecommendationRankingBatchOp;
    import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
    import com.alibaba.alink.pipeline.Pipeline;
    import com.alibaba.alink.pipeline.classification.LogisticRegression;
    import com.alibaba.alink.pipeline.dataproc.JsonValue;
    import com.alibaba.alink.pipeline.dataproc.vector.VectorAssembler;
    import com.alibaba.alink.pipeline.feature.MultiHotEncoder;
    import com.alibaba.alink.pipeline.feature.OneHotEncoder;
    import org.junit.Test;

    import java.util.Arrays;

    public class RecommendationRankingTest {
        @Test
        public void test() throws Exception {
            // Prediction input: one user row whose "ilist" column holds the recalled items as JSON.
            Row[] predArray = new Row[] {
                Row.of("u6", "0.0 1.0", 0.0, 1.0, 1, "{\"data\":{\"iid\":[18,19,88]},"
                    + "\"schema\":\"iid INT\"}")
            };
            // Training data: one (user, item) pair per row with a binary label.
            Row[] trainArray = new Row[] {
                Row.of("u0", "1.0 1.0", 1.0, 1.0, 1, 18),
                Row.of("u1", "1.0 1.0", 1.0, 1.0, 0, 19),
                Row.of("u2", "1.0 0.0", 1.0, 0.0, 1, 88),
                Row.of("u3", "1.0 0.0", 1.0, 0.0, 1, 18),
                Row.of("u4", "0.0 1.0", 0.0, 1.0, 1, 88),
                Row.of("u5", "0.0 1.0", 0.0, 1.0, 1, 19),
                Row.of("u6", "0.0 1.0", 0.0, 1.0, 1, 88)
            };
            BatchOperator <?> trainData = new MemSourceBatchOp(Arrays.asList(trainArray),
                new String[] {"uid", "uf", "f0", "f1", "labels", "iid"});
            BatchOperator <?> predData = new MemSourceBatchOp(Arrays.asList(predArray),
                new String[] {"uid", "uf", "f0", "f1", "labels", "ilist"});
            String[] oneHotCols = new String[] {"uid", "f0", "f1", "iid"};
            String[] multiHotCols = new String[] {"uf"};
            // Ranking pipeline: encode features, assemble them into one vector,
            // train logistic regression, then extract the probability of label 1 as "score".
            Pipeline pipe = new Pipeline()
                .add(
                    new OneHotEncoder()
                        .setSelectedCols(oneHotCols)
                        .setOutputCols("ovec"))
                .add(
                    new MultiHotEncoder().setDelimiter(" ")
                        .setSelectedCols(multiHotCols)
                        .setOutputCols("mvec"))
                .add(
                    new VectorAssembler()
                        .setSelectedCols("ovec", "mvec")
                        .setOutputCol("vec"))
                .add(
                    new LogisticRegression()
                        .setVectorCol("vec")
                        .setLabelCol("labels")
                        .setReservedCols("uid", "iid")
                        .setPredictionDetailCol("detail")
                        .setPredictionCol("pred"))
                .add(
                    new JsonValue()
                        .setSelectedCol("detail")
                        .setJsonPath("$.1")
                        .setOutputCols("score"));
            // Rank the items in "ilist" by "score" and keep the top 2.
            RecommendationRankingBatchOp rank = new RecommendationRankingBatchOp()
                .setMTableCol("ilist")
                .setOutputCol("ilist")
                .setTopN(2)
                .setRankingCol("score")
                .setReservedCols("uid", "labels");
            rank.linkFrom(pipe.fit(trainData).save(), predData).print();
        }
    }

Output

| uid | uf | f0 | f1 | labels | ilist |
| --- | --- | --- | --- | --- | --- |
| u6 | 0.0 1.0 | 0.0000 | 1.0000 | 1 | {"data":{"iid":[18,88],"score":[0.9999999999999553,0.9999999999999472]},"schema":"iid INT,score DOUBLE"} |
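To consume the ranked list downstream, the JSON in the output column can be unpacked into (item, score) pairs. The sketch below uses only Python's standard `json` module; `ranked_pairs` is a hypothetical helper, and it assumes the cell has been obtained as a plain JSON string with ASCII quotes, which may require an extra conversion step depending on how the column is collected.

```python
import json
from typing import List, Tuple

# Output cell copied from the result table above, written with plain ASCII quotes.
ranked_json = (
    '{"data":{"iid":[18,88],'
    '"score":[0.9999999999999553,0.9999999999999472]},'
    '"schema":"iid INT,score DOUBLE"}'
)


def ranked_pairs(cell: str, item_col: str = "iid", score_col: str = "score") -> List[Tuple[int, float]]:
    """Pair each ranked item id with its score, preserving the ranked order."""
    data = json.loads(cell)["data"]
    return list(zip(data[item_col], data[score_col]))


pairs = ranked_pairs(ranked_json)
for iid, score in pairs:
    print(iid, score)
```

The columns are stored as parallel arrays, so zipping them by position is enough to recover one row per ranked item.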