While implementing an accuracy calculation with Spark, the code ran perfectly well in spark-shell. But in a real deployment, model predictions are never served through spark-shell, and that is what led to this article: how to run a Scala program on Spark.

1. Environment Setup

For the environment used in this article, I installed Spark, Scala, and SBT through SDKMAN, which is very convenient; a quick look at the official site is enough to get everything set up. The versions are listed below, followed by a minimal install sketch.

    spark == 3.1.2
    scala == 2.12.14
    sbt == 1.5.5
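For reference, the SDKMAN installs can look roughly like this (a sketch; check `sdk list spark`, `sdk list scala`, and `sdk list sbt` for the version identifiers actually available on your machine):

    $ sdk install spark 3.1.2
    $ sdk install scala 2.12.14
    $ sdk install sbt 1.5.5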

2. Setting Up the Project

First, create the project directory and the two files we will need:

    $ mkdir -p ~/projects/sparkApps/MLAccuracy/src/main/scala/
    $ touch ~/projects/sparkApps/MLAccuracy/src/main/scala/calculateAccuracy.scala
    $ touch ~/projects/sparkApps/MLAccuracy/calculateAccuracy.sbt

The resulting directory structure looks like this:

    MLAccuracy
    ├── calculateAccuracy.sbt
    └── src
        └── main
            └── scala
                └── calculateAccuracy.scala

3. Main Code

The Scala source for calculating accuracy (calculateAccuracy.scala):

    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.{SparkConf, SparkContext}

    object CalculateAccuracy {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("calculateAccuracy").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        /**
         * Confusion matrix for the example:
         *                      actual positive (P)   actual negative (N)
         * predicted positive            0                     1
         * predicted negative            1                     3
         */
        // MulticlassMetrics expects (prediction, label) pairs
        val FP = Array((1.0, 0.0))          // predicted positive, actually negative
        val FN = Array((0.0, 1.0))          // predicted negative, actually positive
        val TN = Array.fill(3)((0.0, 0.0))  // predicted negative, actually negative
        val all = FP ++ FN ++ TN
        val predictionsAndLabels = sc.parallelize(all)
        val metrics = new MulticlassMetrics(predictionsAndLabels)
        println(metrics.accuracy)           // 3 correct out of 5 -> 0.6
        sc.stop()
      }
    }
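Beyond accuracy, the same MulticlassMetrics object also exposes the confusion matrix and per-label precision and recall. A minimal self-contained sketch using the same five (prediction, label) pairs (CalculateMoreMetrics is just an illustrative name, not part of the project above):

    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.{SparkConf, SparkContext}

    object CalculateMoreMetrics {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("calculateMoreMetrics").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        // Same five (prediction, label) pairs as in the example above
        val pairs = Seq((1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0))
        val metrics = new MulticlassMetrics(sc.parallelize(pairs))
        println(metrics.confusionMatrix)   // label-vs-prediction counts
        println(metrics.precision(0.0))    // precision for label 0.0
        println(metrics.recall(0.0))       // recall for label 0.0
        println(metrics.weightedFMeasure)  // F-measure averaged over labels
        sc.stop()
      }
    }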

The SBT build file (calculateAccuracy.sbt):

    name := "Calculate Accuracy"
    version := "1.0"
    scalaVersion := "2.12.14"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.1.2",
      "org.apache.spark" %% "spark-mllib" % "3.1.2"
    )
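A quick note on the `%%` operator used above: it is standard sbt behavior that `%%` appends the project's Scala binary version (here 2.12) to the artifact name, so the spark-core line is equivalent to writing the suffix out by hand:

    libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.1.2"

This is exactly why the Scala version matters in the next section.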

Note: this is the final, working sbt file. Getting there was not quite so smooth; the next section walks through the problems I ran into.

4. Packaging a JAR with SBT

The Scala I had initially installed through SDKMAN was 2.13.6 (with sbt 1.5.5 and Spark 3.1.2), so the sbt file started out like this:

    name := "Calculate Accuracy"
    version := "1.0"
    scalaVersion := "2.13.6"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"

4.1 Packaging Command

Running sbt package inside the MLAccuracy directory packages the project:

    $ sbt package
    [info] welcome to sbt 1.5.5 (AdoptOpenJDK Java 1.8.0_292)
    [info] loading project definition from /home/yumingmin/projects/sparkApps/MLAccuracy/project
    [info] loading settings for project mlaccuracy from calculateAccuracy.sbt ...
    [info] set current project to Calculate Accuracy (in build file:/home/yumingmin/projects/sparkApps/MLAccuracy/)
    [info] Updating
      https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.6/scala-library-2.13.6.pom
      100.0% [##########] 1.6 KiB (1.6 KiB / s)
    [info] Resolved dependencies
    [warn]
    [warn] Note: Unresolved dependencies path:
    [error] sbt.librarymanagement.ResolveException: Error downloading org.apache.spark:spark-core_2.13:3.1.2
    [error]   Not found
    [error]   Not found
    [error]   not found: /home/yumingmin/.ivy2/local/org.apache.spark/spark-core_2.13/3.1.2/ivys/ivy.xml
    [error]   not found: https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.13/3.1.2/spark-core_2.13-3.1.2.pom

This is a Scala version compatibility problem: Spark 3.1.2 is only published for Scala 2.12, so there is no spark-core_2.13 artifact at that version and the %% lookup for our Scala 2.13.6 project fails. We can check which versions are in play with sbt about:

    $ sbt about
    [info] This is sbt 1.5.5
    [info] The current project is ProjectRef(uri("file:/home/yumingmin/projects/sparkApps/MLAccuracy/"), "mlaccuracy") 1.0
    [info] The current project is built against Scala 2.13.6
    [info] Available Plugins
    [info]  - sbt.ScriptedPlugin
    [info]  - sbt.plugins.CorePlugin
    [info]  - sbt.plugins.Giter8TemplatePlugin
    [info]  - sbt.plugins.IvyPlugin
    [info]  - sbt.plugins.JUnitXmlReportPlugin
    [info]  - sbt.plugins.JvmPlugin
    [info]  - sbt.plugins.MiniDependencyTreePlugin
    [info]  - sbt.plugins.SbtPlugin
    [info]  - sbt.plugins.SemanticdbPlugin
    [info] sbt, sbt plugins, and build definitions are using Scala 2.12.14

The command will still print errors from the broken build, but we can ignore them. The important part is the last line: sbt and the build definition run on Scala 2.12.14, which is the 2.12 series that Spark 3.1.2 is published for, so let's change scalaVersion in the sbt file:

    name := "Calculate Accuracy"
    version := "1.0"
    scalaVersion := "2.12.14"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
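If you are unsure which Scala series your own Spark distribution was built against, the spark-shell startup banner prints it, and you can also query it from the REPL (a quick check, not part of the project; only the 2.12 prefix matters here):

    scala> scala.util.Properties.versionString
    // prints something like "version 2.12.10", i.e. a 2.12.x build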

Now run sbt package again.

4.2 Spark Dependency Problem

The build still fails, but with a different error: the MLlib classes cannot be resolved because the spark-mllib dependency was never declared:

    $ sbt package
    [info] welcome to sbt 1.5.5 (AdoptOpenJDK Java 1.8.0_292)
    [info] loading project definition from /home/yumingmin/projects/sparkApps/MLAccuracy/project
    [info] loading settings for project mlaccuracy from calculateAccuracy.sbt ...
    [info] set current project to mlaccuracy (in build file:/home/yumingmin/projects/sparkApps/MLAccuracy/)
    [info] compiling 1 Scala source to /home/yumingmin/projects/sparkApps/MLAccuracy/target/scala-2.12/classes ...
    [error] /home/yumingmin/projects/sparkApps/MLAccuracy/src/main/scala/calculateAccuracy.scala:1:12: object apache is not a member of package org
    [error] import org.apache.spark.mllib.evaluation.MulticlassMetrics

The fix is to declare all of the required Spark modules:

    name := "Calculate Accuracy"
    version := "1.0"
    scalaVersion := "2.12.14"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.1.2",
      "org.apache.spark" %% "spark-mllib" % "3.1.2"
    )

libraryDependencies can also be written as separate lines:

    name := "Calculate Accuracy"
    version := "1.0"
    scalaVersion := "2.12.14"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.1.2"
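One optional refinement, on the assumption that the JAR will always be launched through spark-submit (which already provides the Spark classes at runtime): mark the Spark dependencies as provided so they are excluded from any assembly (fat) JAR you might build later. Plain sbt package does not bundle dependencies either way, so this makes no difference to the steps below:

    libraryDependencies += "org.apache.spark" %% "spark-core"  % "3.1.2" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.1.2" % "provided"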

Packaging again now compiles cleanly into a JAR: a calculate-accuracy_2.12-1.0.jar file appears under MLAccuracy/target/scala-2.12/.

    $ sbt package
    [info] welcome to sbt 1.5.5 (AdoptOpenJDK Java 1.8.0_292)
    [info] loading project definition from /home/yumingmin/projects/sparkApps/MLAccuracy/project
    [info] loading settings for project mlaccuracy from calculateAccuracy.sbt ...
    [info] set current project to Calculate Accuracy (in build file:/home/yumingmin/projects/sparkApps/MLAccuracy/)
    [info] compiling 1 Scala source to /home/yumingmin/projects/sparkApps/MLAccuracy/target/scala-2.12/classes ...
    [success] Total time: 10 s, completed Sep 22, 2021 3:03:16 PM

5. Running the Program

Scala programs are submitted to Spark with spark-submit. The command produces a lot of log output, so only the last few lines are shown here:

    $ spark-submit --class "CalculateAccuracy" ~/projects/sparkApps/MLAccuracy/target/scala-2.12/calculate-accuracy_2.12-1.0.jar
    21/09/22 15:11:58 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, master104088, 38507, None)
    21/09/22 15:11:58 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, master104088, 38507, None)
    0.6

Note: the value passed to --class is the object defined in calculateAccuracy.scala, i.e. CalculateAccuracy.
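One thing to keep in mind: the code above hard-codes .setMaster("local") in the SparkConf, and properties set directly on a SparkConf take precedence over spark-submit flags. To pick the master at submit time instead, remove setMaster from the code and pass it on the command line, for example (local[*] is just an illustrative choice):

    $ spark-submit --master local[*] \
        --class "CalculateAccuracy" \
        ~/projects/sparkApps/MLAccuracy/target/scala-2.12/calculate-accuracy_2.12-1.0.jar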

If the program runs without errors but no result shows up in the terminal, edit $SPARK_HOME/conf/log4j.properties to contain the following:

    log4j.rootCategory=INFO, console

6. References