The previous article showed how to run Scala programs with Spark; this one walks through running a Java program with Spark.

1. Environment Setup

I set up the environment for this article with SDKMAN, which makes installing Spark, Scala, and SBT painless (for example, sdk install spark 3.1.2); the SDKMAN site documents the few commands needed. The versions used here:

  1. spark == 3.1.2
  2. scala == 2.12.14
  3. sbt == 1.5.5
  4. jdk == 1.8.0_292

2. Creating the Project

First, create the project skeleton:

    $ mkdir -p ~/projects/sparkApps/MLAccuracy/src/main/java/
    $ touch ~/projects/sparkApps/MLAccuracy/src/main/java/CalcAccuracy.java
    $ touch ~/projects/sparkApps/MLAccuracy/pom.xml

The resulting directory structure:

    MLAccuracy
    ├── pom.xml
    └── src
        └── main
            └── java
                └── CalcAccuracy.java

3. Java Source Code

The Java source that computes accuracy (CalcAccuracy.java):

    import java.util.Arrays;
    import java.util.List;

    import scala.Tuple2;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.evaluation.MulticlassMetrics;

    public final class CalcAccuracy {
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf().setAppName("CalculateAccuracy").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);

            // Each pair is (prediction, label). MulticlassMetrics is a Scala class,
            // so its RDD[(Double, Double)] constructor parameter surfaces in Java
            // as RDD<Tuple2<Object, Object>>.
            List<Tuple2<Object, Object>> data = Arrays.asList(
                    new Tuple2<>(1.0, 0.0),
                    new Tuple2<>(0.0, 1.0),
                    new Tuple2<>(0.0, 0.0),
                    new Tuple2<>(0.0, 0.0),
                    new Tuple2<>(0.0, 0.0)
            );
            JavaRDD<Tuple2<Object, Object>> predictionsAndLabels = sc.parallelize(data);

            MulticlassMetrics metrics = new MulticlassMetrics(predictionsAndLabels.rdd());
            System.out.println(metrics.accuracy());

            sc.stop();
        }
    }
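
The same MulticlassMetrics object exposes more than overall accuracy. As a minimal sketch (these lines would go just before sc.stop() in the program above; the methods shown exist in spark-mllib 3.1.2):

    // Additional evaluation output from the same metrics object.
    System.out.println("Confusion matrix:\n" + metrics.confusionMatrix());
    System.out.println("Weighted precision: " + metrics.weightedPrecision());
    System.out.println("Weighted recall: " + metrics.weightedRecall());

    // Per-label metrics; labels() returns the distinct label values seen.
    for (double label : metrics.labels()) {
        System.out.println("precision(" + label + ") = " + metrics.precision(label));
        System.out.println("recall(" + label + ") = " + metrics.recall(label));
    }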

4. Maven Build File

The Maven build file (pom.xml) is shown below. One note: because the job is launched through spark-submit, the Spark runtime supplies the Spark classes at execution time, so on a real cluster the spark-core and spark-mllib dependencies are usually marked <scope>provided</scope>; the default scope is fine for this local example:

    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.y.x</groupId>
        <artifactId>calulate-accuracy</artifactId>
        <version>1.0</version>
        <packaging>jar</packaging>
        <name>Calculate Accuracy</name>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
            <java.version>1.8</java.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <repositories>
            <repository>
                <id>jboss</id>
                <name>JBoss Repository</name>
                <url>http://repository.jboss.com/maven2/</url>
            </repository>
        </repositories>

        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.14</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
        </dependencies>
    </project>

Run the build from the project root:

    $ mvn package
    [INFO] Scanning for projects...
    [INFO]
    [INFO] ---------------------< com.y.x:calulate-accuracy >----------------------
    [INFO] Building Calculate Accuracy 1.0
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ calulate-accuracy ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory /home/yumingmin/projects/sparkApps/MLAccuracy/src/main/resources
    [INFO]
    [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ calulate-accuracy ---
    [INFO] Changes detected - recompiling the module!
    [INFO] Compiling 1 source file to /home/yumingmin/projects/sparkApps/MLAccuracy/target/classes
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ calulate-accuracy ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory /home/yumingmin/projects/sparkApps/MLAccuracy/src/test/resources
    [INFO]
    [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ calulate-accuracy ---
    [INFO] No sources to compile
    [INFO]
    [INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ calulate-accuracy ---
    [INFO] No tests to run.
    [INFO]
    [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ calulate-accuracy ---
    [INFO] Building jar: /home/yumingmin/projects/sparkApps/MLAccuracy/target/calulate-accuracy-1.0.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 2.308 s
    [INFO] Finished at: 2021-09-22T19:15:31+08:00
    [INFO] ------------------------------------------------------------------------

5. Running the Program

As with Scala, a Java program is submitted to Spark with spark-submit. The log output is verbose, so only the last few lines are shown here:

    $ spark-submit --class "CalcAccuracy" ~/projects/sparkApps/MLAccuracy/target/calulate-accuracy-1.0.jar
    21/09/22 19:15:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
    21/09/22 19:15:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1582 bytes result sent to driver
    21/09/22 19:15:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 104 ms on master104088 (executor driver) (1/1)
    21/09/22 19:15:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    21/09/22 19:15:41 INFO DAGScheduler: ResultStage 1 (collectAsMap at MulticlassMetrics.scala:61) finished in 0.118 s
    21/09/22 19:15:41 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    21/09/22 19:15:41 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
    21/09/22 19:15:41 INFO DAGScheduler: Job 0 finished: collectAsMap at MulticlassMetrics.scala:61, took 0.761263 s
    0.6
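
The final line, 0.6, is the accuracy itself: of the five (prediction, label) pairs, the three (0.0, 0.0) pairs are correct, while (1.0, 0.0) and (0.0, 1.0) are not, so accuracy = 3/5 = 0.6.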

Note: the value passed to --class is the fully qualified name of the main class compiled from CalcAccuracy.java; since the file declares no package, that is simply CalcAccuracy.
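
One common refinement, sketched below: the example hard-codes setMaster("local"), which overrides any --master flag passed to spark-submit. Dropping the call from the code lets the master be chosen at submit time (the --master value in the comment is only an illustration):

    import org.apache.spark.SparkConf;

    public final class CalcAccuracy {
        public static void main(String[] args) throws Exception {
            // No setMaster here: pass the master at submit time instead, e.g.
            //   spark-submit --master "local[*]" --class "CalcAccuracy" <path-to-jar>
            SparkConf sparkConf = new SparkConf().setAppName("CalculateAccuracy");
            // ... rest of the program unchanged ...
        }
    }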

6. References