Java 类名:com.alibaba.alink.operator.batch.clustering.GeoKMeansPredictBatchOp
Python 类名:GeoKMeansPredictBatchOp

功能介绍

KMeans 是一个经典的聚类算法。
基本思想是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。
本组件主要针对经纬度距离做Kmeans聚类,包括经纬度KMeans,经纬度KMeans预测, 经纬度KMeans流式预测。

经纬度距离(https://en.wikipedia.org/wiki/Haversine_formula)

经纬度K均值聚类预测 (GeoKMeansPredictBatchOp) - 图1经纬度K均值聚类预测 (GeoKMeansPredictBatchOp) - 图2输入数据中的经度和纬度使用度数表示,得到的距离单位为千米(km)。

参数说明

| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |

| modelFilePath | 模型的文件路径 | 模型的文件路径 | String | | | null |

| predictionDetailCol | 预测详细信息列名 | 预测详细信息列名 | String | | | |

| predictionDistanceCol | 预测距离列名 | 预测距离列名 | String | | | |

| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |

| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df = pd.DataFrame([
  5. [0, 0],
  6. [8, 8],
  7. [1, 2],
  8. [9, 10],
  9. [3, 1],
  10. [10, 7]
  11. ])
  12. inOp1 = BatchOperator.fromDataframe(df, schemaStr='f0 long, f1 long')
  13. inOp2 = StreamOperator.fromDataframe(df, schemaStr='f0 long, f1 long')
  14. kmeans = GeoKMeansTrainBatchOp()\
  15. .setLongitudeCol("f0")\
  16. .setLatitudeCol("f1")\
  17. .setK(2)\
  18. .linkFrom(inOp1)
  19. kmeans.print()
  20. predict = GeoKMeansPredictBatchOp()\
  21. .setPredictionCol("pred")\
  22. .linkFrom(kmeans, inOp1)
  23. predict.print()
  24. predict = GeoKMeansPredictStreamOp(kmeans)\
  25. .setPredictionCol("pred")\
  26. .linkFrom(inOp2)
  27. predict.print()
  28. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.clustering.KMeans4LongiLatitudePredictBatchOp;
  4. import com.alibaba.alink.operator.batch.clustering.KMeans4LongiLatitudeTrainBatchOp;
  5. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  6. import com.alibaba.alink.operator.stream.StreamOperator;
  7. import com.alibaba.alink.operator.stream.clustering.KMeans4LongiLatitudePredictStreamOp;
  8. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  9. import org.junit.Test;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. public class GeoKMeansPredictBatchOpTest {
  13. @Test
  14. public void testGeoKMeansPredictBatchOp() throws Exception {
  15. List <Row> df = Arrays.asList(
  16. Row.of(0, 0),
  17. Row.of(8, 8),
  18. Row.of(1, 2),
  19. Row.of(9, 10),
  20. Row.of(3, 1),
  21. Row.of(10, 7)
  22. );
  23. BatchOperator <?> inOp1 = new MemSourceBatchOp(df, "f0 int, f1 int");
  24. StreamOperator <?> inOp2 = new MemSourceStreamOp(df, "f0 int, f1 int");
  25. BatchOperator <?> kmeans = new GeoKMeansTrainBatchOp()
  26. .setLongitudeCol("f0")
  27. .setLatitudeCol("f1")
  28. .setK(2)
  29. .linkFrom(inOp1);
  30. kmeans.print();
  31. BatchOperator <?> result = new GeoKMeansPredictBatchOp()
  32. .setPredictionCol("pred")
  33. .linkFrom(kmeans, inOp1);
  34. result.print();
  35. StreamOperator <?> predict = new GeoKMeansPredictStreamOp(kmeans)
  36. .setPredictionCol("pred")
  37. .linkFrom(inOp2);
  38. predict.print();
  39. StreamOperator.execute();
  40. }
  41. }

运行结果

模型数据

| model_id | model_info | | —- | —- |

| 0 | {“vectorCol”:null,”latitudeCol”:””f1””,”longitudeCol”:””f0””,”distanceType”:””HAVERSINE””,”k”:”2”,”vectorSize”:”2”} |

| 1048576 | {“clusterId”:0,”weight”:3.0,”center”:”[8.333333333333332, 9.0]”,”vec”:null} |

| 2097152 | {“clusterId”:1,”weight”:3.0,”center”:”[1.0, 1.3333333333333333]”,”vec”:null} |

预测输出

| f0 | f1 | pred | | —- | —- | —- |

| 0 | 0 | 1 |

| 8 | 8 | 0 |

| 1 | 2 | 1 |

| 9 | 10 | 0 |

| 3 | 1 | 1 |

| 10 | 7 | 0 |