1 Spark Core内模块功能

API:JAVA、Python和R;

BroadCast:广播变量的实现;

Deploy:Spark部署与启动运行的实现;

Executor:Worker节点负责计算部分的实现;

Metrics:运行时状态监控的实现;

Network:集群通信的实现;

Partial:近似评估

Serializer:序列化模块;

Storage:存储模块;

UI:监控界面的代码逻辑实现。

2 Spark Core外模块功能

Spark SQL/Spark MLlib/Spark ML/Spark Streaming/Spark GraphX/Spark on YARN

弹性分布式数据集(RDD)不仅是一组不可变的JVM(JAVA虚拟机)对象的分布集,可以让你执行高速运算,而且是Apache Spark的核心。

RDD 并行操作,Spark的最大优势是:每个转换并行执行,从而大大提高速度。

RDD是一种无schema的数据结构,这与下一章节的DataFrame不同,我们可以混合使用任何类型的数据结构:tuple,dist,list等。

Lambda表达式的使用:定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间连续切换,这是一个开销大的操作,所以我们需要尽量使用Spark内置的功能函数。

3 RDD的特点

(1)不可变性:RDD是一种不可变的数据结构,一旦创建,它将不可以在原地修改,一旦修改RDD的操作都会返回一个新的RDD。

(2)分片:RDD表示的是一组数据的分区。这些分区分布在多个集群节点上,然而当Spark在单个节点上运行时,所有的分区数据都会在当前节点上。Spark存储RDD的分区和数据集物理分区之间关系的映射关系。RDD是各个分布式数据源之中数据的一个抽象,它通常表示分布在多个集群节点上的分区数据。例如HDFS将数据分片或分块存储在集群中,默认情况下,一个RDD分区对应一个HDFS文件分片。

(3)容错性

(4)接口:RDD是一个处理数据的接口。

(5)强类型:RDD类有一个参数用于表示类型,RDD可以表示不同类型的数据。RDD可以表示同一类型数据的分布式集合,包括Integer、Long、Float、Double和String等。

(6)驻留在内存中:对于一个缓存在内存中的RDD进行操作比操作没缓存的RDD要快很多。

3.1 创建RDD

PySpark中,创建RDD的方法有两种:

方法一:通过元组创建
parallelize很少用于生产上,一般用于学习Spark

  1. data1 = sc.parallelize(("a",2))
  2. type(data1)

