Java 类名:com.alibaba.alink.operator.stream.recommendation.UserCfRateRecommStreamOp
Python 类名:UserCfRateRecommStreamOp

功能介绍

UserCF 打分是使用UserCF模型,实时预测user对item的评分。

参数说明

好的,我已经把你的内容转换为 markdown table 格式了。你可以复制下面的代码块到你想要的地方。

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
itemCol Item列列名 Item列列名 String
recommCol 推荐结果列名 推荐结果列名 String
userCol User列列名 User列列名 String
reservedCols 算法保留列名 算法保留列名 String[] null
numThreads 组件多线程线程个数 组件多线程线程个数 Integer 1
modelStreamFilePath 模型流的文件路径 模型流的文件路径 String null
modelStreamScanInterval 扫描模型路径的时间间隔 扫描模型路径的时间间隔,单位秒。使用yyyy-mm-dd hh:mm:ss.fffffffff格式,详见Timestamp.valueOf(String s)。 Integer 10
modelStreamStartTime 模型流的起始时间 模型流的起始时间。默认从当前时刻开始读。使用yyyy-mm-dd hh:mm:ss.fffffffff格式,详见Timestamp.valueOf(String s)。 String null

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df_data = pd.DataFrame([
  5. [1, 1, 0.6],
  6. [2, 2, 0.8],
  7. [2, 3, 0.6],
  8. [4, 1, 0.6],
  9. [4, 2, 0.3],
  10. [4, 3, 0.4],
  11. ])
  12. data = BatchOperator.fromDataframe(df_data, schemaStr='user bigint, item bigint, rating double')
  13. sdata = StreamOperator.fromDataframe(df_data, schemaStr='user bigint, item bigint, rating double')
  14. model = UserCfTrainBatchOp()\
  15. .setUserCol("user")\
  16. .setItemCol("item")\
  17. .setRateCol("rating").linkFrom(data);
  18. predictor = UserCfRateRecommStreamOp(model)\
  19. .setUserCol("user")\
  20. .setItemCol("item")\
  21. .setRecommCol("prediction_result");
  22. predictor.linkFrom(sdata).print()
  23. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.recommendation.UserCfTrainBatchOp;
  4. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  5. import com.alibaba.alink.operator.stream.StreamOperator;
  6. import com.alibaba.alink.operator.stream.recommendation.UserCfRateRecommStreamOp;
  7. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  8. import org.junit.Test;
  9. import java.util.Arrays;
  10. import java.util.List;
  11. public class UserCfRateRecommStreamOpTest {
  12. @Test
  13. public void testUserCfRateRecommStreamOp() throws Exception {
  14. List <Row> df_data = Arrays.asList(
  15. Row.of(1, 1, 0.6),
  16. Row.of(2, 2, 0.8),
  17. Row.of(2, 3, 0.6),
  18. Row.of(4, 1, 0.6),
  19. Row.of(4, 2, 0.3),
  20. Row.of(4, 3, 0.4)
  21. );
  22. BatchOperator <?> data = new MemSourceBatchOp(df_data, "user int, item int, rating double");
  23. StreamOperator <?> sdata = new MemSourceStreamOp(df_data, "user int, item int, rating double");
  24. BatchOperator <?> model = new UserCfTrainBatchOp()
  25. .setUserCol("user")
  26. .setItemCol("item")
  27. .setRateCol("rating").linkFrom(data);
  28. StreamOperator <?> predictor = new UserCfRateRecommStreamOp(model)
  29. .setUserCol("user")
  30. .setItemCol("item")
  31. .setRecommCol("prediction_result");
  32. predictor.linkFrom(sdata).print();
  33. StreamOperator.execute();
  34. }
  35. }

运行结果

user item rating prediction_result
4 3 0.4000 0.6000
2 3 0.6000 0.4000
2 2 0.8000 0.3000
4 1 0.6000 0.6000
4 2 0.3000 0.8000
1 1 0.6000 0.6000