2.1 转换器


  1. transform(dataset, params=None)
  2. Transforms the input dataset with optional parameters (使用可选参数转换输入数据集).
  3. Parameters:
  4. **dataset** input dataset, which is an instance of pyspark.sql.DataFrame
  5. **params** an optional param map that overrides embedded params.
  6. Returns: transformed dataset


pyspark.ml.feature.Binarizer(self, threshold=0.0, inputCol=None, outputCol=None)

  1. >>> df = spark.createDataFrame([(0.5,)], ["values"])
  2. >>> binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
  3. >>> binarizer.transform(df).head().features
  4. 0.0
  5. >>> binarizer.setParams(outputCol="freqs").transform(df).head().freqs
  6. 0.0
  7. >>> params = {binarizer.threshold: -0.5, binarizer.outputCol: "vector"}
  8. >>> binarizer.transform(df, params).head().vector
  9. 1.0
  10. >>> binarizerPath = temp_path + "/binarizer"
  11. >>> binarizer.save(binarizerPath)
  12. >>> loadedBinarizer = Binarizer.load(binarizerPath)
  13. >>> loadedBinarizer.getThreshold() == binarizer.getThreshold()
  14. True

pyspark.ml.feature.Bucketizer(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")

  1. >>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
  2. >>> df = spark.createDataFrame(values, ["values"])
  3. >>> bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],
  4. ... inputCol="values", outputCol="buckets")
  5. >>> bucketed = bucketizer.setHandleInvalid("keep").transform(df).collect()
  6. >>> len(bucketed)
  7. 6
  8. >>> bucketed[0].buckets
  9. 0.0
  10. >>> bucketed[1].buckets
  11. 0.0
  12. >>> bucketed[2].buckets
  13. 1.0
  14. >>> bucketed[3].buckets
  15. 2.0
  16. >>> bucketizer.setParams(outputCol="b").transform(df).head().b
  17. 0.0
  18. >>> bucketizerPath = temp_path + "/bucketizer"
  19. >>> bucketizer.save(bucketizerPath)
  20. >>> loadedBucketizer = Bucketizer.load(bucketizerPath)
  21. >>> loadedBucketizer.getSplits() == bucketizer.getSplits()
  22. True
  23. >>> bucketed = bucketizer.setHandleInvalid("skip").transform(df).collect()
  24. >>> len(bucketed)
  25. 4

pyspark.ml.feature.ChiSqSelector(self, numTopFeatures=50, featuresCol="features", outputCol=None, labelCol="label", selectorType="numTopFeatures", percentile=0.1, fpr=0.05, fdr=0.05, fwe=0.05)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame(
  3. ... [(Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
  4. ... (Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
  5. ... (Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],
  6. ... ["features", "label"])
  7. >>> selector = ChiSqSelector(numTopFeatures=1, outputCol="selectedFeatures")
  8. >>> model = selector.fit(df)
  9. >>> model.transform(df).head().selectedFeatures
  10. DenseVector([18.0])
  11. >>> model.selectedFeatures
  12. [2]
  13. >>> chiSqSelectorPath = temp_path + "/chi-sq-selector"
  14. >>> selector.save(chiSqSelectorPath)
  15. >>> loadedSelector = ChiSqSelector.load(chiSqSelectorPath)
  16. >>> loadedSelector.getNumTopFeatures() == selector.getNumTopFeatures()
  17. True
  18. >>> modelPath = temp_path + "/chi-sq-selector-model"
  19. >>> model.save(modelPath)
  20. >>> loadedModel = ChiSqSelectorModel.load(modelPath)
  21. >>> loadedModel.selectedFeatures == model.selectedFeatures
  22. True

pyspark.ml.feature.CountVectorizer(self, minTF=1.0, minDF=1.0, vocabSize=1 << 18, binary=False, inputCol=None, outputCol=None)

  1. >>> df = spark.createDataFrame(
  2. ... [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])],
  3. ... ["label", "raw"])
  4. >>> cv = CountVectorizer(inputCol="raw", outputCol="vectors")
  5. >>> model = cv.fit(df)
  6. >>> model.transform(df).show(truncate=False)
  7. +-----+---------------+-------------------------+
  8. |label|raw |vectors |
  9. +-----+---------------+-------------------------+
  10. |0 |[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
  11. |1 |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
  12. +-----+---------------+-------------------------+
  13. ...
  14. >>> sorted(model.vocabulary) == ['a', 'b', 'c']
  15. True
  16. >>> countVectorizerPath = temp_path + "/count-vectorizer"
  17. >>> cv.save(countVectorizerPath)
  18. >>> loadedCv = CountVectorizer.load(countVectorizerPath)
  19. >>> loadedCv.getMinDF() == cv.getMinDF()
  20. True
  21. >>> loadedCv.getMinTF() == cv.getMinTF()
  22. True
  23. >>> loadedCv.getVocabSize() == cv.getVocabSize()
  24. True
  25. >>> modelPath = temp_path + "/count-vectorizer-model"
  26. >>> model.save(modelPath)
  27. >>> loadedModel = CountVectorizerModel.load(modelPath)
  28. >>> loadedModel.vocabulary == model.vocabulary
  29. True

pyspark.ml.feature.Imputer(*args, **kwargs)
用于补全缺失值的插补估计器(Imputation estimator),使用缺失值所在列的平均值或中位数进行填充。输入列应为DoubleType或FloatType。目前Imputer不支持类别特征,可能会为类别特征生成不正确的值。
请注意,平均值/中位数是在过滤掉缺失值之后计算的。输入列中的所有null值都被视为缺失值,因此也会被插补。计算中位数时使用pyspark.sql.DataFrame.approxQuantile(),相对误差为0.001。

  1. >>> df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0),
  2. ... (4.0, 4.0), (5.0, 5.0)], ["a", "b"])
  3. >>> imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
  4. >>> model = imputer.fit(df)
  5. >>> model.surrogateDF.show()
  6. +---+---+
  7. | a| b|
  8. +---+---+
  9. |3.0|4.0|
  10. +---+---+
  11. ...
  12. >>> model.transform(df).show()
  13. +---+---+-----+-----+
  14. | a| b|out_a|out_b|
  15. +---+---+-----+-----+
  16. |1.0|NaN| 1.0| 4.0|
  17. |2.0|NaN| 2.0| 4.0|
  18. |NaN|3.0| 3.0| 3.0|
  19. ...
  20. >>> imputer.setStrategy("median").setMissingValue(1.0).fit(df).transform(df).show()
  21. +---+---+-----+-----+
  22. | a| b|out_a|out_b|
  23. +---+---+-----+-----+
  24. |1.0|NaN| 4.0| NaN|
  25. ...
  26. >>> imputerPath = temp_path + "/imputer"
  27. >>> imputer.save(imputerPath)
  28. >>> loadedImputer = Imputer.load(imputerPath)
  29. >>> loadedImputer.getStrategy() == imputer.getStrategy()
  30. True
  31. >>> loadedImputer.getMissingValue()
  32. 1.0
  33. >>> modelPath = temp_path + "/imputer-model"
  34. >>> model.save(modelPath)
  35. >>> loadedModel = ImputerModel.load(modelPath)
  36. >>> loadedModel.transform(df).head().out_a == model.transform(df).head().out_a
  37. True