输出结果:pyspark.rdd.RDD

  1. data2 = sc.parallelize([
  2. ('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),('Amber', 9)
  3. ])
  4. data2.collect()

输出结果:[(‘Amber’, 22), (‘Alfred’, 23), (‘Skye’, 4), (‘Albert’, 12), (‘Amber’, 9)]

  1. data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
  2. data_heterogenous

输出结果:[(‘Ferrari’, ‘fast’), {‘Porsche’: 100000}, [‘Spain’, ‘visited’, 4504]]

  1. data_heterogenous[1]['Porsche']

输出结果:100000

方法二:通过读取文件创建

(1)textFile方法

用于从文本文件创建RDD实例,可以从多种数据源读取数据,包括单个文件、同一目录下(本地或HDFS)的多个文件、或者其他Hadoop支持的存储系统文件,返回一个RDD,这个RDD代表的数据集每个元素都是一个字符串,每个字符串代表输入文件的一行数据。

textFile方法也可以读取压缩文件中的数据,参数中可以存在通配符,用于从一个目录中读取多个文件,例如:

  1. rdd = sc.textFile('hdfs://namenode:9000/path/to/directory/*.gz')

textFile方法第二个可选参数,它用于指定分区的个数,默认情况下,Spark为每一个文件分块创建一个分区,可以设置成一个更大的数字从而提高并行化程度,但是设置成一个小于文件分块数的数字是不可以的。

  1. rdd = sc.textFile('file:///root/ydzhao/PySpark/Chapter02/VS14MORT.txt.gz', 4)

(2)wholeTextFiles方法

读取目录下的所有文本文件,然后返回一个键值型(key-value)RDD。返回RDD中的每一个键值对对应一个文件,键为文件路径,值为文件的内容,可以从多种数据源读取数据,包括单个文件、同一目录下(本地或HDFS)的多个文件、或者其他Hadoop支持的存储系统文件。

例如:

  1. x = sc.wholeTextFiles("hdfs:///sh/signaling/2017/05/*")
  2. x.take(2)
  1. [('hdfs://meihui/user/ydzhao/2017/05/01/TRAFF_20170501233101.txt', '2996DCD4C52E6D,,,,'),
  2. ('hdfs://meihui/user/ydzhao/2017/05/01/TRAFF_20170501233102.txt',
  3. 'E4AE800797AD7E3,,,,')]
  1. # 读取本地文件
  2. data_from_file = sc.textFile('file:///root/ydzhao/PySpark/Chapter02/VS14MORT.txt.gz', 4)
  3. data_from_file.count()

输出结果:2631171

  1. # 读取HDFS文件
  2. data_from_hdfs = sc.textFile('/user/ydzhao/PySpark/Chapter02/jupyter_notebook.txt', 4)
  3. data_from_hdfs.count()

输出结果:253637566

一个综合案例:

  1. x=sc.textFile("/sh/signaling/2017/05/*/")
  2. from operator import add
  3. x.map(lambda line:(1,line)).\
  4. mapValues(lambda s:(s.split(','))[1][0:12]).\
  5. map(lambda ss:(ss[1],ss[0])).\
  6. reduceByKey(add). \
  7. sortByKey(true,31).\
  8. saveAsTextFile('/user/ydzhao/appresult/201705opt')

3.2 Transformations转换操作

(1)map:一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新的RDD实例。

  1. data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
  2. data_2014.take(10)

输出结果:[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

  1. data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
  2. data_2014_2.take(10)

输出结果:

  1. [('2014', 2014),
  2. ('2014', 2014),
  3. ('2014', 2014),
  4. ('2014', 2014),
  5. ('2014', 2014),
  6. ('2014', 2014),
  7. ('2014', 2014),
  8. ('2014', 2014),
  9. ('2014', 2014),
  10. ('-99', -99)]

(2)filter:高阶方法,它把一个布尔函数作为它的参数,并把这个函数作用原RDD的每个元素上,从而创建一个新的RDD实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的RDD实例,这个RDD实例代表的数据集由布尔函数返回true的元素构成。新的RDD实例代表的数据集是原RDD的子集。

  1. data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
  2. data_filtered.count()

输出结果:6

(3)flatMap:高阶方法,它把一个函数作为它的参数,这个函数处理原RDD中每个元素返回一个序列,扁平化这个序列的集合得到一个数据集,flatMap方法返回的RDD就代表这个数据集。

  1. data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
  2. data_2014_flat.take(10)

输出结果:[‘2014’, 2015, ‘2014’, 2015, ‘2014’, 2015, ‘2014’, 2015, ‘2014’, 2015]

(4)distinct

  1. distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
  2. distinct_gender
  3. ['-99', 'M', 'F']

(5)sample:返回原RDD数据集的一个抽样子集,它拥有三个参数。

第一个参数指定是有放回抽样还是无放回抽样;

第二个参数指定抽样比例;

第三个参数是可选的,指定抽样的随机种子数。

  1. fraction = 0.1
  2. data_sample = data_from_file_conv.sample(False, fraction, 666)
  3. data_sample.take(1)
  4. [array(['1', ' ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
  5. ' ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
  6. '215', '063', ' ', '21', '02', '11I350 ', '21I251 ', ' ',
  7. ' ', ' ', ' ', ' ', ' ', ' ',
  8. ' ', ' ', ' ', ' ', ' ', ' ',
  9. ' ', ' ', ' ', ' ', ' ', '02',
  10. 'I251 ', 'I350 ', ' ', ' ', ' ', ' ', ' ',
  11. ' ', ' ', ' ', ' ', ' ', ' ', ' ',
  12. ' ', ' ', ' ', ' ', ' ', ' ', '28', ' ',
  13. ' ', '2', '4', '100', '8'],
  14. dtype='
  15. print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

输出结果:Original dataset: 2631171, sample: 263247

(6)intersection

  1. rdd5 = rdd1.intersection(rdd2)
  2. rdd5.collect()

输出结果:[(‘a’, 1)]

(7)union

  1. rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
  2. rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
  3. rdd3 = rdd1.union(rdd2)
  4. rdd3.collect()

输出结果:[(‘a’, 1), (‘b’, 4), (‘c’,10),(‘a’, 4), (‘a’, 1), (‘b’, ‘6’), (‘d’, 15)]

(8)subtract:以一个RDD实例作为输入,返回一个新RDD实例,这个新的RDD实例代表的数据集由那些存在于原RDD实例中但不在输入RDD实例中的元素构成。

  1. rdd1 = sc.parallelize([(1,4,10)])
  2. rdd2 = sc.parallelize([(1,4,6)])
  3. rdd3 = rdd1.subtract(rdd2)
  4. rdd3.collect()

输出结果:[10]

(9)repartition:把一个整数作为参数,返回分区数等于这个参数的RDD实例,有助于提高Spark的并行能力,它会重新分布数据,它是一个耗时操作。(高开销)

  1. rdd1 = rdd1.repartition(4)
  2. len(rdd1.glom().collect())

输出结果:4

(10)coalesce:用于减少RDD的分区数量,它把分区数作为参数,返回分区数等于这个参数的RDD实例。但是,使用coalesce方法时要小心,因为减少了RDD的分区数意味着降低了Spark的并行能力,它通常用于合并小分区。例如,使用filter操作之后,RDD可能会有很多小分区,这种情况下,减少分区数提升性能。(高开销)
键值对型RDD的转换

(11)keys:返回只由原RDD中的键构成的RDD

  1. rdd = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
  2. rdd.keys

(12)values:返回只由原RDD中的值构成的RDD

  1. rdd = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
  2. rdd.values

(13)mapValues(注意没有mapKeys转换),与map方法不同的是,不同点在于它把作为参数的函数作用在原RDD的值上,原RDD的键都没有变,返回的RDD和原RDD都拥有相同的键。

  1. rdd = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
  2. rdd1 = rdd.mapValues(lambda v:v*2)
  3. rdd1.collect()

(14)leftOuterJoin(高开销)

  1. rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
  2. rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
  3. rdd3 = rdd1.leftOuterJoin(rdd2)
  4. rdd3.,collect()

输出结果:[(‘a’, (1, 4)), (‘a’, (1, 1)), (‘c’, (10, None)), (‘b’, (4, ‘6’))]

(15)subtractByKey:把一个键值对型RDD作为输入参数,返回一个键值对RDD,这个键值对RDD的键都是只存在于原RDD中但是不存在于输入RDD中。

(16)groupByKey:返回一个由二元组构成的RDD,二元组的第一个元素是原RDD的键,第二个元素是一个集合,集合是由键对应的所有值构成。与groupBy有别,不需要生成键的函数作为输入参数。

注意,应当尽量避免groupByKey转换操作,是一个耗时操作,可能对数据进行shuffle操作,大多数情况都有不适用groupByKey的替代方案。(高开销)

(17)groupBy:高阶方法,它将原RDD中的元素按照用户定义的标准分组从而组成一个RDD。它把一个函数作为它的参数,这个函数为原RDD中的每一个元素生成一个键。groupBy把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键值,第二个元素是对应这个键的所有原RDD元素的集合,其中,键和原RDD元素的对应关系由那个作为参数的函数决定。

注意,groupBy是一个耗时的转换操作,需要对数据进行shuffle操作。(高开销)

3.3 Action动作操作(结果返回给驱动程序)

(1)collect:返回一个数组,这个数组由原RDD中的元素构成。在使用这个方法的时候需要小心,因为它把worker节点的数据移给了驱动程序driver。如果操作一个有大数据集的RDD,它有可能导致驱动程序崩溃。(高开销)

(2)count:返回RDD中的元素的个数

  1. data_reduce.count()

(3)countByValue:返回原RDD中每个元素的个数,返回是一个map类实例,键为元素的值,值为该元素的个数。

(4)first:返回原RDD中的第一个元素。

(5)take:输入参数为一个整数N,返回一个由原RDD中前N个元素构成的RDD。

(7)top:返回一个由原RDD中前N小的元素构成的RDD。

(8)max/min

(9)reduce:对原RDD的元素做汇总操作,汇总的时候满足结合律和交换律的二元操作符。

  1. rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
  2. data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
  3. works = data_reduce.reduce(lambda x, y: x / y)

键值对型RDD的动作操作
(1)reduceByKey:它把满足结合律的二元操作符当作输入参数,它把这个操作符作用于相同键的值上。reduceByKey方法可以用于对同一键对应的值进行汇总操作(求和、求乘积、求max,min)。
基于键的汇总操作、合并操作,reduceByKey比groupByKey更加适合。

  1. data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
  2. data_key.reduceByKey(lambda x, y: x + y).collect()
  3. from operator import add
  4. data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
  5. data_key.reduceByKey(add).collect()

(2)countByKey:用于统计原RDD每个键的个数,它返回一个map类实例

  1. data_key.countByKey().items()

数值对型RDD的动作操作
如果RDD的元素类型为Integer、Long、Float和Double,则这样的RDD为数值型RDD,统计分析很有用。

  • mean
  • stdev
  • sum
  • variance

保存RDD
saveAsTextFile:将原RDD中的元素保存在指定目录中,这个目录位于任何hadoop支持的存储系统中。每一个RDD中的元素都用字符串表示并另存为文本中的一行。

  1. # 保存到本地文件系统
  2. data_key.saveAsTextFile('file:///root/***/export')
  1. # 保存到HDFS
  2. data_key.saveAsTextFile('hdfs://***/user/***/export')

注意:上述方法均把目录的名字作为输入参数,然后在这个目录为每个RDD分区创建一个文件。这种设计不仅高效而且可容错。因为每个分区被存成一个文件,所以Spark在保存RDD的时候可以启动多个任务,并行执行,将数据写入文件系统中。从而保证写入数据的过程可容错。一旦有一个将分区写入文件的任务失败了,Spark可以再启动一个任务,重写刚才失败任务创建的文件。

4 惰性操作

RDD的创建和转换是惰性操作,只有调用动作函数或者保存RDD时才会真正触发计算。惰性转换使得Spark可以高效执行RDD计算,直到Spark应用需要操作结果时才会进行计算,Spark可以利用这一点优化RDD的操作,使得操作流水线化,而且避免了在网络间不必要的数据传输。

5 缓存

除了把数据驻留在内存中,缓存对于RDD也十分重要。创建RDD有两种方式,从存储系统读取或者应用其他现存RDD的转换操作。默认情况下,当一个RDD的操作方法被调用时,Spark会根据它的父RDD来创建这个RDD,有可能导致父RDD的创建,如此往复,可能一直持续到Spark找到根RDD,而后Spark通过从存储系统读取数据的方式创建根RDD。操作方法被调用一次,上述过程就会执行一次。每次调用操作方法,Spark都会遍历这个调用者RDD的血统数,执行所有的转换操作来创建它。

如果一个RDD缓存了,Spark会执行到目前为止的所有转换操作并为这个RDD创建一个检查点。具体来说,这只会在第一次在一个缓存的RDD上调用某操作的时候发生。类似于转换方法,缓存方法也是惰性的。

如果一个应用缓存了RDD,Spark并不是立即执行计算并把它存储在内存中。Spark只有在第一次缓存的RDD上调用某操作时才会将RDD物化在内存中。而且第一次缓存操作并不会从中受益,后续的操作才会从缓存中受益。因为它们不需要再执行从存储系统中读取数据开始的一系列操作,所以运行将会快很多。当然如果只使用一次数据应用去使用缓存就没有意义了。所以缓存对于那些同样数据做多次迭代的应用才能从缓存中受益。

RDD的缓存方法

cache和persist

(1)cache :把RDD存储在集群中执行者的内存中,实际上是将RDD物化在内存中。

(2)persist:通用版的cache方法,把RDD存储在内存中或者硬盘上或者二者皆有。它的输入参数是存储等级,这是一个可选参数,如果调用persist方法而没有提供参数,那么它的行为类似于cache方法。

  1. rdd.persist()

可选参数:

  • MEMORY_ONLY
  • DISK_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER:内存消耗和CPU使用之间做的妥协。
  • MEMORY_AND_DISK_SER

需要注意的是,RDD缓存也是可容错的。

6 共享变量、广播变量和累加器

SparkContext类提供了一个叫做broadcast方法用于创建广播变量。

SparkContext类提供了一个叫做accumulator方法用于创建累加器变量。