Overview

Official website: http://spark.apache.org/
Official documentation: http://spark.apache.org/docs/latest/

Download

Install the Java environment

OpenJDK official installation guide: http://openjdk.java.net/install/

  # yum search openjdk-dev


Download page: http://spark.apache.org/downloads.html


Configure environment variables

  export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
  export HADOOP_HOME=$HOME/app/hadoop
  export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
  export SPARK_HOME=$HOME/app/spark
  export PATH=$PATH:$SPARK_HOME/bin

Directory structure

Directory   Description
bin         Executable scripts: the Spark command-line tools
conf        Spark configuration files
data        Data used by the bundled examples
examples    Bundled example programs
jars        Spark's JAR files
sbin        Scripts for starting and stopping the bundled standalone cluster

Commands in the Spark distribution's bin directory:

Command       Description
spark-shell   Starts the interactive Spark shell
spark-submit  Submits a Spark application
run-example   Runs one of the bundled example programs
spark-sql     Starts the Spark SQL command-line shell

Start the standalone cluster

From /root/app/spark/sbin:

  ./start-all.sh

Submit a job

  spark-submit --class org.apache.spark.examples.SparkPi --master spark://aliyun:7077 /root/app/spark/examples/jars/spark-examples_2.11-2.3.3.jar 100

Result

  2019-08-16 20:17:34 INFO DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 8.394727 s
  Pi is roughly 3.1414743141474313

Command breakdown:
spark-submit: the submission script for Spark applications, located under bin in the Spark installation directory
--class org.apache.spark.examples.SparkPi: the application's main class
--master spark://aliyun:7077: the master to run against
/root/app/spark/examples/jars/spark-examples_2.11-2.3.3.jar: path to the application JAR
100: any remaining arguments are passed to the main class (here, the number of slices SparkPi parallelizes over)
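
For context, SparkPi estimates Pi with a Monte Carlo simulation. The sketch below (meant to be pasted into spark-shell, reusing the sc it provides) is an approximation of what the bundled example does, not its actual source; slices stands in for the "100" argument above.

  // Sample random points in the square [-1, 1] x [-1, 1] and count how many
  // land inside the unit circle; that fraction approximates Pi / 4.
  val slices = 100
  val n = 100000 * slices
  val count = sc.parallelize(1 to n, slices).map { _ =>
    val x = math.random * 2 - 1
    val y = math.random * 2 - 1
    if (x * x + y * y <= 1) 1 else 0
  }.reduce(_ + _)
  println(s"Pi is roughly ${4.0 * count / n}")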

spark-shell

Local mode

Local mode runs Spark on a single machine.
local: all computation runs in a single thread
local[K]: runs the computation with K worker threads
local[*]: sets the number of threads to the number of CPU cores on the machine
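
The same master strings can also be set programmatically on a SparkConf; a minimal sketch (the app name is illustrative):

  import org.apache.spark.{SparkConf, SparkContext}

  // Pick one of the local master URLs described above.
  val conf = new SparkConf()
    .setAppName("LocalModeDemo")   // illustrative app name
    .setMaster("local[*]")         // or "local", or "local[4]"
  val sc = new SparkContext(conf)
  println(sc.parallelize(1 to 100).reduce(_ + _))   // quick sanity check: prints 5050
  sc.stop()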

Starting the shell

To connect to the standalone cluster started above, pass its master URL:

  $ spark-shell --master spark://aliyun:7077

Started with no arguments, spark-shell runs in local mode:

  # spark-shell
  2019-08-05 19:31:54 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  Setting default log level to "WARN".
  To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  Spark context Web UI available at http://aliyun:4040
  Spark context available as 'sc' (master = local[*], app id = local-1565004727280).
  Spark session available as 'spark'.
  Welcome to
        ____              __
       / __/__  ___ _____/ /__
      _\ \/ _ \/ _ `/ __/  '_/
     /___/ .__/\_,_/_/ /_/\_\   version 2.3.3
        /_/

  Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
  Type in expressions to have them evaluated.
  Type :help for more information.

spark-shell runs in local mode by default; local[*] means it uses as many threads as the current CPU has cores:

  Spark context available as 'sc' (master = local[*], app id = local-1565004727280).
  Spark session available as 'spark'.
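
Both objects are ready to use as soon as the shell starts: sc is the SparkContext (RDD API) and spark is the SparkSession (DataFrame/SQL API). A quick check (the res numbers will differ in your session):

  scala> sc.parallelize(1 to 10).reduce(_ + _)
  res0: Int = 55

  scala> spark.range(5).count()
  res1: Long = 5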

Create a test file under the Spark installation directory:

  # cd $SPARK_HOME
  # mkdir input
  # vim input/word.txt
  hello world
  hello spark
  hello hadoop

Start spark-shell and read the data in the local input directory:

  # spark-shell
  scala> sc.textFile("input")
  res0: org.apache.spark.rdd.RDD[String] = input MapPartitionsRDD[1] at textFile at <console>:25
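
textFile only builds a lazy RDD; nothing is actually read until an action runs. With the word.txt above, a quick check looks roughly like this (res numbers and RDD ids vary by session):

  scala> sc.textFile("input").count()   // action: triggers the read; word.txt has 3 lines
  res1: Long = 3

  scala> sc.textFile("input").first()
  res2: String = hello world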

Flatten: split each line on the space separator, mapping it into individual words:

  scala> sc.textFile("input").flatMap(_.split(" "))
  res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:25
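
flatMap is used instead of map so that the per-line arrays of words are flattened into a single RDD of words. A sketch of the difference (output shown for the word.txt above; res numbers are illustrative):

  scala> sc.textFile("input").map(_.split(" ")).collect()
  res2: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hadoop))

  scala> sc.textFile("input").flatMap(_.split(" ")).collect()
  res3: Array[String] = Array(hello, world, hello, spark, hello, hadoop)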

Operate on each element, mapping every word to a (word, 1) tuple:

  scala> sc.textFile("input").flatMap(_.split(" ")).map((_, 1))
  res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:25

Aggregate the values by key (summing them) to get the word counts:

  scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  res4: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
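
The pipeline can be extended with sortBy to order the result by count before collecting; a minimal sketch (words with equal counts may come back in any order):

  scala> sc.textFile("input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false).collect
  res5: Array[(String, Int)] = Array((hello,3), (spark,1), (hadoop,1), (world,1))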

WordCount with Maven

pom.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.bx</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
      <spark.version>2.2.3</spark.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
    </dependencies>

    <build>
      <finalName>WordCount</finalName>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>4.1.1</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </project>

Create the test file: in the top-level project directory, create an input directory containing word.txt:

  hello spark
  hello scala
  hello hive
  hello hadoop

Create a WordCount object under the src/main/scala directory:

  package cn.bx.spark

  import org.apache.spark.{SparkConf, SparkContext}

  object WordCount {
    def main(args: Array[String]): Unit = {
      // Run locally, with as many threads as the machine has CPU cores
      val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
      val sc = new SparkContext(conf)
      val lines = sc.textFile("input")               // read every file in the input directory
      val words = lines.flatMap(_.split(" "))        // split each line into words
      val wordToOne = words.map((_, 1))              // (word, 1) pairs
      val wordToSum = wordToOne.reduceByKey(_ + _)   // sum the 1s per word
      wordToSum.collect().foreach(println)
    }
  }

Output

  (scala,1)
  (hive,1)
  (hello,4)
  (spark,1)
  (hadoop,1)

WordCount on Hadoop (HDFS)

Set up Maven by editing pom.xml:

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.bx</groupId>
    <artifactId>SparkNote</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
      <scala.version>2.11.12</scala.version>
      <spark.version>2.2.3</spark.version>
      <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
      </dependency>
      <!-- spark core dependency -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <!-- hadoop-client dependency -->
      <!--
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
      -->
    </dependencies>

    <build>
      <sourceDirectory>src/main/scala</sourceDirectory>
      <plugins>
        <plugin>
          <groupId>org.scala-tools</groupId>
          <artifactId>maven-scala-plugin</artifactId>
          <version>2.15.0</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
              </goals>
              <configuration>
                <args>
                  <arg>-dependencyfile</arg>
                  <arg>${project.build.directory}/.scala_dependencies</arg>
                </args>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </project>

Create SparkWordCount.scala under main/scala/:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object SparkWordCount {
    def main(args: Array[String]): Unit = {
      // No master set here: it is supplied by spark-submit via --master
      val conf = new SparkConf().setAppName("SparkWordCount")
      val sc = new SparkContext(conf)
      val lines: RDD[String] = sc.textFile(args(0))             // input path, e.g. a file in HDFS
      val words: RDD[String] = lines.flatMap(_.split(" "))
      val tuples: RDD[(String, Int)] = words.map((_, 1))
      val sum: RDD[(String, Int)] = tuples.reduceByKey(_ + _)
      val sorted: RDD[(String, Int)] = sum.sortBy(_._2, ascending = false)
      sorted.saveAsTextFile(args(1))                            // output directory; must not already exist
      sc.stop()
    }
  }

Submit the JAR

vim word.txt

hello world
hello world
hello spark
hello java
hello golang
hello hadoop

Put word.txt into HDFS:

# hadoop fs -mkdir -p /wc/input
# hadoop fs -put word.txt /wc/input/
# hadoop fs -cat /wc/input/word.txt
hello world
hello world
hello spark
hello java
hello golang
hello hadoop

Build the JAR with Maven, then submit it:

# spark-submit --class SparkWordCount --master spark://aliyun:7077 SparkNote-1.0-SNAPSHOT.jar hdfs://localhost:9000/wc/input/word.txt hdfs://localhost:9000/wc/output

Execution result

hadoop fs -ls /wc/output/
19/08/07 01:02:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-08-07 01:01 /wc/output/_SUCCESS
-rw-r--r--   3 root supergroup         20 2019-08-07 01:01 /wc/output/part-00000
-rw-r--r--   3 root supergroup         41 2019-08-07 01:01 /wc/output/part-00001

View the results

# hadoop fs -cat /wc/output/part-00000
(hello,6)
(world,2)
# hadoop fs -cat /wc/output/part-00001
(golang,1)
(java,1)
(spark,1)
(hadoop,1)
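
The result is split into one part file per partition of the final RDD (two here, plus the _SUCCESS marker). If a single output file is preferred, the RDD can be coalesced to one partition before saving; a minimal sketch of that variant of SparkWordCount's main method (illustrative, not the code used above):

  // Same pipeline as SparkWordCount, collapsed to a single partition before writing,
  // so the output directory contains just one part-00000 file.
  val counts = sc.textFile(args(0))
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .coalesce(1)
  counts.saveAsTextFile(args(1))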