Java 类名:com.alibaba.alink.operator.stream.recommendation.FlattenKObjectStreamOp
Python 类名:FlattenKObjectStreamOp

功能介绍

将流式推荐结果从json序列化格式转为table格式。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
outputCols 输出结果列列名数组 输出结果列列名数组,必选 String[]
selectedCol 选中的列名 计算列对应的列名 String 所选列类型为 [STRING]
outputColTypes 输出结果列列类型数组 输出结果列类型数组 String[] null
reservedCols 算法保留列名 算法保留列名 String[] null

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df_data = pd.DataFrame([
  5. [1,'{"rating":"[0.6]","object":"[1]"}'],
  6. [2,'{"rating":"[0.8,0.6]","object":"[2,3]"}'],
  7. [3,'{"rating":"[0.6,0.3,0.4]","object":"[1,2,3]"}']
  8. ])
  9. data = BatchOperator.fromDataframe(df_data, schemaStr='user bigint, rec string')
  10. sdata = StreamOperator.fromDataframe(df_data, schemaStr='user bigint, rec string')
  11. recList = FlattenKObjectStreamOp()\
  12. .setSelectedCol("rec")\
  13. .setOutputCols(["object", "rating"])\
  14. .setOutputColTypes(["long", "double"])\
  15. .setReservedCols(["user"])\
  16. .linkFrom(sdata).print();
  17. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  4. import com.alibaba.alink.operator.stream.StreamOperator;
  5. import com.alibaba.alink.operator.stream.recommendation.FlattenKObjectStreamOp;
  6. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  7. import org.junit.Test;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. public class FlattenKObjectStreamOpTest {
  11. @Test
  12. public void testFlattenKObjectStreamOp() throws Exception {
  13. List <Row> df_data = Arrays.asList(
  14. Row.of(1, "{\"rating\":\"[0.6]\",\"object\":\"[1]\"}"),
  15. Row.of(2, "{\"rating\":\"[0.8,0.6]\",\"object\":\"[2,3]\"}"),
  16. Row.of(3, "{\"rating\":\"[0.6,0.3,0.4]\",\"object\":\"[1,2,3]\"}")
  17. );
  18. BatchOperator <?> data = new MemSourceBatchOp(df_data, "user int, rec string");
  19. StreamOperator <?> sdata = new MemSourceStreamOp(df_data, "user int, rec string");
  20. StreamOperator <?> recList = new FlattenKObjectStreamOp()
  21. .setSelectedCol("rec")
  22. .setOutputCols("object", "rating")
  23. .setOutputColTypes("long", "double")
  24. .setReservedCols("user")
  25. .linkFrom(sdata).print();
  26. StreamOperator.execute();
  27. }
  28. }

运行结果

user object rating
1 1 0.6000
2 2 0.8000
2 3 0.6000
3 1 0.6000
3 2 0.3000
3 3 0.4000