Java 类名:com.alibaba.alink.operator.stream.clustering.StreamingKMeansStreamOp
Python 类名:StreamingKMeansStreamOp

功能介绍

流式Kmeans聚类算法,对流数据进行Kmeans聚类。流式KMeans聚类,需要三个输入:

  1. 训练好的批式的KMeans模型
  2. 流式的更新模型的数据
  3. 流式的需要预测的数据

若只有两个输入,那么第一个输入被算法识别为训练好的初始Kmeans模型,第二个输入被同时用作”流式的更新模型的数据”和”流式的需要预测的数据”。
本算法组件会根据2流入的数据在固定的timeinterval内更新模型,这个模型会用来预测3的输入数据。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
halfLife 半生命周期 半生命周期 Integer
predictionCol 预测结果列名 预测结果列名 String
timeInterval 时间间隔 时间间隔,单位秒 Long
predictionClusterCol 预测距离列名 预测距离列名 String
predictionDistanceCol 预测距离列名 预测距离列名 String
reservedCols 算法保留列名 算法保留列 String[] null

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df = pd.DataFrame([
  5. [0, "0 0 0"],
  6. [1, "0.1,0.1,0.1"],
  7. [2, "0.2,0.2,0.2"],
  8. [3, "9 9 9"],
  9. [4, "9.1 9.1 9.1"],
  10. [5, "9.2 9.2 9.2"]
  11. ])
  12. inOp = BatchOperator.fromDataframe(df, schemaStr='id int, vec string')
  13. stream_data = StreamOperator.fromDataframe(df, schemaStr='id int, vec string')
  14. init_model = KMeansTrainBatchOp()\
  15. .setVectorCol("vec")\
  16. .setK(2)\
  17. .linkFrom(inOp)
  18. streamingkmeans = StreamingKMeansStreamOp(init_model) \
  19. .setTimeInterval(1) \
  20. .setHalfLife(1) \
  21. .setReservedCols(["vec"])
  22. pred = streamingkmeans.linkFrom(stream_data, stream_data)
  23. pred.print()
  24. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.clustering.KMeansTrainBatchOp;
  4. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  5. import com.alibaba.alink.operator.stream.StreamOperator;
  6. import com.alibaba.alink.operator.stream.clustering.StreamingKMeansStreamOp;
  7. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  8. import org.junit.Test;
  9. import java.util.Arrays;
  10. import java.util.List;
  11. public class StreamingKMeansStreamOpTest {
  12. @Test
  13. public void testStreamingKMeansStreamOp() throws Exception {
  14. List <Row> df = Arrays.asList(
  15. Row.of(0, "0 0 0"),
  16. Row.of(1, "0.1,0.1,0.1"),
  17. Row.of(2, "0.2,0.2,0.2"),
  18. Row.of(3, "9 9 9"),
  19. Row.of(4, "9.1 9.1 9.1"),
  20. Row.of(5, "9.2 9.2 9.2")
  21. );
  22. BatchOperator <?> inOp = new MemSourceBatchOp(df, "id int, vec string");
  23. StreamOperator <?> stream_data = new MemSourceStreamOp(df, "id int, vec string");
  24. BatchOperator <?> init_model = new KMeansTrainBatchOp()
  25. .setVectorCol("vec")
  26. .setK(2)
  27. .linkFrom(inOp);
  28. StreamOperator <?> streamingkmeans = new StreamingKMeansStreamOp(init_model)
  29. .setTimeInterval(1L)
  30. .setHalfLife(1)
  31. .setReservedCols("vec");
  32. StreamOperator <?> pred = streamingkmeans.linkFrom(stream_data, stream_data);
  33. pred.print();
  34. StreamOperator.execute();
  35. }
  36. }

运行结果

vec cluster_id
0.2,0.2,0.2 1
0 0 0 1
0.1,0.1,0.1 1
9.2 9.2 9.2 0
9.1 9.1 9.1 0
9 9 9 0