I spent a few days getting started with Spark from Python, covering the basics of Spark Core, Spark SQL and Spark Streaming. This post is a set of study notes kept for my own reference.

PySpark

https://github.com/apache/spark

1. Spark Core

1.1 What is an RDD

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.

In other words, an RDD is Spark's most fundamental abstraction: an immutable collection, split into partitions, whose elements can be computed on in parallel.

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

  • 1) RDD is an abstract class
  • 2) It is generic, so it can hold elements of many types: String, custom objects, and so on

1.2 RDD Properties

Internally, each RDD is characterized by five main properties:

  • A list of partitions
    • An RDD is made up of a series of partitions/splits (see the sketch after this list)
  • A function for computing each split/partition
    • A function applied to an RDD is, in essence, applied to every partition of that RDD, e.g. rdd.map(_ + 1)
  • A list of dependencies on other RDDs
    • An RDD depends on a chain of other RDDs: rdd1 ==> rdd2 ==> rdd3 ==> rdd4
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • The Partitioner determines how a key-value RDD is partitioned
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    • Prefer scheduling tasks onto the nodes that already hold the data: moving computation is cheaper than moving data (less IO)
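A minimal PySpark sketch of the partition-related properties (the numbers and the key function are only illustrative): getNumPartitions() exposes the list of partitions, glom() shows how elements are laid out across them, and partitionBy() attaches a Partitioner to a key-value RDD.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("rdd_properties").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    # Property 1: a list of partitions
    rdd = sc.parallelize(range(1, 11), numSlices=4)
    print(rdd.getNumPartitions())   # 4
    print(rdd.glom().collect())     # the elements grouped per partition

    # Property 2: a function on the RDD runs over every partition
    print(rdd.map(lambda x: x + 1).collect())

    # Property 4: a Partitioner for key-value RDDs
    pair_rdd = rdd.map(lambda x: (x % 2, x)).partitionBy(2)
    print(pair_rdd.partitioner)     # set after partitionBy
    print(pair_rdd.glom().collect())

    sc.stop()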

1.3 The RDD Properties in the Source Code

    // Property 2
    def compute(split: Partition, context: TaskContext): Iterator[T]
    // Property 1
    protected def getPartitions: Array[Partition]
    // Property 3
    protected def getDependencies: Seq[Dependency[_]] = deps
    // Property 5
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    // Property 4
    val partitioner: Option[Partitioner] = None

1.4 RDD Illustrated


1.5 SparkContext & SparkConf

Creating a SparkContext

  • It connects to a Spark "cluster": local / standalone / YARN / Mesos
    • Specify the master and the appName
  • The SparkContext is used to create RDDs and to broadcast variables to the cluster

Before creating a SparkContext, you first create a SparkConf.

1.6 Local Development Environment

Local development does not need a cluster; once the package is installed you can start coding right away.

1. Installation

    pip install pyspark

2. Code

    from pyspark import SparkConf
    from pyspark import SparkContext

    conf = SparkConf().setAppName("spark_01").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1, 6))
    print(rdd.collect())
    sc.stop()

3. Submit

    ./bin/spark-submit --master "local[1]" --name "pysubmit" /Users/calvin/pyspark_project/spark_01.py

The output looks like this:

  1. 2019-03-27 19:04:29 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  2. 2019-03-27 19:04:30 INFO SparkContext:54 - Running Spark version 2.4.0
  3. 2019-03-27 19:04:30 INFO SparkContext:54 - Submitted application: spark_01
  4. 2019-03-27 19:04:30 INFO SecurityManager:54 - Changing view acls to: calvin
  5. 2019-03-27 19:04:30 INFO SecurityManager:54 - Changing modify acls to: calvin
  6. 2019-03-27 19:04:30 INFO SecurityManager:54 - Changing view acls groups to:
  7. 2019-03-27 19:04:30 INFO SecurityManager:54 - Changing modify acls groups to:
  8. 2019-03-27 19:04:30 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(calvin); groups with view permissions: Set(); users with modify permissions: Set(calvin); groups with modify permissions: Set()
  9. 2019-03-27 19:04:30 INFO Utils:54 - Successfully started service 'sparkDriver' on port 56645.
  10. 2019-03-27 19:04:30 INFO SparkEnv:54 - Registering MapOutputTracker
  11. 2019-03-27 19:04:30 INFO SparkEnv:54 - Registering BlockManagerMaster
  12. 2019-03-27 19:04:30 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
  13. 2019-03-27 19:04:30 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
  14. 2019-03-27 19:04:30 INFO DiskBlockManager:54 - Created local directory at /private/var/folders/4l/zdr5yt953pl36zwtl7dk9kfr0000gn/T/blockmgr-3f8bcb35-ea31-40e2-86fe-61f894c14717
  15. 2019-03-27 19:04:30 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
  16. 2019-03-27 19:04:30 INFO SparkEnv:54 - Registering OutputCommitCoordinator
  17. 2019-03-27 19:04:30 INFO log:192 - Logging initialized @2403ms
  18. 2019-03-27 19:04:30 INFO Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
  19. 2019-03-27 19:04:30 INFO Server:419 - Started @2493ms
  20. 2019-03-27 19:04:30 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
  21. 2019-03-27 19:04:30 WARN Utils:66 - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
  22. 2019-03-27 19:04:30 INFO AbstractConnector:278 - Started ServerConnector@663f2e4{HTTP/1.1,[http/1.1]}{0.0.0.0:4042}
  23. 2019-03-27 19:04:30 INFO Utils:54 - Successfully started service 'SparkUI' on port 4042.
  24. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@e3fbafe{/jobs,null,AVAILABLE,@Spark}
  25. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6c2383f3{/jobs/json,null,AVAILABLE,@Spark}
  26. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@8c7afcf{/jobs/job,null,AVAILABLE,@Spark}
  27. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@32ac61e7{/jobs/job/json,null,AVAILABLE,@Spark}
  28. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@42e0405a{/stages,null,AVAILABLE,@Spark}
  29. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@776c261e{/stages/json,null,AVAILABLE,@Spark}
  30. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3a9ad1e6{/stages/stage,null,AVAILABLE,@Spark}
  31. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3d80dc1d{/stages/stage/json,null,AVAILABLE,@Spark}
  32. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@51792faa{/stages/pool,null,AVAILABLE,@Spark}
  33. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7b5b49fa{/stages/pool/json,null,AVAILABLE,@Spark}
  34. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@275320e{/storage,null,AVAILABLE,@Spark}
  35. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@392e7669{/storage/json,null,AVAILABLE,@Spark}
  36. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@21321a99{/storage/rdd,null,AVAILABLE,@Spark}
  37. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@783dc{/storage/rdd/json,null,AVAILABLE,@Spark}
  38. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2bc93490{/environment,null,AVAILABLE,@Spark}
  39. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@619e82be{/environment/json,null,AVAILABLE,@Spark}
  40. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5049bac6{/executors,null,AVAILABLE,@Spark}
  41. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6cecca70{/executors/json,null,AVAILABLE,@Spark}
  42. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@e630ec9{/executors/threadDump,null,AVAILABLE,@Spark}
  43. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1b7df6c9{/executors/threadDump/json,null,AVAILABLE,@Spark}
  44. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6fff0ea{/static,null,AVAILABLE,@Spark}
  45. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1d5b2088{/,null,AVAILABLE,@Spark}
  46. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1939ec14{/api,null,AVAILABLE,@Spark}
  47. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@332ff00f{/jobs/job/kill,null,AVAILABLE,@Spark}
  48. 2019-03-27 19:04:30 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62bba62a{/stages/stage/kill,null,AVAILABLE,@Spark}
  49. 2019-03-27 19:04:30 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://192.168.1.100:4042
  50. 2019-03-27 19:04:30 INFO Executor:54 - Starting executor ID driver on host localhost
  51. 2019-03-27 19:04:31 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56646.
  52. 2019-03-27 19:04:31 INFO NettyBlockTransferService:54 - Server created on 192.168.1.100:56646
  53. 2019-03-27 19:04:31 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
  54. 2019-03-27 19:04:31 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 192.168.1.100, 56646, None)
  55. 2019-03-27 19:04:31 INFO BlockManagerMasterEndpoint:54 - Registering block manager 192.168.1.100:56646 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.100, 56646, None)
  56. 2019-03-27 19:04:31 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 192.168.1.100, 56646, None)
  57. 2019-03-27 19:04:31 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 192.168.1.100, 56646, None)
  58. 2019-03-27 19:04:31 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1c98b669{/metrics/json,null,AVAILABLE,@Spark}
  59. 2019-03-27 19:04:31 INFO SparkContext:54 - Starting job: collect at /Users/calvin/pyspark_project/spark_01.py:12
  60. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Got job 0 (collect at /Users/calvin/pyspark_project/spark_01.py:12) with 2 output partitions
  61. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Final stage: ResultStage 0 (collect at /Users/calvin/pyspark_project/spark_01.py:12)
  62. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Parents of final stage: List()
  63. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Missing parents: List()
  64. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Submitting ResultStage 0 (PythonRDD[1] at collect at /Users/calvin/pyspark_project/spark_01.py:12), which has no missing parents
  65. 2019-03-27 19:04:31 INFO MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 4.3 KB, free 366.3 MB)
  66. 2019-03-27 19:04:31 INFO MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.9 KB, free 366.3 MB)
  67. 2019-03-27 19:04:31 INFO BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 192.168.1.100:56646 (size: 2.9 KB, free: 366.3 MB)
  68. 2019-03-27 19:04:31 INFO SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1161
  69. 2019-03-27 19:04:31 INFO DAGScheduler:54 - Submitting 2 missing tasks from ResultStage 0 (PythonRDD[1] at collect at /Users/calvin/pyspark_project/spark_01.py:12) (first 15 tasks are for partitions Vector(0, 1))
  70. 2019-03-27 19:04:31 INFO TaskSchedulerImpl:54 - Adding task set 0.0 with 2 tasks
  71. 2019-03-27 19:04:31 INFO TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7852 bytes)
  72. 2019-03-27 19:04:31 INFO TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7852 bytes)
  73. 2019-03-27 19:04:31 INFO Executor:54 - Running task 1.0 in stage 0.0 (TID 1)
  74. 2019-03-27 19:04:31 INFO Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
  75. 2019-03-27 19:04:32 INFO PythonRunner:54 - Times: total = 390, boot = 383, init = 7, finish = 0
  76. 2019-03-27 19:04:32 INFO PythonRunner:54 - Times: total = 390, boot = 378, init = 12, finish = 0
  77. 2019-03-27 19:04:32 INFO Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 1437 bytes result sent to driver
  78. 2019-03-27 19:04:32 INFO Executor:54 - Finished task 1.0 in stage 0.0 (TID 1). 1440 bytes result sent to driver
  79. 2019-03-27 19:04:32 INFO TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 489 ms on localhost (executor driver) (1/2)
  80. 2019-03-27 19:04:32 INFO TaskSetManager:54 - Finished task 1.0 in stage 0.0 (TID 1) in 470 ms on localhost (executor driver) (2/2)
  81. 2019-03-27 19:04:32 INFO TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
  82. 2019-03-27 19:04:32 INFO PythonAccumulatorV2:54 - Connected to AccumulatorServer at host: 127.0.0.1 port: 56647
  83. 2019-03-27 19:04:32 INFO DAGScheduler:54 - ResultStage 0 (collect at /Users/calvin/pyspark_project/spark_01.py:12) finished in 0.702 s
  84. 2019-03-27 19:04:32 INFO DAGScheduler:54 - Job 0 finished: collect at /Users/calvin/pyspark_project/spark_01.py:12, took 0.763967 s
  85. [1, 2, 3, 4, 5]
  86. 2019-03-27 19:04:32 INFO AbstractConnector:318 - Stopped Spark@663f2e4{HTTP/1.1,[http/1.1]}{0.0.0.0:4042}
  87. 2019-03-27 19:04:32 INFO SparkUI:54 - Stopped Spark web UI at http://192.168.1.100:4042
  88. 2019-03-27 19:04:32 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
  89. 2019-03-27 19:04:32 INFO MemoryStore:54 - MemoryStore cleared
  90. 2019-03-27 19:04:32 INFO BlockManager:54 - BlockManager stopped
  91. 2019-03-27 19:04:32 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
  92. 2019-03-27 19:04:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
  93. 2019-03-27 19:04:32 INFO SparkContext:54 - Successfully stopped SparkContext
  94. 2019-03-27 19:04:33 INFO ShutdownHookManager:54 - Shutdown hook called
  95. 2019-03-27 19:04:33 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/4l/zdr5yt953pl36zwtl7dk9kfr0000gn/T/spark-ff2cab87-726c-494d-adb6-8fedde89aaef
  96. 2019-03-27 19:04:33 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/4l/zdr5yt953pl36zwtl7dk9kfr0000gn/T/spark-a4cd4f93-ffbb-48cd-8356-f3dbd62ef1d4/pyspark-3ebf0ccb-c270-4348-a94b-1c78773dd4ca
  97. 2019-03-27 19:04:33 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/4l/zdr5yt953pl36zwtl7dk9kfr0000gn/T/spark-a4cd4f93-ffbb-48cd-8356-f3dbd62ef1d4

2. Spark Core: Common RDD Operators in Detail

2.1 Common RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

  • Transformations: create a new RDD from an existing one
  • Actions: run a computation on the RDD and return a result to the driver
  • map is a transformation
  • collect is an action
  • All transformations are lazy; nothing is computed until an action is encountered
  • Only an action triggers the actual computation (see the sketch after this list)
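A minimal sketch of this laziness (the variable names are my own): the map call below only records the transformation; the work happens when collect is invoked.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("lazy_demo").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(range(1, 6))

    # Nothing runs yet: map only extends the lineage of the new RDD
    mapped = rdd.map(lambda x: x * 10)

    # collect is an action, so this line triggers the actual job
    print(mapped.collect())   # [10, 20, 30, 40, 50]

    sc.stop()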

2.2 Transformation Operators

  • map and filter

    def my_filter():
        data = [1, 2, 3, 4, 5]
        rdd1 = sc.parallelize(data)
        map_rdd = rdd1.map(lambda x: x * 2)
        filter_rdd = map_rdd.filter(lambda x: x > 5)
        print(filter_rdd.collect())
        print(sc.parallelize(data).map(lambda x: x * 2).filter(lambda x: x > 5).collect())
  • flatMap

    def my_flat_map():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        print(rdd.flatMap(lambda line: line.split(" ")).collect())
        print(rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).collect())
  • groupByKey

    def my_group_by():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        group_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).groupByKey()
        print(group_rdd.map(lambda x: {x[0]: sum(x[1])}).collect())

Word count implemented with groupByKey:

    def word_count():
        rdd = sc.textFile("readme.md")
        flat_map_rdd = rdd.flatMap(lambda line: line.split(" "))
        res_rdd = flat_map_rdd.map(lambda x: (x, 1)).groupByKey().map(lambda x: {x[0]: sum(x[1])})
        print(res_rdd.collect())
  • reduceByKey

    def my_reduce_by():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        map_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduce_rdd = map_rdd.reduceByKey(lambda x, y: x + y)
        # sortByKey sorts by key; to sort by value, swap key and value with a map,
        # sort, then swap back
        reduce_rdd = reduce_rdd.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).map(lambda x: (x[1], x[0]))
        print(reduce_rdd.collect())
  • union

    def my_union():
        a = sc.parallelize([[1, 2, 3], [3, 4, 5]])
        b = sc.parallelize([[3, 4, 5], [1, 2, 3]])
        union_rdd = a.union(b).flatMap(lambda x: list(x))
        print(union_rdd.collect())
  • distinct

    def my_distinct():
        a = sc.parallelize([[1, 2, 3], [3, 4, 5]])
        b = sc.parallelize([[3, 4, 5], [1, 2, 3]])
        union_rdd = a.union(b).flatMap(lambda x: list(x))
        distinct_rdd = union_rdd.distinct()
        print(distinct_rdd.map(lambda x: (x, 1)).sortByKey().map(lambda x: x[0]).collect())
  • join

    def my_join():
        x = sc.parallelize([("a", 1), ("b", 4)])
        y = sc.parallelize([("a", 2), ("a", 3)])
        print(x.join(y).collect())
        print(x.leftOuterJoin(y).collect())
        print(x.fullOuterJoin(y).collect())

2.3 Action Operators

    def my_action():
        data = list(range(1, 11))
        rdd = sc.parallelize(data)
        print(rdd.collect())
        print(rdd.max(), rdd.min())
        print(rdd.take(3))
        print(rdd.reduce(lambda x, y: x + y))
        rdd.foreach(lambda x: print(x))

2.4 Spark RDD in Practice

  • word count
  • top N
  • average (see the sketches after this list)
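Word count was shown above with groupByKey; here are minimal sketches of the other two exercises, assuming an existing SparkContext sc and an illustrative file path.

    # Top N: the N most frequent words in a text file
    def top_n(sc, path, n=3):
        counts = (sc.textFile(path)
                    .flatMap(lambda line: line.split(" "))
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b))
        # takeOrdered with a negated key returns the largest counts first
        return counts.takeOrdered(n, key=lambda kv: -kv[1])

    # Average: sum and count in a single pass, then divide on the driver
    def average(sc, numbers):
        rdd = sc.parallelize(numbers)
        total, count = rdd.map(lambda x: (x, 1)).reduce(
            lambda a, b: (a[0] + b[0], a[1] + b[1]))
        return total / count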

3. Spark Run Modes

3.1 local

This is the simplest mode for pyspark: pip install pyspark is enough for local development and testing.

3.2 standalone

http://spark.apache.org/docs/latest/spark-standalone.html

3.3 yarn

http://spark.apache.org/docs/latest/running-on-yarn.html

4. Spark Core Advanced Topics (Key for Interviews)

4.1 Spark Core Concepts

http://spark.apache.org/docs/latest/cluster-overview.html


4.2 Spark Runtime Architecture and Caveats (Very Important)


4.3 Key Differences Between Spark and Hadoop Concepts

Hadoop

  1. One MapReduce program = one Job
  2. One Job = 1 to N Tasks (Map/Reduce)
  3. Each Task runs in its own process
  4. The process is started when the Task starts and torn down when it finishes; across many Tasks this overhead is significant (even with JVM reuse)

Spark

  1. Application = Driver (the main method, which creates the SparkContext) + Executors
  2. One Application = 0 to N Jobs
  3. One Job = one Action
  4. One Job = 1 to N Stages
  5. One Stage = 1 to N Tasks
  6. Each Task is a thread, so multiple Tasks can run in parallel inside one Executor

4.4 Spark Cache

http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

  • Like transformations, cache is lazy: no job is submitted to Spark until an action is encountered
  • cache() is essentially persist() with storageLevel=StorageLevel.MEMORY_ONLY
  • If an RDD will be reused by later operations, caching it is recommended
  • Without caching, every action re-reads the data from disk; caching cuts down that IO
  • Cached blocks are evicted with an LRU policy (see the sketch after this list)
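A minimal sketch, assuming an existing SparkContext sc and an illustrative file path:

    from pyspark import StorageLevel

    rdd = sc.textFile("readme.md")
    cached = rdd.map(lambda line: line.upper()).cache()   # same as persist(StorageLevel.MEMORY_ONLY)

    print(cached.count())   # first action: reads the file and fills the cache
    print(cached.count())   # second action: served from memory, no re-read from disk

    cached.unpersist()      # drop the cached blocks manually
    # persist() lets you pick a different level explicitly:
    rdd.persist(StorageLevel.MEMORY_AND_DISK)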

4.5 Spark Lineage

Each partition of an RDD produced by a transformation maps back to the partitions of the RDD it was derived from. If one partition loses its data, it can be recomputed from its source partitions, which makes recovery straightforward.

4.6 Spark Dependencies

  • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD, e.g. map, filter, union, join with co-partitioned inputs
  • Wide dependency: a partition of the parent RDD is used by multiple child partitions, e.g. groupByKey on non-partitioned data, join with inputs that are not co-partitioned

Shuffle is a very expensive operation.

  • All the *ByKey and join operations behind wide dependencies produce a shuffle (see the sketch after this list)
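A quick way to see this from PySpark is toDebugString(), which prints an RDD's lineage and shows where a new stage (a shuffle boundary) starts. A minimal sketch, assuming an existing SparkContext sc:

    words = sc.parallelize(["hello spark", "hello world"]).flatMap(lambda l: l.split(" "))

    narrow = words.map(lambda w: (w, 1))           # narrow dependency: no shuffle
    wide = narrow.reduceByKey(lambda a, b: a + b)  # wide dependency: shuffle boundary

    for rdd in (narrow, wide):
        dbg = rdd.toDebugString()
        print(dbg.decode() if isinstance(dbg, bytes) else dbg)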


5. Spark Tuning

http://spark.apache.org/docs/latest/tuning.html

5.1 Serialization

http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

5.2 Memory Management

  • Execution memory: shuffles, joins, sorts, aggregations
  • Storage memory: persist, cache

The two share one memory region: when execution memory runs short, cached blocks are evicted, but a minimum threshold can be reserved for storage.

Things to know:

  1. The memory-management fractions (see the sketch after this list)
  2. How do you find out how much memory an object takes?
    1. Put an RDD in the cache and check it in the web UI
    2. Use SizeEstimator's estimate method
  3. Very large RDDs can be stored in serialized form, trading CPU time for space
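A minimal sketch of the memory-management fractions (the values shown are the defaults of the unified memory manager and are only illustrative):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("memory_demo")
            .setMaster("local[2]")
            # fraction of (heap - 300MB) shared by execution and storage
            .set("spark.memory.fraction", "0.6")
            # portion of the above within which cached blocks are protected from eviction
            .set("spark.memory.storageFraction", "0.5"))

    sc = SparkContext(conf=conf)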

http://spark.apache.org/docs/latest/tuning.html#memory-management-overview
http://spark.apache.org/docs/latest/configuration.html#memory-management

5.3 Broadcast Variables

A broadcast variable caches the value on every machine once, instead of shipping a copy of it with every task.

http://spark.apache.org/docs/latest/tuning.html#broadcasting-large-variables

Broadcast large values instead of capturing them in task closures; per the tuning guide, tasks larger than about 20 KB are probably worth optimizing. A minimal sketch follows.
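This sketch assumes an existing SparkContext sc and a made-up lookup table:

    # a lookup table every executor should hold exactly once
    country_names = {"CN": "China", "US": "United States", "DE": "Germany"}
    bc_names = sc.broadcast(country_names)

    codes = sc.parallelize(["CN", "US", "DE", "CN"])
    # tasks read bc_names.value instead of dragging the dict along in every closure
    print(codes.map(lambda c: bc_names.value.get(c, "unknown")).collect())

    bc_names.unpersist()   # release the broadcast blocks on the executors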

5.4 Data Locality

http://spark.apache.org/docs/latest/tuning.html#data-locality

When the data and the code that processes it are not on the same node, it is better to move the computation to the data than to move the data.

From the closest to the farthest distance between data and code (these correspond to Spark's locality levels):

  • PROCESS_LOCAL: in the same JVM, the best case
  • NODE_LOCAL: on the same node, so the data crosses a process boundary
  • NO_PREF: the data is equally fast to access from anywhere, so there is no locality preference
  • RACK_LOCAL: the data is on the same rack
  • ANY: the data is elsewhere on the network

http://spark.apache.org/docs/latest/configuration.html#scheduling
The default locality wait is spark.locality.wait = 3s; if locality in your cluster is poor, consider increasing it slightly (see the sketch below).
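A minimal sketch, assuming conf is the SparkConf used to build the SparkContext (the 6s value is only an example):

    # wait longer for a local slot before falling back to a worse locality level
    conf.set("spark.locality.wait", "6s")
    # or tune an individual level
    conf.set("spark.locality.wait.node", "6s")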

6. Spark SQL

6.1 Spark SQL Architecture


  • Frontend: responsible for taking in the input
  • Catalyst: receives the frontend's input and parses and optimizes the execution plan; almost all Spark SQL optimizations happen in Catalyst
  • Backend: optimizes the physical plan and generates Spark jobs

6.2 DataFrame

Since I already knew pandas DataFrames and had used Spark SQL for reporting two years ago, this chapter went very quickly.

Mainly based on:
http://spark.apache.org/docs/latest/sql-getting-started.html#running-sql-queries-programmatically

Practice code (the _spark argument the functions below take is a SparkSession, created as sketched after the imports):

    from pyspark.sql import Row
    from pyspark.sql import SparkSession
    import pandas as pd
    # for building schemas explicitly
    from pyspark.sql.types import StructField, StringType, IntegerType, StructType
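A minimal sketch of creating that SparkSession (the app name is arbitrary):

    _spark = (SparkSession.builder
              .appName("spark_sql_practice")
              .master("local[2]")
              .getOrCreate())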

Creating a DataFrame from JSON

    def json_df(_spark):
        # from json
        df = _spark.read.json("resource/people.json")
        df.show()

Creating a DataFrame from a pandas DataFrame

    def pdf_to_df(_spark):
        data_list = [{"name": "Michael"}, {"name": "Andy", "age": 30}, {"name": "Justin", "age": 19}]
        df = _spark.createDataFrame(pd.DataFrame(data_list))
        df.show()

Converting between RDDs and DataFrames

    def rdd_to_df(_spark):
        sc = _spark.sparkContext
        lines = sc.textFile("resource/people.txt")
        parts = lines.map(lambda x: x.split(","))
        parts_row = parts.map(lambda x: Row(name=x[0], age=int(x[1])))
        df = _spark.createDataFrame(parts_row)
        df.createOrReplaceTempView("schema_people")
        _spark.sql("select * from schema_people").show()
        df.printSchema()
        df_teenager = _spark.sql("select * from schema_people where age > 13 and age <= 19")
        df_teenager.show()
        rdd_df = df.rdd.map(lambda x: "name={}".format(x.name))
        print(rdd_df.collect())

Specifying a schema

    def specify_schema(_spark):
        sc = _spark.sparkContext
        lines = sc.textFile("resource/people.txt")
        parts = lines.map(lambda x: x.split(","))
        # from the official docs: treat every column as a string
        people = parts.map(lambda x: (x[0], x[1].strip()))
        schema_string = "name age"
        fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split()]
        schema = StructType(fields)
        df = _spark.createDataFrame(people, schema)
        df.show()
        df.printSchema()
        # hand-written schema: if a field is declared IntegerType, the values in the
        # RDD must already be Python ints, otherwise createDataFrame raises an error
        people_int = parts.map(lambda x: (x[0], int(x[1])))
        schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
        df = _spark.createDataFrame(people_int, schema)
        df.show()
        df.printSchema()
        # DataFrames are immutable, so cast a column with withColumn rather than item assignment
        df = df.withColumn("age", df["age"].cast(StringType()))
        print(df.schema)
        df.printSchema()

7. Spark Streaming

http://spark.apache.org/docs/latest/streaming-programming-guide.html

7.1 Spark Streaming Overview


Spark Streaming is an extension of the core Spark API for processing live data streams. It is:

  • scalable
  • high-throughput
  • fault-tolerant
  • able to ingest data from many sources
  • able to express complex algorithms with Spark's high-level functions such as map, reduce, join and window
  • able to push results out to filesystems, databases and live dashboards
  • able to apply Spark's machine learning and graph processing libraries to the stream data

How Spark Streaming works:

  • It receives live input data streams
  • and divides the data into batches,
  • which are then processed by the Spark engine
  • to produce the results, also in batches
  • Where Spark Core's central concept is the RDD, Spark Streaming's is the DStream (discretized stream)
  • DStreams can be created from external sources such as Kafka, Flume and Kinesis, or derived from other DStreams
  • Internally, a DStream is a sequence of RDDs

7.2 Comparison of Common Stream-Processing Frameworks

Framework          Characteristics
Storm              True record-at-a-time (real-time) stream processing
Spark Streaming    Mini-batch (micro-batch) processing; a one-stack solution on top of Spark
Flink              Real-time stream processing (DataStream), with batch processing built on the stream model (DataSet)
Kafka Streams      Analyzes and processes data that is already in Kafka

7.3 DStream


  • An abstraction over a continuous stream of data
  • Both its input and its output are data streams
  • Internally it is a sequence of RDDs
  • Each RDD holds the data received during one fixed time interval (the batch interval)
  • Every operation applied to a DStream is applied to the underlying RDDs (see the sketch after this list)
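A minimal sketch of the classic network word count (host, port and the batch interval are arbitrary; feed it text with something like nc -lk 9999 in another terminal):

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("streaming_word_count").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)                     # 5-second batch interval

    lines = ssc.socketTextStream("localhost", 9999)   # DStream of raw text lines
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))  # applied to each batch's RDD
    counts.pprint()                                   # print every batch's result

    ssc.start()
    ssc.awaitTermination()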

8. Structured Streaming

Structured Streaming is a stream-processing engine built on the Spark SQL engine. It is to Spark SQL what Spark Streaming is to the Spark RDD API. A minimal sketch follows.
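A word count over the same socket source, expressed with the DataFrame API (again, host and port are arbitrary):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, split

    spark = (SparkSession.builder
             .appName("structured_word_count")
             .master("local[2]")
             .getOrCreate())

    # a streaming DataFrame with one row per line read from the socket
    lines = (spark.readStream
                  .format("socket")
                  .option("host", "localhost")
                  .option("port", 9999)
                  .load())

    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    counts = words.groupBy("word").count()

    query = (counts.writeStream
                   .outputMode("complete")
                   .format("console")
                   .start())
    query.awaitTermination()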

9. Summary

That is it for the introductory material. Every topic above deserves a deeper look, and specific problems are worth discussing in detail on their own.