Workflow

  1. Create the project
  2. Open the pom.xml file

Check which Spark version and which Scala version you are using, and match the dependencies below accordingly.

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.13</artifactId>
        <version>3.2.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.2.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.13</artifactId>
        <version>3.2.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.13</artifactId>
        <version>3.2.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <!-- Note: the legacy spark-streaming-kafka_2.11 1.6.3 artifact is built for Scala 2.11
         and does not work with Spark 3.2.x / Scala 2.13; use the kafka-0-10 connector instead. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.13.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.2.8</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <!-- This plugin compiles the Scala sources into class files -->
            <!-- https://mvnrepository.com/artifact/net.alchim31.maven/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.5.6</version>
            <executions>
                <execution>
                    <!-- Bind the Scala compile/testCompile goals to Maven's compile phases -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

If this is a sub-module, you need to add Scala (framework) support to it.

  3. Create a Scala class

     ```scala
     import org.apache.spark.SparkContext // the entry point of every job; where all computation starts
     import org.apache.spark.SparkConf    // Spark's configuration, analogous to the Configuration object in MapReduce;
                                          // it overrides the default config files (if you configure anything) and is
                                          // mainly used to set the app name and whether to run locally or on a cluster
     ```

  4. Write the code (refer to the official documentation); a minimal driver sketch follows this list.
     If running on Windows, set setMaster("local[n]").
     If running on the cluster, remove setMaster("local[n]"), or use setMaster("spark://master:7077") (not recommended, since the master URL is then hard-coded).
     Keep two key concepts in mind: transformation and action.
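As a concrete starting point, here is a minimal word-count driver sketch. The app name, the use of args(0)/args(1) for input and output, and the local[2] master are illustrative assumptions; the object name matches the --class value used in the spark-submit example further below:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal word-count driver; names and paths are placeholders.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[2]") // remove this (or override with --master) when submitting to a cluster

    val sc = new SparkContext(conf)

    val lines  = sc.textFile(args(0))    // input path, e.g. a directory on HDFS
    val counts = lines
      .flatMap(_.split("\\s+"))          // transformation
      .map(word => (word, 1))            // transformation
      .reduceByKey(_ + _)                // transformation
    counts.saveAsTextFile(args(1))       // action: this is what actually triggers the computation

    sc.stop()
  }
}
```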

    How to make saveAsTextFile in Spark produce a single output file

By default, when textFile reads a file from HDFS, the number of partitions defaults to 2 in the source code. If you want a different partition count, pass it as the second argument ("minPartitions") to textFile.
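For example, a sketch assuming an existing SparkContext sc and a hypothetical HDFS path:

```scala
// Request at least 8 partitions instead of the default minimum of 2.
// minPartitions is only a lower bound; the actual number of splits also
// depends on how the file is laid out in HDFS blocks.
val rdd = sc.textFile("hdfs://master:9000/data/input", 8)
println(rdd.getNumPartitions)
```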

In general, saveAsTextFile produces as many files as there are tasks executing it, i.e. part-00000 through part-0000n, where n is the number of tasks, which equals the number of partitions of the final stage.

Calling coalesce(1, true).saveAsTextFile() on the RDD means the data is gathered into a single partition after the computation finishes and only then saved. With one partition, Spark naturally launches only one task for the save, so only one file is produced. Alternatively, you can call repartition(1), which is just a wrapper around coalesce with the second argument (shuffle) defaulting to true.
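Both variants as a sketch, assuming counts is the pair RDD from the word-count example above and a hypothetical output path:

```scala
// Gather everything into one partition, then write a single part-00000 file.
counts.coalesce(1, shuffle = true).saveAsTextFile("hdfs://master:9000/data/out_single")

// Equivalent: repartition(1) is just coalesce(1, shuffle = true) under the hood.
counts.repartition(1).saveAsTextFile("hdfs://master:9000/data/out_single2")
```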

Although you can do this, the cost is huge: Spark works on large volumes of data and executes in parallel. Forcing everything into a single partition at the end necessarily generates a lot of disk and network I/O, and the memory of the node that performs the final reduce is put under heavy pressure; Spark becomes very slow and may even die.

saveAsTextFile requires that the output directory does not already exist, otherwise it throws an error, so check for it first (see the sketch below).
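One way to do that check, sketched with the Hadoop FileSystem API (the output path is a placeholder, and deleting an existing directory rather than aborting is a deliberate choice you may not want):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("hdfs://master:9000/data/out") // placeholder
val fs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)

// saveAsTextFile fails if the directory already exists, so remove it (recursively) first.
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true)
}
counts.saveAsTextFile(outputPath.toString)
```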

Shell (submitting the job with spark-submit)

Remove setMaster, then build the jar (with the assembly plugin configured above, mvn package produces a jar-with-dependencies).

```bash
spark-submit \
  --master spark://master:7077 \
  --executor-memory 512M \
  --total-executor-cores 2 \
  --class org.apache.spark.WordCount \
  xxx.jar \
  in \
  out
```