08 内存计算框架Spark

一、Spark入门

  • Spark是开源类Hadoop MapReduce的通用并行框架。
  • Spark拥有HadoopMapReduce所具有的优点,但不同于MapReduce的是,Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此,Spark能更好地适用于数据挖掘与机器学习等需要迭代的场景

1. Spark概述

  • Spark是一种与Hadoop相似的开源集群计算环境,但是两者之间还是存在一些不同之处的,这些不同之处使Spark在某些工作负载方面表现得更加优越。

  • Spark启用了内存分布数据集,除了能够提供交互式查询外,还可以优化迭代工作负载。

  • Spark是在Scala语言中实现的,它将Scala用作其应用程序框架。

  • 与Hadoop不同,Spark和Scala能够紧密集成,其中的Scala可以像操作本地集合对象一样轻松地操作分布式数据集。

2. Spark伪分布式安装

  • 配置环境变量
    1. vim /etc/profile
    2. # 添加以下内容
    3. export SCALA_HOME=/usr/local/scala
    4. export PATH=$SCALA_HOME/bin
    5. # 通过以下命令使环境变量立即生效
    6. source /etc/profile
  • 配置spark
    1. # 切换到Spark配置文件目录,复制模板并修改其中的配置文件slaves、spark-env.sh、spark-defaults.conf
    2. cd /usr/local/spark/conf
    3. cp slaves.template slaves
    4. cp spark-env.sh.template spark-env.sh
    5. cp spark-defaults.conf.template spark-defaults.conf
    6. # 1
    7. vim slaves
    8. # 修改localhost为hadoop0
    9. # 2
    10. vim spark-env.sh
    11. # 追加以下内容
    12. export JAVA_HOME=/usr/local/jdk
    13. export SCALA_HOME=/usr/local/scala
    14. export HADO0P_HOME=/usr/local/hadoop
    15. export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
    16. export SPARK_MASTER_HOST=hadoop0
    17. export SPARK_PID_DIR=/usr/local/spark/data/pid
    18. export SPARK_LOCAL_DIRS=/usr/local/spark/data/spark_shuffle
    19. export SPARK_EXECUTOR_MEMORY=1G
    20. export SPARK_WORKER_MEMORY=4G
    21. # 3
    22. vim spark-defaults.conf
    23. spark.master spark://hadoop0:7077
    24. spark.eventLog.enabled true
    25. spark.eventLog.dir hdfs://hadoop0:9000/eventLog
    26. spark.serializer org.apache.spark.serializer.KryoSerializer
    27. spark.driver.memory 1g
    28. spark.jars.packages Azure:mmlspark:0.12


slaves

08 内存计算框架Spark - 图1