pyspark.ml.feature.MaxAbsScaler(self, inputCol=None, outputCol=None)
通过除以每个特征的最大绝对值,将每个特征单独缩放到[-1,1]范围内。它不会平移/中心化数据,因此不会破坏任何稀疏性。

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([(Vectors.dense([1.0]),), (Vectors.dense([2.0]),)], ["a"])
  3. >>> maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
  4. >>> model = maScaler.fit(df)
  5. >>> model.transform(df).show()
  6. +-----+------+
  7. | a|scaled|
  8. +-----+------+
  9. |[1.0]| [0.5]|
  10. |[2.0]| [1.0]|
  11. +-----+------+
  12. ...
  13. >>> scalerPath = temp_path + "/max-abs-scaler"
  14. >>> maScaler.save(scalerPath)
  15. >>> loadedMAScaler = MaxAbsScaler.load(scalerPath)
  16. >>> loadedMAScaler.getInputCol() == maScaler.getInputCol()
  17. True
  18. >>> loadedMAScaler.getOutputCol() == maScaler.getOutputCol()
  19. True
  20. >>> modelPath = temp_path + "/max-abs-scaler-model"
  21. >>> model.save(modelPath)
  22. >>> loadedModel = MaxAbsScalerModel.load(modelPath)
  23. >>> loadedModel.maxAbs == model.maxAbs
  24. True

pyspark.ml.feature.MinMaxScaler(self, min=0.0, max=1.0, inputCol=None, outputCol=None)
使用列汇总统计信息,将每个特征单独重新缩放到一个统一的范围[min,max],这也称为最小-最大标准化(min-max normalization)或重新缩放(注意:由于零值可能被转换为非零值,即使输入是稀疏的,转换器的输出也将是DenseVector)。默认情况下(min=0.0, max=1.0),数据将被缩放到[0.0,1.0]范围内。特征E的第i个取值e_i的缩放结果计算如下:

$$\text{Rescaled}(e_i) = \frac{e_i - E_{\min}}{E_{\max} - E_{\min}} \cdot (\max - \min) + \min$$

当 $E_{\max} = E_{\min}$ 时,$\text{Rescaled}(e_i) = 0.5 \cdot (\max + \min)$。

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
  3. >>> mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
  4. >>> model = mmScaler.fit(df)
  5. >>> model.originalMin
  6. DenseVector([0.0])
  7. >>> model.originalMax
  8. DenseVector([2.0])
  9. >>> model.transform(df).show()
  10. +-----+------+
  11. | a|scaled|
  12. +-----+------+
  13. |[0.0]| [0.0]|
  14. |[2.0]| [1.0]|
  15. +-----+------+
  16. ...
  17. >>> minMaxScalerPath = temp_path + "/min-max-scaler"
  18. >>> mmScaler.save(minMaxScalerPath)
  19. >>> loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)
  20. >>> loadedMMScaler.getMin() == mmScaler.getMin()
  21. True
  22. >>> loadedMMScaler.getMax() == mmScaler.getMax()
  23. True
  24. >>> modelPath = temp_path + "/min-max-scaler-model"
  25. >>> model.save(modelPath)
  26. >>> loadedModel = MinMaxScalerModel.load(modelPath)
  27. >>> loadedModel.originalMin == model.originalMin
  28. True
  29. >>> loadedModel.originalMax == model.originalMax
  30. True

