Official docs: http://spark.apache.org/docs/latest/mllib-guide.html#data-types-algorithms-and-utilities
Java API: http://spark.apache.org/docs/latest/api/java/index.html

MLlib is Spark's machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides the following tools:

  • ML algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
  • Featurization: feature extraction, transformation, dimensionality reduction, and selection
  • Pipelines: tools for constructing, evaluating, and tuning ML pipelines
  • Persistence: saving and loading algorithms, models, and pipelines
  • Utilities: linear algebra, statistics, data handling, etc.

Data types

MLlib supports local vectors and local matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. A training example used in supervised learning is called a "labeled point" in MLlib.

Local vector

A local vector has integer-typed, 0-based indices and double-typed values, and is stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array holding its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, the vector (5.2, 0.0, 5.5) can be represented as:

Dense representation: [5.2, 0.0, 5.5]

Sparse representation: (3, [0, 2], [5.2, 5.5])  # 3 is the length of the vector (5.2, 0.0, 5.5); the indices and values of the nonzero entries form the arrays [0, 2] and [5.2, 5.5].

Vector is the base class of all local vectors; DenseVector and SparseVector are its two concrete implementations. Spark recommends using the factory methods in Vectors to create local vectors, as in the following example:

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});
// [1.0,0.0,3.0]
// (3,[0,2],[1.0,3.0])

Labeled point

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. A double is used to store the label, so labeled points can be used in both regression and classification. For binary classification, the label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));
// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

Sparse data

It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

MLUtils.loadLibSVMFile reads training examples stored in LIBSVM format:

JavaRDD<LabeledPoint> examples =
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
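As a quick illustration (the values below are made up, not taken from sample_libsvm_data.txt), a line such as

1 1:0.5 3:2.4

encodes a labeled point with label 1.0 and nonzero features at the one-based indices 1 and 3. loadLibSVMFile converts the feature indices to zero-based after loading, so the line is equivalent to:

LabeledPoint example = new LabeledPoint(1.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {0.5, 2.4}));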

For local matrices, see: http://spark.apache.org/docs/latest/mllib-data-types.html
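As a short sketch based on that page, local matrices are created with the factory methods in org.apache.spark.mllib.linalg.Matrices. Dense matrices store their entries in column-major order, and sparse matrices use compressed sparse column (CSC) format:

import org.apache.spark.mllib.linalg.Matrices;
import org.apache.spark.mllib.linalg.Matrix;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)), stored in column-major order.
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in CSC format
// (column pointers, row indices, nonzero values).
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});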

1. Decision Trees

  • Decision trees and their ensembles are popular methods for both regression and classification in machine learning. They are widely used because they are easy to interpret, handle categorical features, extend naturally to the multiclass setting, do not require feature scaling, and can capture non-linearities and feature interactions. Tree-ensemble algorithms such as random forests and boosted trees are among the top performers for classification and regression tasks.
  • Example
  • The example below loads a dataset in LibSVM format and splits it into a training set and a test set. The training set is used to train a decision tree model, and the test set is used to evaluate the results.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.Map;

public class JavaDecisionTreeClassificationExample {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaDecisionTreeClassificationExample")
        .setMaster("local")
        .set("spark.driver.host", "localhost")
        .set("spark.testing.memory", "21474800000");
    // Create a JavaSparkContext.
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Load and parse the data file.
    String datapath = "src/main/java/com/kdy/spark/ml/data/mllib/sample_libsvm_data.txt";
    JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
    // Split the data into training and test sets (30% held out for testing).
    JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.7, 0.3});
    JavaRDD<LabeledPoint> trainingData = splits[0];
    JavaRDD<LabeledPoint> testData = splits[1];

    // Set parameters.
    // An empty categoricalFeaturesInfo indicates that all features are continuous.
    int numClasses = 2;
    Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
    String impurity = "gini";
    int maxDepth = 5;
    int maxBins = 32;

    // Train a DecisionTree model for classification.
    DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
        categoricalFeaturesInfo, impurity, maxDepth, maxBins);

    // Evaluate the model on test instances and compute the test error.
    JavaPairRDD<Double, Double> predictionAndLabel =
        testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
    double testErr =
        predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();

    System.out.println("Test Error: " + testErr);
    System.out.println("Learned classification tree model:\n" + model.toDebugString());

    jsc.stop();
  }
}
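Not shown above, but the trained model can also be persisted and reloaded later via DecisionTreeModel's save/load methods. A minimal sketch (the output path is arbitrary), to be placed before jsc.stop():

// Save the trained model to a directory and load it back.
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel =
    DecisionTreeModel.load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");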

2. K-means


K-means is a widely used clustering method that groups the data points into a given number of clusters.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

/**
 * An example demonstrating k-means clustering.
 * Run with
 *   bin/run-example ml.JavaKMeansExample
 */
public class JavaKMeansExample {

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setAppName("Kmeans")
        .setMaster("local")
        .set("spark.driver.host", "localhost")
        .set("spark.testing.memory", "21474800000");
    // Create a JavaSparkContext.
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Load and parse the data.
    String path = "src/main/java/com/kdy/spark/ml/data/mllib/kmeans_data.txt";
    JavaRDD<String> data = jsc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(s -> {
      String[] sarray = s.split(" ");
      double[] values = new double[sarray.length];
      for (int i = 0; i < sarray.length; i++) {
        values[i] = Double.parseDouble(sarray[i]);
      }
      return Vectors.dense(values);
    });
    parsedData.cache();

    // Cluster the data into two classes using KMeans.
    int numClusters = 2;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

    System.out.println("Cluster centers:");
    for (Vector center : clusters.clusterCenters()) {
      System.out.println(" " + center);
    }
    double cost = clusters.computeCost(parsedData.rdd());
    System.out.println("Cost: " + cost);

    // Evaluate clustering by computing Within Set Sum of Squared Errors.
    double WSSSE = clusters.computeCost(parsedData.rdd());
    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

    jsc.stop();
  }
}
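Beyond printing the cluster centers, the trained KMeansModel can assign new points to clusters and be persisted for later use. A minimal sketch (the sample point and output path below are made up), to be placed before jsc.stop():

// Assign a new point to the nearest cluster center (point values are illustrative only).
int clusterIndex = clusters.predict(Vectors.dense(0.1, 0.1, 0.1));
System.out.println("Predicted cluster: " + clusterIndex);

// Save the model and load it back (path is arbitrary).
clusters.save(jsc.sc(), "target/tmp/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(), "target/tmp/KMeansModel");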

3. Logistic Regression

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import scala.Tuple2;

public class JavaLogisticRegressionWithLBFGSExample {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaLogisticRegressionWithLBFGSExample")
        .setMaster("local")
        .set("spark.driver.host", "localhost")
        .set("spark.testing.memory", "21474800000");
    // Create a JavaSparkContext.
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    // Load training data in LIBSVM format.
    String path = "src/main/java/com/kdy/spark/ml/data/mllib/sample_libsvm_data.txt";
    JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(javaSparkContext.sc(), path).toJavaRDD();

    // Split the initial RDD into two... [60% training data, 40% testing data].
    JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
    JavaRDD<LabeledPoint> training = splits[0].cache();
    JavaRDD<LabeledPoint> test = splits[1];

    // Run the training algorithm to build the model.
    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
        .setNumClasses(10)
        .run(training.rdd());

    // Compute raw scores on the test set.
    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
        new Tuple2<>(model.predict(p.features()), p.label()));

    // Get evaluation metrics.
    MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
    double accuracy = metrics.accuracy();
    System.out.println("Accuracy = " + accuracy);

    javaSparkContext.stop();
  }
}
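As with the decision tree model, the fitted LogisticRegressionModel supports save/load. A minimal sketch (the output path is arbitrary), to be placed before javaSparkContext.stop():

// Save the trained model and load it back later.
model.save(javaSparkContext.sc(), "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(javaSparkContext.sc(),
    "target/tmp/javaLogisticRegressionWithLBFGSModel");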