spark-env.sh

  1. ![](08/spark-env.png#alt=image-20210121000438530)
  2. spark-defaults
  3. ![](08/spark-defaults.png#alt=image-20210121000550032)

配置中要使用hdfs://hadoop0:9000/eventLog来存放日志,我们需要手动创建这个目录

  1. hadoop fs -mkdir /eventLog
  • 测试Spark(启动Spark之前要启动Hadoop)
    1. /usr/local/spark/sbin/start-all.sh


08 内存计算框架Spark - 图2
./examples/src/main目录下有一些Spark的示例程序,包括Scala、Java、Python、R等语言的版本
08 内存计算框架Spark - 图3
通过spark-shell命令启动Spark Shell
可以 通过访问http://hadoop0:4040http://hadoop0:8080两个URL来查看Spark的运行状态

/usr/local/spark/bin/run-example SparkPi

08 内存计算框架Spark - 图4

08 内存计算框架Spark - 图5

3. 由Java到Scala

Spark内存计算框架由Scala语言编写,其交互式环境也支持直接编写Scala表达式来进行数据操作,因此在介绍Spark之前,首先来了解Scala和Java的区别。

Scala是一门多范式的编程语言,一种类似Java的编程语言,设计初衷是实现可伸缩的语言,并集成面向对象编程和函数式编程的各种特性。

Scala具备如下的核心特征

  • Scala中的每个值都是一个对象,包括基本数据类型(即布尔值、数字等)在内,连函数也是对象。
  • 与只支持单继承的语言相比,Scala具有更广泛意义上的类重用。Scala允许定义新类的时候重用“一个类中新增的成员定义”。
  • Scala还包含若干函数式语言的关键概念,包括高阶函数、局部套用、嵌套函数、序列解读等。
  • Scala是静态类型的,这就允许它提供泛型类、内部类,甚至多态方法。

Scala可以与Java互操作。用scalac这个编译器把源文件编译成Java的class文件,可以从Scala中调用所有的Java类库,也同样可以从Java应用程序中调用Scala的代码。

Scala 数据类型

Scala 与 Java有着相同的数据类型,下表列出了 Scala 支持的数据类型:

数据类型 描述
Byte 8位有符号补码整数。数值区间为 -128 到 127
Short 16位有符号补码整数。数值区间为 -32768 到 32767
Int 32位有符号补码整数。数值区间为 -2147483648 到 2147483647
Long 64位有符号补码整数。数值区间为 -9223372036854775808 到 9223372036854775807
Float 32 位, IEEE 754 标准的单精度浮点数
Double 64 位 IEEE 754 标准的双精度浮点数
Char 16位无符号Unicode字符, 区间值为 U+0000 到 U+FFFF
String 字符序列
Boolean true或false
Unit 表示无值,和其他语言中void等同。用作不返回任何结果的方法的结果类型。Unit只有一个实例值,写成()。
Null null 或空引用
Nothing Nothing类型在Scala的类层级的最底端;它是任何其他类型的子类型。
Any Any是所有其他类的超类
AnyRef AnyRef类是Scala里所有引用类(reference class)的基类

Scala 基础字面量

Scala 非常简单且直观。接下来我们会详细介绍 Scala 字面量。

整型字面量

整型字面量用于 Int 类型,如果表示 Long,可以在数字后面添加 L 或者小写 l 作为后缀。:

  1. 0
  2. 035
  3. 21
  4. 0xFFFFFFFF
  5. 0777L

浮点型字面量

如果浮点数后面有f或者F后缀时,表示这是一个Float类型,否则就是一个Double类型的。实例如下:

  1. 0.0
  2. 1e30f
  3. 3.14159f
  4. 1.0e100
  5. .1

布尔型字面量

布尔型字面量有 true 和 false。

符号字面量

符号字面量被写成: ‘<标识符> ,这里 <标识符> 可以是任何字母或数字的标识(注意:不能以数字开头)。这种字面量被映射成预定义类scala.Symbol的实例。

如: 符号字面量 ‘x 是表达式 scala.Symbol(“x”) 的简写,符号字面量定义如下:

  1. package scala
  2. final case class Symbol private (name: String) {
  3. override def toString: String = "'" + name
  4. }

字符字面量

在 Scala 字符变量使用单引号 来定义,如下:

'a' 
'\u0041'
'\n'
'\t'

其中 *_ 表示转义字符,其后可以跟 _u0041 数字或者 \r\n 等固定的转义字符。

字符串字面量

在 Scala 字符串字面量使用双引号 来定义,如下:

"Hello,\nWorld!"
"菜鸟教程官网:www.runoob.com"

多行字符串的表示方法

多行字符串用三个双引号来表示分隔符,格式为:“”” … “””

实例如下:

val foo = """菜鸟教程
www.runoob.com
www.w3cschool.cc
www.runnoob.com
以上三个地址都能访问"""

Null 值

空值是 scala.Null 类型。

Scala.Null和scala.Nothing是用统一的方式处理Scala面向对象类型系统的某些”边界情况”的特殊类型。

Null类是null引用对象的类型,它是每个引用类(继承自AnyRef的类)的子类。Null不兼容值类型。

Scala 转义字符

下表列出了常见的转义字符:

转义字符 Unicode 描述
\b \u0008 退格(BS) ,将当前位置移到前一列
\t \u0009 水平制表(HT) (跳到下一个TAB位置)
\n \u000a 换行(LF) ,将当前位置移到下一行开头
\f \u000c 换页(FF),将当前位置移到下页开头
\r \u000d 回车(CR) ,将当前位置移到本行开头
\u0022 代表一个双引号(“)字符
\u0027 代表一个单引号(’)字符
\ \u005c 代表一个反斜线字符 ‘’
object Test {
   def main(args: Array[String]) {
      println("Hello\tWorld\n\n" );
   }
}
$ scalac Test.scala
$ scala Test
Hello    World

Scala 变量

变量是一种使用方便的占位符,用于引用计算机内存地址,变量创建后会占用一定的内存空间。

基于变量的数据类型,操作系统会进行内存分配并且决定什么将被储存在保留内存中。因此,通过给变量分配不同的数据类型,你可以在这些变量中存储整数,小数或者字母。

变量声明

在学习如何声明变量与常量之前,我们先来了解一些变量与常量。

  • 变量: 在程序运行过程中其值可能发生改变的量叫做变量。如:时间,年龄。
  • 常量 在程序运行过程中其值不会发生变化的量叫做常量。如:数值 3,字符’A’。

在 Scala 中,使用关键词 “var” 声明变量,使用关键词 “val” 声明常量。

声明变量实例如下:

var myVar : String = "Foo"
var myVar : String = "Too"

以上定义了变量 myVar,我们可以修改它。

声明常量实例如下:

val myVal : String = "Foo"

以上定义了常量 myVal,它是不能修改的。如果程序尝试修改常量 myVal 的值,程序将会在编译时报错。

变量类型声明

变量的类型在变量名之后等号之前声明。定义变量的类型的语法格式如下:

var VariableName : DataType [=  Initial Value]

// 或

val VariableName : DataType [=  Initial Value]

Scala 多个变量声明

Scala 支持多个变量的声明:

// xmax, ymax都声明为100
val xmax, ymax = 100 
//如果方法返回值是元组,我们可以使用 val 来声明一个元组:
scala> val pa = (40,"Foo")
pa: (Int, String) = (40,Foo)

Scala 闭包

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数。

如下面这段匿名的函数:

val multiplier = (i:Int) => i * 10

函数体内有一个变量 i,它作为函数的一个参数。如下面的另一段代码:

val multiplier = (i:Int) => i * factor

在 multiplier 中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier 函数被调用时,i 被赋予一个新的值。然而,factor不是形式参数,而是自由变量,考虑下面代码:

var factor = 3  
val multiplier = (i:Int) => i * factor

这里我们引入一个自由变量 factor,这个变量定义在函数外面。

这样定义的函数变量 multiplier 成为一个”闭包”,因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数。

object Test { 
  def main(args: Array[String]) { 
     println( "muliplier(1) value = " +  multiplier(1) ) 
     println( "muliplier(2) value = " +  multiplier(2) ) 
  } 
  var factor = 3 
  val multiplier = (i:Int) => i * factor  
}
$ scalac Test.scala  
$ scala Test  
muliplier(1) value = 3  
muliplier(2) value = 6

4. Spark架构

Spark遵循主从架构。它的集群由一个主服务器和多个从服务器组成。

Spark架构依赖于两个抽象:

  • 弹性分布式数据集(RDD)
  • 有向无环图(DAG)

RDD

Spark的主要抽象是分布式的元素集合,称为弹性分布式数据集(ResilientDistributedDataset,RDD),它可被分发到集群的各个节点上进行并行操作。

RDD可以通过Hadoop InputFormats创建(如HDFS),或者从其他RDD转化而来。

从Spark的./README文件新建一个RDD:

scala> var textFile = sc.textFile("file:///usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> textFile.count()
res10: Long = 103
  • file://前缀指定读取本地文件
  • Spark Shell默认是读取HDFS中的文件,需要先上传文件到HDFS中,否则会有org.apache.hadoop.mapred.InvalidInputException:Input path does notexist的错误提示

RDD是一种特殊集合,支持多种来源,有容错机制,可以被缓存,支持并行操作,一个RDD代表一个分区里的数据集。

RDD有两种操作算子:

  • Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作。Transformations操作是Lazy懒加载的,不是立刻执行,只会记录需要这样的操作,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
  • Ation(执行):触发Spark作业的运行,真正触发转换算子的计算。Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
# filter提供了一个元素过滤器,只有符合过滤条件的元素才会被包含到新集合中
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
# 文件中含有Spark的内容行数
linesWithSpark.count()
# 找到包含单词最多的那一行内容共有多少个单词
textFile.map(line => line.split(" ").size).reduce((a,b) => if(a>b) a else b)
# 统计包含单词最多的那一行内容共有多少个单词,使用Math.max()函数(需要导入Java的Math库)
textFile.map(line => line.split(" ").size).reduce((a,b) => Math.max(a,b))
# 数据流模式单词数统计
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) =>a + b)

wordCounts.collect

常用的Spark函数

转换 作用
reduce(func) 通过函数func聚集数据集中的所有元素。func函数接收两个参数,返回一个值。这个函数必须是关联性的,确保可以被正确地并发执行
collect() 在 Driver的程序中,以数组的形式返回数据集的所有元素。这通常会在使用filter或者其他操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让 Driver程序OOM
count() 返回数据集的元素个数
take(n) 返回一个数组,由数据集的前n个元素组成。注意:这个操作目前并非在多个节点上并行执行,而是 Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
first() 返回数据集的第一个元素,类似于take(1)
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到本地文件系统、HDFS或者任何其他Hadoop支持的文件系统。Spark将调用每个元素的toString方法,并将它转换为文件中的一行文本
saveAsSequenceFile(path) 将数据集的元素以sequencefile的格式保存到指定的目录、本地系统、HDFS或者任何其他Hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable ( Spark包括基本类型的转换,例如 Int、Double、String 等)
foreach(func) 在数据集的每一个元素上运行函数func。这通常用于更新一个累加器变量或者与外部存储系统进行交互
  • 一个基本的RDD transformations示例

RDD包含{1, 2, 3, 3},transformations操作示例

函数名 功能 例子 结果
map() 对每个元素应用函数 rdd.map(x => x+1) {2,3,4,4}
flatMap() 压扁,常用来抽取单词 rdd.flatMap(x => x.to(3)) {1,2,3,2,3,3,3}
filter() 过滤 rdd.filter(x => x!=1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}
  • RDD transformations也支持两个RDD的集合操作。一个RDD包含{1, 2, 3},另一个RDD包含{3, 4, 5} | 函数名 | 功能 | 例子 | 结果 | | :—- | :—- | —- | :—- | | union() | 并集 | rdd.union(other) | {1,2,3,4,5} | | intersection() | 交集 | rdd.intersection(other) | {3} | | subtract() | 取存在第一个RDD而不存在第二个RDD的元素(使用场景,机器学习中,移除训练集) | rdd.subtract(other) | {1,2} | | cartesian() | 笛卡尔积 | rdd.cartesian(other) | {(1,3),(1,4),…(3,5)} |
  • RDD action操作,RDD包含{1, 2, 3, 3} | 函数名 | 功能 | 例子 | 结果 | | —- | —- | —- | —- | | collect() | 返回RDD的所有元素 | rdd.collect() | {1,2,3,3} | | count() | 计数 | rdd.count() | 4 | | countByValue() | 返回一个map,表示唯一元素出现的个数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} | | take(num) | 返回几个元素 | rdd.take(2) | {1,2} | | top(num) | 返回前几个元素 | rdd.top(2) | {3,3} | | reduce(func) | 合并RDD中元素 | rdd.reduce((x,y) => x+y) | 9 | | foreach(func) | 对RDD的每个元素作用函数,什么也不返回 | rdd.foreach(func) | 什么也没有 |