pyspark.ml.feature.Normalizer(self, p=2.0, inputCol=None, outputCol=None)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
  3. >>> df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)], ["dense", "sparse"])
  4. >>> normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
  5. >>> normalizer.transform(df).head().features
  6. DenseVector([0.6, -0.8])
  7. >>> normalizer.setParams(inputCol="sparse", outputCol="freqs").transform(df).head().freqs
  8. SparseVector(4, {1: 0.8, 3: 0.6})
  9. >>> params = {normalizer.p: 1.0, normalizer.inputCol: "dense", normalizer.outputCol: "vector"}
  10. >>> normalizer.transform(df, params).head().vector
  11. DenseVector([0.4286, -0.5714])
  12. >>> normalizerPath = temp_path + "/normalizer"
  13. >>> normalizer.save(normalizerPath)
  14. >>> loadedNormalizer = Normalizer.load(normalizerPath)
  15. >>> loadedNormalizer.getP() == normalizer.getP()
  16. True

pyspark.ml.feature.OneHotEncoder(self, dropLast=True, inputCol=None, outputCol=None)
独热(one-hot)编码器,将一列类别索引映射到一列二进制向量,每行至多有一个值为1,表示输入的类别索引。例如,对于5个类别,输入值2.0将映射到[0.0,0.0,1.0,0.0]的输出向量。默认情况下不包含最后一个类别(可通过dropLast进行配置),因为包含它会使向量各项之和恒为1,从而产生线性相关。所以输入值4.0映射到[0.0,0.0,0.0,0.0]。这与scikit-learn的OneHotEncoder不同,后者保留所有类别。输出向量是稀疏的。

  1. >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
  2. >>> model = stringIndexer.fit(stringIndDf)
  3. >>> td = model.transform(stringIndDf)
  4. >>> encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
  5. >>> encoder.transform(td).head().features
  6. SparseVector(2, {0: 1.0})
  7. >>> encoder.setParams(outputCol="freqs").transform(td).head().freqs
  8. SparseVector(2, {0: 1.0})
  9. >>> params = {encoder.dropLast: False, encoder.outputCol: "test"}
  10. >>> encoder.transform(td, params).head().test
  11. SparseVector(3, {0: 1.0})
  12. >>> onehotEncoderPath = temp_path + "/onehot-encoder"
  13. >>> encoder.save(onehotEncoderPath)
  14. >>> loadedEncoder = OneHotEncoder.load(onehotEncoderPath)
  15. >>> loadedEncoder.getDropLast() == encoder.getDropLast()
  16. True

