WordCount Hands-On Example

The Spark Shell is mostly used for testing and validating programs. In production, programs are usually written in IDEA, packaged into a jar, and then submitted to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.

Writing the Program

1) Create a Maven project named WordCount.
2) Right-click the WordCount project, choose Add Framework Support, and check Scala.
3) Create a scala folder under main, right-click it and choose Mark Directory as => Sources Root, then create the package com.atguigu.spark under scala.
4) Prepare the input folder: right-click the new WordCount project => create an input folder => right-click the input folder => create 1.txt and 2.txt. Put a few words in each file (a hypothetical example is shown below).
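For example, the two input files might look like this (hypothetical contents; any whitespace-separated words will do):

    1.txt:
    hello atguigu
    hello spark

    2.txt:
    hello atguigu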

5) Add the project dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
Note: if your Maven version is 3.2.x and the plugin fails to download, change the plugin version to 3.3.2.

6) Create the WordCount object and write the code
    package com.atguigu.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {

      def main(args: Array[String]): Unit = {
        // 1. Create SparkConf and set the application name
        val conf = new SparkConf().setAppName("WC").setMaster("local[*]")

        // 2. Create SparkContext, the entry point for submitting a Spark application
        val sc = new SparkContext(conf)

        // 3. Read the files at the given location, e.g. lines such as: hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input")

        // 4. Split each line into individual words (flatten): (hello)(atguigu)(atguigu)
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        // 5. Restructure the data into pairs: (hello,1)(atguigu,1)(atguigu,1)
        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map(word => (word, 1))

        // 6. Aggregate the pairs by key: atguigu: 1, 1 => 1 + 1 => (atguigu,2)
        val wordToSumRdd: RDD[(String, Int)] = wordToOneRdd.reduceByKey((v1, v2) => v1 + v2)

        // 7. Collect the result to the driver and print it to the console
        val wordToCountArray: Array[(String, Int)] = wordToSumRdd.collect()
        wordToCountArray.foreach(println)

        // One-liner equivalent:
        // sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(args(1))

        // 8. Close the connection
        sc.stop()
      }
    }
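With the hypothetical input files shown earlier, running the program prints something like the following (the order of the pairs is not guaranteed):

    (spark,1)
    (hello,3)
    (atguigu,2)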


7) Packaging plugin

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
            <archive>
                <manifest>
                    <mainClass>com.atguigu.spark.WordCount</mainClass>
                </manifest>
            </archive>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

8) Package and test on the cluster
(1) Click package to build, then check the resulting jar.
(2) Upload WordCount.jar to the /opt/module/spark-yarn directory.
(3) Create the /input directory on HDFS to hold the input files.

    [atguigu@hadoop102 spark-yarn]$ hadoop fs -mkdir /input

(4) Upload the input file to /input.

    [atguigu@hadoop102 spark-yarn]$ hadoop fs -put /opt/module/spark-local/input/1.txt /input

(5) Run the job.

    [atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
    --class com.atguigu.spark.WordCount \
    --master yarn \
    WordCount.jar \
    /input \
    /output

Note: both input and output are cluster paths on HDFS. (The code shown above hard-codes local[*] and a local input directory; a cluster-ready variant that reads its paths from args is sketched after step (6).)
(6) View the result.

    [atguigu@hadoop102 spark-yarn]$ hadoop fs -cat /output/*
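For reference, here is a minimal sketch of the cluster-ready variant mentioned in the note above. It is not part of the original project code: it drops setMaster (the master is supplied by spark-submit --master yarn), reads the HDFS input and output paths from args, and writes the result with saveAsTextFile instead of collecting it to the console.

    package com.atguigu.spark

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {

      def main(args: Array[String]): Unit = {
        // No setMaster here: the master is chosen by spark-submit (--master yarn)
        val conf = new SparkConf().setAppName("WC")
        val sc = new SparkContext(conf)

        // args(0) = HDFS input path, args(1) = HDFS output path (must not already exist)
        sc.textFile(args(0))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .saveAsTextFile(args(1))

        sc.stop()
      }
    }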

9) Note: if the job fails because a compression class cannot be found
(1) Cause
By default, Spark on YARN uses the Hadoop cluster's configuration files, which specify the compression codecs, but Spark's own spark-yarn/jars directory does not contain a jar that supports LZO compression, hence the error.
(2) Solution 1: copy the LZO jar into the /opt/module/spark-yarn/jars directory.

    [atguigu@hadoop102 common]$ cp /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar /opt/module/spark-yarn/jars

(3) Solution 2: point to the LZO jar when submitting the job.

    [atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
    --class com.atguigu.spark.WordCount \
    --master yarn \
    --driver-class-path /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar \
    WordCount.jar \
    /input \
    /output

Local Debugging

To debug a Spark program locally, use the local submission mode, i.e. the local machine serves as the runtime environment and plays both the Master and Worker roles. Simply set breakpoints and run the program in the debugger, as follows:
1) Code

    package com.atguigu.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {

      def main(args: Array[String]): Unit = {
        // 1. Create SparkConf and set the application name
        val conf = new SparkConf().setAppName("WC").setMaster("local[*]")

        // 2. Create SparkContext, the entry point for submitting a Spark application
        val sc = new SparkContext(conf)

        // 3. Read the files at the given location, e.g. lines such as: hello atguigu atguigu
        val lineRdd: RDD[String] = sc.textFile("input")

        // 4. Split each line into individual words (flatten): (hello)(atguigu)(atguigu)
        val wordRdd: RDD[String] = lineRdd.flatMap(_.split(" "))

        // 5. Restructure the data into pairs: (hello,1)(atguigu,1)(atguigu,1)
        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map((_, 1))

        // 6. Aggregate the pairs by key: atguigu: 1, 1 => 1 + 1 => (atguigu,2)
        val wordToSumRdd: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_ + _)

        // 7. Collect the result to the driver and print it to the console
        wordToSumRdd.collect().foreach(println)

        // 8. Close the connection
        sc.stop()
      }
    }

2) Debugging workflow: set a breakpoint (for example on the collect() call), start the program with Debug, and inspect the intermediate RDDs step by step.

Attaching the Source Code

1) Hold the Ctrl key and click RDD.
2) IDEA will prompt you to download or attach the sources.
3) Unpack spark-3.0.0.tgz from the course materials to a path that contains no Chinese characters, for example E:\02_software.
4) Click the Attach Sources... button and select the source path E:\02_software\spark-3.0.0.

Exception Handling

If your local operating system is Windows and the program uses anything Hadoop-related, such as writing files to HDFS, you will hit an exception (typically a complaint that winutils.exe cannot be located).
This is not an error in the program itself; it occurs because Hadoop-related services are being used. To fix it:
1) Set the HADOOP_HOME environment variable.
2) In the IDEA Run Configuration, add the HADOOP_HOME variable (a programmatic alternative is sketched below).
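For reference, some people set the Hadoop home directory programmatically instead of, or in addition to, the environment variable. This is only a sketch and assumes a local winutils installation exists; the path below is a placeholder, not something defined in this project:

    object WordCount {

      def main(args: Array[String]): Unit = {
        // Hypothetical local path: the directory must contain bin\winutils.exe.
        // Set it before creating the SparkContext (or any other Hadoop-related object).
        System.setProperty("hadoop.home.dir", "C:\\hadoop-3.1.3")

        // ... the rest of the program stays the same as above ...
      }
    }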