5.Spark入门示例

  • Scala版 ```scala package cn.waqwb

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf /**

  • Spark入门示例Scala版 */ object SimpleApp { def main(args: Array[String]) { //val logFile = “hdfs://hadoop0:9000/README.md”; val logFile = “file:///usr/local/spark/README.md”; val conf = new SparkConf().setAppName(“Simple Application”).setMaster(“local”) val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains(“a”)).count() val numBs = logData.filter(line => line.contains(“b”)).count() println(“含有字母a: %s行, 含有字母b: %s行”.format(numAs, numBs)) } } ```
  • 需要导入/usr/local/spark/jars目录下所有的jar包
  • 更改scala编译版本

08 内存计算框架Spark - 图6

  • Java版 ```java package cn.waqwb;

import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function;

public class SimpleApp { public static void main(String[] args) { String logFile = “hdfs://hadoop0:9000/README.md”; // String logFile = “file:///usr/local/spark/README.md”; SparkConf conf = new SparkConf().setAppName(“Simple Application”).setMaster(“local”); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function() { public Boolean call(String s) { return s.contains(“a”); } }).count(); long numBs = logData.filter(new Function() { public Boolean call(String s) { return s.contains(“b”); } }).count(); System.out.println(“字母a共有 “ + numAs + “行,字母b共有” + numBs+”行”); } }




---

<a name="e0e4a5f8"></a>
## 二、Spark Streaming

Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统。

<a name="61f6ba4c"></a>
### 1. Spark Streaming概述

Spark的各个子框架都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。

**SparkStream术语定义如下**

(1)离散流(discretized stream)或DStream:这是Spark Streaming对内部持续的实时数据流的抽象描述。

(2)批数据(batch data):将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。

(3)时间片或批处理时间间隔(batch interval):以时间片作为拆分流数据的依据。一个时间片的数据对应一个RDD实例。

(4)窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。

(5)滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。

(6)Input DStream:一个Input DStream是一个特殊的DStream,将SparkStreaming连接到一个外部数据源来读取数据。

<a name="d495d2a1"></a>
### 2. Spark Streaming示例

作为构建于Spark之上的应用框架,Spark Streaming承袭了Spark的编程风格,对于已经了解Spark的用户来说能够快速地上手。接下来以Spark Streaming官方提供的WordCount代码为例,介绍Spark Streaming的使用方式。

- 
新建一个网络模拟器类`NetSimulation.scala`,其作用是读取`/usr/local/spark/README.md`文件,接受客户端连接,每秒从文件中随机选取一行内容,发往客户端。再由客户端对接收到的内容作单词计数的统计。
```scala
package cn.waqwb
/**
 * 网络模拟器类
 * 读取/usr/local/spark/README.md文件,接受客户端连接,
 * 每秒从文件中随机选取一行内容,发往客户端。
 * 再由客户端对接收到的内容作单词计数的统计
 */