pyspark.ml.feature.PCA(self, k=None, inputCol=None, outputCol=None)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
  3. ... (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
  4. ... (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
  5. >>> df = spark.createDataFrame(data,["features"])
  6. >>> pca = PCA(k=2, inputCol="features", outputCol="pca_features")
  7. >>> model = pca.fit(df)
  8. >>> model.transform(df).collect()[0].pca_features
  9. DenseVector([1.648..., -4.013...])
  10. >>> model.explainedVariance
  11. DenseVector([0.794..., 0.205...])
  12. >>> pcaPath = temp_path + "/pca"
  13. >>> pca.save(pcaPath)
  14. >>> loadedPca = PCA.load(pcaPath)
  15. >>> loadedPca.getK() == pca.getK()
  16. True
  17. >>> modelPath = temp_path + "/pca-model"
  18. >>> model.save(modelPath)
  19. >>> loadedModel = PCAModel.load(modelPath)
  20. >>> loadedModel.pc == model.pc
  21. True
  22. >>> loadedModel.explainedVariance == model.explainedVariance
  23. True

pyspark.ml.feature.QuantileDiscretizer(self, numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, handleInvalid="error")

  1. >>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
  2. >>> df = spark.createDataFrame(values, ["values"])
  3. >>> qds = QuantileDiscretizer(numBuckets=2,
  4. ... inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
  5. >>> qds.getRelativeError()
  6. 0.01
  7. >>> bucketizer = qds.fit(df)
  8. >>> qds.setHandleInvalid("keep").fit(df).transform(df).count()
  9. 6
  10. >>> qds.setHandleInvalid("skip").fit(df).transform(df).count()
  11. 4
  12. >>> splits = bucketizer.getSplits()
  13. >>> splits[0]
  14. -inf
  15. >>> print("%2.1f" % round(splits[1], 1))
  16. 0.4
  17. >>> bucketed = bucketizer.transform(df).head()
  18. >>> bucketed.buckets
  19. 0.0
  20. >>> quantileDiscretizerPath = temp_path + "/quantile-discretizer"
  21. >>> qds.save(quantileDiscretizerPath)
  22. >>> loadedQds = QuantileDiscretizer.load(quantileDiscretizerPath)
  23. >>> loadedQds.getNumBuckets() == qds.getNumBuckets()
  24. True

pyspark.ml.feature.StandardScaler(self, withMean=False, withStd=True, inputCol=None, outputCol=None)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
  3. >>> standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
  4. >>> model = standardScaler.fit(df)
  5. >>> model.mean
  6. DenseVector([1.0])
  7. >>> model.std
  8. DenseVector([1.4142])
  9. >>> model.transform(df).collect()[1].scaled
  10. DenseVector([1.4142])
  11. >>> standardScalerPath = temp_path + "/standard-scaler"
  12. >>> standardScaler.save(standardScalerPath)
  13. >>> loadedStandardScaler = StandardScaler.load(standardScalerPath)
  14. >>> loadedStandardScaler.getWithMean() == standardScaler.getWithMean()
  15. True
  16. >>> loadedStandardScaler.getWithStd() == standardScaler.getWithStd()
  17. True
  18. >>> modelPath = temp_path + "/standard-scaler-model"
  19. >>> model.save(modelPath)
  20. >>> loadedModel = StandardScalerModel.load(modelPath)
  21. >>> loadedModel.std == model.std
  22. True
  23. >>> loadedModel.mean == model.mean
  24. True

pyspark.ml.feature.VectorAssembler(self, inputCols=None, outputCol=None)

  1. from pyspark.ml import feature as ft
  2. df = spark.createDataFrame([(12, 10, 3), (1, 4, 2)],['a', 'b', 'c'])
  3. ft.VectorAssembler(inputCols=['a', 'b', 'c'],outputCol='features')\
  4. .transform(df) \
  5. .select('features')\
  6. .collect()
  7. 输出结果:
  8. [Row(features=DenseVector([12.0, 10.0, 3.0])),
  9. Row(features=DenseVector([1.0, 4.0, 2.0]))]
  1. >>> df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
  2. >>> vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
  3. >>> vecAssembler.transform(df).head().features
  4. DenseVector([1.0, 0.0, 3.0])
  5. >>> vecAssembler.setParams(outputCol="freqs").transform(df).head().freqs
  6. DenseVector([1.0, 0.0, 3.0])
  7. >>> params = {vecAssembler.inputCols: ["b", "a"], vecAssembler.outputCol: "vector"}
  8. >>> vecAssembler.transform(df, params).head().vector
  9. DenseVector([0.0, 1.0])
  10. >>> vectorAssemblerPath = temp_path + "/vector-assembler"
  11. >>> vecAssembler.save(vectorAssemblerPath)
  12. >>> loadedAssembler = VectorAssembler.load(vectorAssemblerPath)
  13. >>> loadedAssembler.transform(df).head().freqs == vecAssembler.transform(df).head().freqs
  14. True

pyspark.ml.feature.VectorIndexer(self, maxCategories=20, inputCol=None, outputCol=None) 为向量(Vector)数据集中的类别特征列建立索引。
pyspark.ml.feature.VectorSlicer(self, inputCol=None, outputCol=None, indices=None, names=None) 作用于特征向量,给定一个索引列表,从特征向量中提取值。

2.2 评估器

评估器是需要评估的统计模型,对所观测对象做预测或分类。如果从抽象的评估器类派生,新模型必须实现.fit()方法,该方法根据DataFrame中的数据以及某些默认或自定义的参数来拟合模型。在PySpark中有很多评估器可用,本文以Spark 2.2.0中提供的模型为例进行介绍。



(1)LogisticRegression:逻辑回归,支持多项逻辑(softmax)回归和二项逻辑回归。

pyspark.ml.classification.LogisticRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")

setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")

  1. >>> from pyspark.sql import Row
  2. >>> from pyspark.ml.linalg import Vectors
  3. >>> bdf = sc.parallelize([
  4. ... Row(label=1.0, weight=1.0, features=Vectors.dense(0.0, 5.0)),
  5. ... Row(label=0.0, weight=2.0, features=Vectors.dense(1.0, 2.0)),
  6. ... Row(label=1.0, weight=3.0, features=Vectors.dense(2.0, 1.0)),
  7. ... Row(label=0.0, weight=4.0, features=Vectors.dense(3.0, 3.0))]).toDF()
  8. >>> blor = LogisticRegression(regParam=0.01, weightCol="weight")
  9. >>> blorModel = blor.fit(bdf)
  10. >>> blorModel.coefficients
  11. DenseVector([-1.080..., -0.646...])
  12. >>> blorModel.intercept
  13. 3.112...
  14. >>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
  15. >>> mdf = spark.read.format("libsvm").load(data_path)
  16. >>> mlor = LogisticRegression(regParam=0.1, elasticNetParam=1.0, family="multinomial")
  17. >>> mlorModel = mlor.fit(mdf)
  18. >>> mlorModel.coefficientMatrix
  19. SparseMatrix(3, 4, [0, 1, 2, 3], [3, 2, 1], [1.87..., -2.75..., -0.50...], 1)
  20. >>> mlorModel.interceptVector
  21. DenseVector([0.04..., -0.42..., 0.37...])
  22. >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 1.0))]).toDF()
  23. >>> result = blorModel.transform(test0).head()
  24. >>> result.prediction
  25. 1.0
  26. >>> result.probability
  27. DenseVector([0.02..., 0.97...])
  28. >>> result.rawPrediction
  29. DenseVector([-3.54..., 3.54...])
  30. >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
  31. >>> blorModel.transform(test1).head().prediction
  32. 1.0
  33. >>> blor.setParams("vector")
  34. Traceback (most recent call last):
  35. ...
  36. TypeError: Method setParams forces keyword arguments.
  37. >>> lr_path = temp_path + "/lr"
  38. >>> blor.save(lr_path)
  39. >>> lr2 = LogisticRegression.load(lr_path)
  40. >>> lr2.getRegParam()
  41. 0.01
  42. >>> model_path = temp_path + "/lr_model"
  43. >>> blorModel.save(model_path)
  44. >>> model2 = LogisticRegressionModel.load(model_path)
  45. >>> blorModel.coefficients[0] == model2.coefficients[0]
  46. True
  47. >>> blorModel.intercept == model2.intercept
  48. True


