1. 本教程简要介绍如何使用Spark。我们将首先通过Spark的交互式Shell(在PythonScala中)介绍API,然后展示如何用JavaScalaPython编写应用程序。<br />要遵循本指南,请首先从[Spark网站](http://spark.apache.org/downloads.html)下载Spark的打包版本 。由于我们不会使用HDFS,因此您可以下载适用于任何Hadoop版本的软件包。<br />请注意,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。在Spark 2.0之后,RDD被Dataset取代,Dataset类型像RDD一样强,但具有更丰富的优化功能。仍支持RDD界面,您可以在[RDD编程指南中](http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html)获得更完整的参考。但是,我们强烈建议您切换到使用数据集,该数据集的性能比RDD更好。请参阅《[SQL编程指南》](http://spark.apache.org/docs/2.2.0/sql-programming-guide.html)以获取有关数据集的更多信息。

使用Spark Shell进行交互式分析(Scala)

基本

Spark Shell提供了学习API的简单方法,以及强大的工具来交互式地分析数据。它可以在Scala(可在Java VM上运行,因此是使用现有Java库的好方法)。通过在Spark目录中运行以下命令来启动它:

  1. ./bin/spark-shell

Spark的主要抽象是称为数据集的项目的分布式集合。可以从Hadoop InputFormats(例如HDFS文件)或通过转换其他数据集来创建数据集。让我们从Spark源目录中的README文件的文本中创建一个新的数据集:

  1. scala> val textFile = spark.read.textFile("README.md")
  2. textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以通过调用某些操作直接从数据集中获取值,或转换数据集以获取新值。有关更多详细信息,请阅读API文档

  1. scala> textFile.count() // Number of items in this Dataset
  2. res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
  3. scala> textFile.first() // First item in this Dataset
  4. res1: String = # Apache Spark

现在,让我们将此数据集转换为新数据集。我们调用filter返回一个新的数据集,其中包含文件中项的子集。

  1. scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
  2. linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以将转换和动作链接在一起:

  1. scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
  2. res3: Long = 15

有关数据集操作的更多信息

数据集操作和转换可用于更复杂的计算。假设我们要查找包含最多单词的行:

  • scala

    1. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    2. res4: Long = 15

    首先,将一条线映射到一个整数值,以创建一个新的数据集。reduce在该数据集上调用,以找到最大的字数。到的参数mapreduce是Scala的函数文本(关闭),并且可以使用任何语言功能或斯卡拉/ Java库。例如,我们可以轻松地调用在其他地方声明的函数。我们将使用Math.max()函数使此代码更易于理解:

    1. scala> import java.lang.Math
    2. import java.lang.Math
    3. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    4. res5: Int = 15

    一种常见的数据流模式是Hadoop流行的MapReduce。Spark可以轻松实现MapReduce流:

    1. scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    2. wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

    在这里,我们调用flatMap将行的数据集转换为单词的数据集,然后组合groupByKeycount计算文件中每个单词的计数,作为(字符串,长整数)对的数据集。要收集外壳中的字数,我们可以调用collect

    1. scala> wordCounts.collect()
    2. res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

    缓存

    Spark还支持将数据集提取到群集范围的内存中缓存中。当重复访问数据时,例如查询小的“热”数据集或运行迭代算法(如PageRank)时,这非常有用。作为一个简单的示例,让我们将linesWithSpark数据集标记为要缓存:

    1. scala> linesWithSpark.cache()
    2. res7: linesWithSpark.type = [value: string]
    3. scala> linesWithSpark.count()
    4. res8: Long = 15
    5. scala> linesWithSpark.count()
    6. res9: Long = 15

    使用Spark浏览和缓存100行文本文件似乎很愚蠢。有趣的是,即使在数十个或数百个节点上进行条带化时,这些相同的函数也可以用于非常大的数据集。您也可以通过连接bin/spark-shell到集群来交互式地执行此操作,如RDD编程指南中所述

    自包含的应用程序

    假设我们希望使用Spark API编写一个独立的应用程序。我们将逐步介绍Scala(带有sbt),Java(带有Maven)和Python的简单应用程序。
    我们将在Scala中创建一个非常简单的Spark应用程序-实际上如此简单,它名为SimpleApp.scala

    1. /* SimpleApp.scala */
    2. import org.apache.spark.sql.SparkSession
    3. object SimpleApp {
    4. def main(args: Array[String]) {
    5. val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    6. val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    7. val logData = spark.read.textFile(logFile).cache()
    8. val numAs = logData.filter(line => line.contains("a")).count()
    9. val numBs = logData.filter(line => line.contains("b")).count()
    10. println(s"Lines with a: $numAs, Lines with b: $numBs")
    11. spark.stop()
    12. }
    13. }

    注意,应用程序应该定义一个main()方法而不是extend scala.App。的子类scala.App可能无法正常工作。
    该程序只计算Spark自述文件中包含“ a”的行数和包含“ b”的行数。请注意,您需要用安装Spark的位置替换YOUR_SPARK_HOME。与早期带有Spark外壳的示例(其初始化其自己的SparkSession)不同,我们将SparkSession初始化为程序的一部分。
    我们调用SparkSession.builder构造一个[[SparkSession]],然后设置应用程序名称,最后调用getOrCreate以获得[[SparkSession]]实例。
    我们的应用程序依赖于Spark API,因此我们还将包括一个sbt配置文件build.sbt,该文件 解释了Spark是一个依赖项。该文件还添加了Spark依赖的存储库:

    1. name := "Simple Project"
    2. version := "1.0"
    3. scalaVersion := "2.11.8"
    4. libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

    对于SBT正常工作,我们需要布局SimpleApp.scalabuild.sbt 根据典型的目录结构。安装好之后,我们可以创建一个包含应用程序代码的JAR包,然后使用spark-submit脚本来运行我们的程序。

    1. # Your directory layout should look like this
    2. $ find .
    3. .
    4. ./build.sbt
    5. ./src
    6. ./src/main
    7. ./src/main/scala
    8. ./src/main/scala/SimpleApp.scala
    9. # Package a jar containing your application
    10. $ sbt package
    11. ...
    12. [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
    13. # Use spark-submit to run your application
    14. $ YOUR_SPARK_HOME/bin/spark-submit \
    15. --class "SimpleApp" \
    16. --master local[4] \
    17. target/scala-2.11/simple-project_2.11-1.0.jar
    18. ...
    19. Lines with a: 46, Lines with b: 23

    写下来
    祝贺您运行第一个Spark应用程序!

  • 有关API的深入概述,请从RDD编程指南SQL编程指南开始,或参阅“编程指南”菜单以获取其他组件。

  • 要在集群上运行应用程序,请转至部署概述
  • 最后,Spark在examples目录(ScalaJavaPythonR)中包含几个示例。您可以如下运行它们:
    1. # For Scala and Java, use run-example:
    2. ./bin/run-example SparkPi
    3. # For Python examples, use spark-submit directly:
    4. ./bin/spark-submit examples/src/main/python/pi.py
    5. # For R examples, use spark-submit directly:
    6. ./bin/spark-submit examples/src/main/r/dataframe.R