import java.io.{ PrintWriter }
import java.net.ServerSocket
import scala.io.Source

object NetSimulation {
  //产生随机数
  def randomIndex(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {
    //读取文件
    val filename = "/usr/local/spark/README.md"
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length
      //监听端口9999
    val serverSocket = new ServerSocket(9999)
    while (true) {
      //对每个客户端产生一个新的线程
      val socket = serverSocket.accept()
      new Thread() {
        override def run = {
          println("获取到的客户段的连接:" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            //每隔1s,发送文件中的随机一行内容
            Thread.sleep(1000)
            val content = lines(randomIndex(filerow))
            println(content)
            out.write(content + "\n")
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
  • 新建客户端程序NetworkWordCount.scala,其作用是建立StreamingContext,每秒钟从主机hadoop0的9999端口读取一次网络数据,然后对接收到的数据,统计每个单词的出现次数。 ```scala package cn.waqwb

import org.apache.spark.SparkConf import org.apache.spark.streaming.{ Seconds, StreamingContext } import org.apache.spark.storage.StorageLevel

object NetworkWordCount { def main(args: Array[String]) { //创建一个具有两个工作线程和1秒批处理间隔的本地StreamingContext // 主机需要2个内核 val conf = new SparkConf().setMaster(“local[2]”).setAppName(“NetworkWordCount”) val ssc = new StreamingContext(conf, Seconds(1)) // 创建将连接到的数据流主机名:端口, localhost:9999 val lines = ssc.socketTextStream(“hadoop0”, 9999) // 切割每一行分成几个字 val words = lines.flatMap(.split(“ “)) import org.apache.spark.streaming.StreamingContext. // 统计每批中的每一个单词 val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + ) // 将此数据流中生成的每个RDD的前10个元素打印到控制台 wordCounts.print() //开始计算 ssc.start() //等待计算终止 ssc.awaitTermination() } } ```


程序的执行流程解析如下

  1. 创建StreamingContext对象
    使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master、设定名称。需要注意的是参数Seconds(1),Spark Streaming需要指定处理数据的时间间隔,如上例所示的1秒,那么Spark Streaming会以1秒为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置。

  2. 创建InputDStream

  1. 操作DStream
    对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用Map和ReduceByKey方法进行计算,最后使用print()方法输出结果。

  2. 启动Spark Streaming

/usr/local/spark/jars