pyspark.ml.classification.DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)


  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> from pyspark.ml.feature import StringIndexer
  3. >>> df = spark.createDataFrame([
  4. ... (1.0, Vectors.dense(1.0)),
  5. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  6. >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
  7. >>> si_model = stringIndexer.fit(df)
  8. >>> td = si_model.transform(df)
  9. >>> dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")
  10. >>> model = dt.fit(td)
  11. >>> model.numNodes
  12. 3
  13. >>> model.depth
  14. 1
  15. >>> model.featureImportances
  16. SparseVector(1, {0: 1.0})
  17. >>> model.numFeatures
  18. 1
  19. >>> model.numClasses
  20. 2
  21. >>> print(model.toDebugString)
  22. DecisionTreeClassificationModel (uid=...) of depth 1 with 3 nodes...
  23. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  24. >>> result = model.transform(test0).head()
  25. >>> result.prediction
  26. 0.0
  27. >>> result.probability
  28. DenseVector([1.0, 0.0])
  29. >>> result.rawPrediction
  30. DenseVector([1.0, 0.0])
  31. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  32. >>> model.transform(test1).head().prediction
  33. 1.0
  1. >>> dtc_path = temp_path + "/dtc"
  2. >>> dt.save(dtc_path)
  3. >>> dt2 = DecisionTreeClassifier.load(dtc_path)
  4. >>> dt2.getMaxDepth()
  5. 2
  6. >>> model_path = temp_path + "/dtc_model"
  7. >>> model.save(model_path)
  8. >>> model2 = DecisionTreeClassificationModel.load(model_path)
  9. >>> model.featureImportances == model2.featureImportances
  10. True

注意:由于预测变量之间可能存在相关性,单棵决策树得出的特征重要性可能具有较高的方差。可以考虑使用RandomForestClassifier来确定特征重要性。


用于分类的梯度提升决策树模型。该模型属于集成模型(Ensemble methods)家族。集成模型结合多个弱预测模型而形成一个强健的模型。

pyspark.ml.classification.GBTClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0)

  1. >>> from numpy import allclose
  2. >>> from pyspark.ml.linalg import Vectors
  3. >>> from pyspark.ml.feature import StringIndexer
  4. >>> df = spark.createDataFrame([
  5. ... (1.0, Vectors.dense(1.0)),
  6. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  7. >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
  8. >>> si_model = stringIndexer.fit(df)
  9. >>> td = si_model.transform(df)
  10. >>> gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
  11. >>> model = gbt.fit(td)
  12. >>> model.featureImportances
  13. SparseVector(1, {0: 1.0})
  14. >>> allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
  15. True
  16. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  17. >>> model.transform(test0).head().prediction
  18. 0.0
  19. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  20. >>> model.transform(test1).head().prediction
  21. 1.0
  22. >>> model.totalNumNodes
  23. 15
  24. >>> print(model.toDebugString)
  25. GBTClassificationModel (uid=...)...with 5 trees...
  26. >>> gbtc_path = temp_path + "gbtc"
  27. >>> gbt.save(gbtc_path)
  28. >>> gbt2 = GBTClassifier.load(gbtc_path)
  29. >>> gbt2.getMaxDepth()
  30. 2
  31. >>> model_path = temp_path + "gbtc_model"
  32. >>> model.save(model_path)
  33. >>> model2 = GBTClassificationModel.load(model_path)
  34. >>> model.featureImportances == model2.featureImportances
  35. True
  36. >>> model.treeWeights == model2.treeWeights
  37. True
  38. >>> model.trees
  39. [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]

(4)RandomForestClassifier:用于分类的随机森林学习算法。它支持二分类和多分类标签,以及连续特征和类别特征。


pyspark.ml.classification.RandomForestClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0)

  1. >>> import numpy
  2. >>> from numpy import allclose
  3. >>> from pyspark.ml.linalg import Vectors
  4. >>> from pyspark.ml.feature import StringIndexer
  5. >>> df = spark.createDataFrame([
  6. ... (1.0, Vectors.dense(1.0)),
  7. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  8. >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
  9. >>> si_model = stringIndexer.fit(df)
  10. >>> td = si_model.transform(df)
  11. >>> rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
  12. >>> model = rf.fit(td)
  13. >>> model.featureImportances
  14. SparseVector(1, {0: 1.0})
  15. >>> allclose(model.treeWeights, [1.0, 1.0, 1.0])
  16. True
  17. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  18. >>> result = model.transform(test0).head()
  19. >>> result.prediction
  20. 0.0
  21. >>> numpy.argmax(result.probability)
  22. 0
  23. >>> numpy.argmax(result.rawPrediction)
  24. 0
  25. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  26. >>> model.transform(test1).head().prediction
  27. 1.0
  28. >>> model.trees
  29. [DecisionTreeClassificationModel (uid=...) of depth..., DecisionTreeClassificationModel...]
  30. >>> rfc_path = temp_path + "/rfc"
  31. >>> rf.save(rfc_path)
  32. >>> rf2 = RandomForestClassifier.load(rfc_path)
  33. >>> rf2.getNumTrees()
  34. 3
  35. >>> model_path = temp_path + "/rfc_model"
  36. >>> model.save(model_path)
  37. >>> model2 = RandomForestClassificationModel.load(model_path)
  38. >>> model.featureImportances == model2.featureImportances
  39. True

pyspark.ml.classification.LinearSVC(*args, **kwargs) 这个二元分类器使用OWLQN优化器来优化合页损失(Hinge Loss),目前只支持L2正则化。


pyspark.ml.classification.OneVsRest(self, featuresCol="features", labelCol="label", predictionCol="prediction", classifier=None)

  1. >>> from pyspark.sql import Row
  2. >>> from pyspark.ml.linalg import Vectors
  3. >>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
  4. >>> df = spark.read.format("libsvm").load(data_path)
  5. >>> lr = LogisticRegression(regParam=0.01)
  6. >>> ovr = OneVsRest(classifier=lr)
  7. >>> model = ovr.fit(df)
  8. >>> model.models[0].coefficients
  9. DenseVector([0.5..., -1.0..., 3.4..., 4.2...])
  10. >>> model.models[1].coefficients
  11. DenseVector([-2.1..., 3.1..., -2.6..., -2.3...])
  12. >>> model.models[2].coefficients
  13. DenseVector([0.3..., -3.4..., 1.0..., -1.1...])
  14. >>> [x.intercept for x in model.models]
  15. [-2.7..., -2.5..., -1.3...]
  16. >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0, 1.0, 1.0))]).toDF()
  17. >>> model.transform(test0).head().prediction
  18. 0.0
  19. >>> test1 = sc.parallelize([Row(features=Vectors.sparse(4, [0], [1.0]))]).toDF()
  20. >>> model.transform(test1).head().prediction
  21. 2.0
  22. >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4, 0.3, 0.2))]).toDF()
  23. >>> model.transform(test2).head().prediction
  24. 0.0
  25. >>> model_path = temp_path + "/ovr_model"
  26. >>> model.save(model_path)
  27. >>> model2 = OneVsRestModel.load(model_path)
  28. >>> model2.transform(test0).head().prediction
  29. 0.0



pyspark.ml.regression.DecisionTreeRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", seed=None, varianceCol=None)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([
  3. ... (1.0, Vectors.dense(1.0)),
  4. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  5. >>> dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
  6. >>> model = dt.fit(df)
  7. >>> model.depth
  8. 1
  9. >>> model.numNodes
  10. 3
  11. >>> model.featureImportances
  12. SparseVector(1, {0: 1.0})
  13. >>> model.numFeatures
  14. 1
  15. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  16. >>> model.transform(test0).head().prediction
  17. 0.0
  18. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  19. >>> model.transform(test1).head().prediction
  20. 1.0
  21. >>> dtr_path = temp_path + "/dtr"
  22. >>> dt.save(dtr_path)
  23. >>> dt2 = DecisionTreeRegressor.load(dtr_path)
  24. >>> dt2.getMaxDepth()
  25. 2
  26. >>> model_path = temp_path + "/dtr_model"
  27. >>> model.save(model_path)
  28. >>> model2 = DecisionTreeRegressionModel.load(model_path)
  29. >>> model.numNodes == model2.numNodes
  30. True
  31. >>> model.depth == model2.depth
  32. True
  33. >>> model.transform(test1).head().variance
  34. 0.0


pyspark.ml.regression.GBTRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1, seed=None, impurity="variance")

  1. >>> from numpy import allclose
  2. >>> from pyspark.ml.linalg import Vectors
  3. >>> df = spark.createDataFrame([
  4. ... (1.0, Vectors.dense(1.0)),
  5. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  6. >>> gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
  7. >>> print(gbt.getImpurity())
  8. variance
  9. >>> model = gbt.fit(df)
  10. >>> model.featureImportances
  11. SparseVector(1, {0: 1.0})
  12. >>> model.numFeatures
  13. 1
  14. >>> allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
  15. True
  16. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  17. >>> model.transform(test0).head().prediction
  18. 0.0
  19. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  20. >>> model.transform(test1).head().prediction
  21. 1.0
  22. >>> gbtr_path = temp_path + "gbtr"
  23. >>> gbt.save(gbtr_path)
  24. >>> gbt2 = GBTRegressor.load(gbtr_path)
  25. >>> gbt2.getMaxDepth()
  26. 2
  27. >>> model_path = temp_path + "gbtr_model"
  28. >>> model.save(model_path)
  29. >>> model2 = GBTRegressionModel.load(model_path)
  30. >>> model.featureImportances == model2.featureImportances
  31. True
  32. >>> model.treeWeights == model2.treeWeights
  33. True
  34. >>> model.trees
  35. [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]


广义线性回归(GeneralizedLinearRegression):拟合一个广义线性模型,该模型由线性预测器的符号描述(链接函数,link function)和误差分布的描述(族,family)来指定。支持的族有 "gaussian"、"binomial"、"poisson"、"gamma" 和 "tweedie"。下面列出了每个族可用的链接函数,每个族列出的第一个链接函数为默认值。

"gaussian" -> "identity", "log", "inverse"
"binomial" -> "logit", "probit", "cloglog"
"poisson" -> "log", "identity", "sqrt"
"gamma" -> "inverse", "identity", "log"
"tweedie" -> power link function specified through "linkPower".
The default link power in the tweedie family is 1 - variancePower.
(注意:下面的示例代码演示的是 AFTSurvivalRegression(加速失效时间生存回归),而不是广义线性回归。)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([
  3. ... (1.0, Vectors.dense(1.0), 1.0),
  4. ... (0.0, Vectors.sparse(1, [], []), 0.0)], ["label", "features", "censor"])
  5. >>> aftsr = AFTSurvivalRegression()
  6. >>> model = aftsr.fit(df)
  7. >>> model.predict(Vectors.dense(6.3))
  8. 1.0
  9. >>> model.predictQuantiles(Vectors.dense(6.3))
  10. DenseVector([0.0101, 0.0513, 0.1054, 0.2877, 0.6931, 1.3863, 2.3026, 2.9957, 4.6052])
  11. >>> model.transform(df).show()
  12. +-----+---------+------+----------+
  13. |label| features|censor|prediction|
  14. +-----+---------+------+----------+
  15. | 1.0| [1.0]| 1.0| 1.0|
  16. | 0.0|(1,[],[])| 0.0| 1.0|
  17. +-----+---------+------+----------+
  18. ...
  19. >>> aftsr_path = temp_path + "/aftsr"
  20. >>> aftsr.save(aftsr_path)
  21. >>> aftsr2 = AFTSurvivalRegression.load(aftsr_path)
  22. >>> aftsr2.getMaxIter()
  23. 100
  24. >>> model_path = temp_path + "/aftsr_model"
  25. >>> model.save(model_path)
  26. >>> model2 = AFTSurvivalRegressionModel.load(model_path)
  27. >>> model.coefficients == model2.coefficients
  28. True
  29. >>> model.intercept == model2.intercept
  30. True
  31. >>> model.scale == model2.scale
  32. True

This supports multiple types of regularization:

none (a.k.a. ordinary least squares)
L2 (ridge regression)
L1 (Lasso)
L2 + L1 (elastic net)

pyspark.ml.regression.LinearRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> df = spark.createDataFrame([
  3. ... (1.0, 2.0, Vectors.dense(1.0)),
  4. ... (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
  5. >>> lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
  6. >>> model = lr.fit(df)
  7. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  8. >>> abs(model.transform(test0).head().prediction - (-1.0)) < 0.001
  9. True
  10. >>> abs(model.coefficients[0] - 1.0) < 0.001
  11. True
  12. >>> abs(model.intercept - 0.0) < 0.001
  13. True
  14. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  15. >>> abs(model.transform(test1).head().prediction - 1.0) < 0.001
  16. True
  17. >>> lr.setParams("vector")
  18. Traceback (most recent call last):
  19. ...
  20. TypeError: Method setParams forces keyword arguments.
  21. >>> lr_path = temp_path + "/lr"
  22. >>> lr.save(lr_path)
  23. >>> lr2 = LinearRegression.load(lr_path)
  24. >>> lr2.getMaxIter()
  25. 5
  26. >>> model_path = temp_path + "/lr_model"
  27. >>> model.save(model_path)
  28. >>> model2 = LinearRegressionModel.load(model_path)
  29. >>> model.coefficients[0] == model2.coefficients[0]
  30. True
  31. >>> model.intercept == model2.intercept
  32. True
  33. >>> model.numFeatures
  34. 1


pyspark.ml.regression.RandomForestRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy="auto")

  1. >>> from numpy import allclose
  2. >>> from pyspark.ml.linalg import Vectors
  3. >>> df = spark.createDataFrame([
  4. ... (1.0, Vectors.dense(1.0)),
  5. ... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
  6. >>> rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42)
  7. >>> model = rf.fit(df)
  8. >>> model.featureImportances
  9. SparseVector(1, {0: 1.0})
  10. >>> allclose(model.treeWeights, [1.0, 1.0])
  11. True
  12. >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
  13. >>> model.transform(test0).head().prediction
  14. 0.0
  15. >>> model.numFeatures
  16. 1
  17. >>> model.trees
  18. [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
  19. >>> model.getNumTrees
  20. 2
  21. >>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
  22. >>> model.transform(test1).head().prediction
  23. 0.5
  24. >>> rfr_path = temp_path + "/rfr"
  25. >>> rf.save(rfr_path)
  26. >>> rf2 = RandomForestRegressor.load(rfr_path)
  27. >>> rf2.getNumTrees()
  28. 2
  29. >>> model_path = temp_path + "/rfr_model"
  30. >>> model.save(model_path)
  31. >>> model2 = RandomForestRegressionModel.load(model_path)
  32. >>> model.featureImportances == model2.featureImportances
  33. True


使用类似 k-means++ 的初始化模式(即 Bahmani 等人提出的 k-means|| 算法)进行 K 均值聚类。

pyspark.ml.clustering.KMeans(self, featuresCol="features", predictionCol="prediction", k=2, initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None)

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
  3. ... (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
  4. >>> df = spark.createDataFrame(data, ["features"])
  5. >>> kmeans = KMeans(k=2, seed=1)
  6. >>> model = kmeans.fit(df)
  7. >>> centers = model.clusterCenters()
  8. >>> len(centers)
  9. 2
  10. >>> model.computeCost(df)
  11. 2.000...
  12. >>> transformed = model.transform(df).select("features", "prediction")
  13. >>> rows = transformed.collect()
  14. >>> rows[0].prediction == rows[1].prediction
  15. True
  16. >>> rows[2].prediction == rows[3].prediction
  17. True
  18. >>> model.hasSummary
  19. True
  20. >>> summary = model.summary
  21. >>> summary.k
  22. 2
  23. >>> summary.clusterSizes
  24. [2, 2]
  25. >>> kmeans_path = temp_path + "/kmeans"
  26. >>> kmeans.save(kmeans_path)
  27. >>> kmeans2 = KMeans.load(kmeans_path)
  28. >>> kmeans2.getK()
  29. 2
  30. >>> model_path = temp_path + "/kmeans_model"
  31. >>> model.save(model_path)
  32. >>> model2 = KMeansModel.load(model_path)
  33. >>> model2.hasSummary
  34. False
  35. >>> model.clusterCenters()[0] == model2.clusterCenters()[0]
  36. array([ True, True], dtype=bool)
  37. >>> model.clusterCenters()[1] == model2.clusterCenters()[1]
  38. array([ True, True], dtype=bool)

pyspark.ml.clustering.GaussianMixture(self, featuresCol="features", predictionCol="prediction", k=2, probabilityCol="probability", tol=0.01, maxIter=100, seed=None)

GaussianMixture 聚类。该类对多元高斯混合模型(GMM)执行期望最大化(EM)算法。GMM 表示由若干独立高斯分布构成的复合分布,其中每个高斯分布都带有一个“混合”权重,用于指定它对复合分布的贡献。

给定一组采样点,该类将最大化 k 个高斯分布混合的对数似然,并不断迭代,直到对数似然的变化量小于收敛阈值(tol),或达到最大迭代次数(maxIter)为止。虽然这个过程一般保证收敛,但不能保证找到全局最优解。

注意:该算法不适合用于高维数据(具有许多特征)。原因在于:(a) 高维数据本身就难以聚类(基于统计/理论上的论证);(b) 高维情况下高斯分布会出现数值问题。

  1. >>> from pyspark.ml.linalg import Vectors
  2. >>> data = [(Vectors.dense([-0.1, -0.05 ]),),
  3. ... (Vectors.dense([-0.01, -0.1]),),
  4. ... (Vectors.dense([0.9, 0.8]),),
  5. ... (Vectors.dense([0.75, 0.935]),),
  6. ... (Vectors.dense([-0.83, -0.68]),),
  7. ... (Vectors.dense([-0.91, -0.76]),)]
  8. >>> df = spark.createDataFrame(data, ["features"])
  9. >>> gm = GaussianMixture(k=3, tol=0.0001,
  10. ... maxIter=10, seed=10)
  11. >>> model = gm.fit(df)
  12. >>> model.hasSummary
  13. True
  14. >>> summary = model.summary
  15. >>> summary.k
  16. 3
  17. >>> summary.clusterSizes
  18. [2, 2, 2]
  19. >>> summary.logLikelihood
  20. 8.14636...
  21. >>> weights = model.weights
  22. >>> len(weights)
  23. 3
  24. >>> model.gaussiansDF.select("mean").head()
  25. Row(mean=DenseVector([0.825, 0.8675]))
  26. >>> model.gaussiansDF.select("cov").head()
  27. Row(cov=DenseMatrix(2, 2, [0.0056, -0.0051, -0.0051, 0.0046], False))
  28. >>> transformed = model.transform(df).select("features", "prediction")
  29. >>> rows = transformed.collect()
  30. >>> rows[4].prediction == rows[5].prediction
  31. True
  32. >>> rows[2].prediction == rows[3].prediction
  33. True
  34. >>> gmm_path = temp_path + "/gmm"
  35. >>> gm.save(gmm_path)
  36. >>> gm2 = GaussianMixture.load(gmm_path)
  37. >>> gm2.getK()
  38. 3
  39. >>> model_path = temp_path + "/gmm_model"
  40. >>> model.save(model_path)
  41. >>> model2 = GaussianMixtureModel.load(model_path)
  42. >>> model2.hasSummary
  43. False
  44. >>> model2.weights == model.weights
  45. True
  46. >>> model2.gaussiansDF.select("mean").head()
  47. Row(mean=DenseVector([0.825, 0.8675]))
  48. >>> model2.gaussiansDF.select("cov").head()
  49. Row(cov=DenseMatrix(2, 2, [0.0056, -0.0051, -0.0051, 0.0046], False))

2.3 管道

PySpark ML中管道的概念是用来表示从转换到评估(具有一系列不同阶段)的端到端的过程,这个过程可以对输入的一些原始数据(以DataFrame形式)执行必要的数据加工(转换),最后评估统计模型。




