Spark 快速大数据分析
https://blog.csdn.net/swing2008/article/details/60869183 Spark(一): 基本架构及原理
弹性分布式数据集 RDD
是分布在多个计算节点上的可以并行操作的元素集合,是Spark的主要编程抽象
集群管理器,YARN Mesos 以及Spark自带的简易调度器,叫做独立调度器
2.3 Spark核心概念介绍
*驱动器程序+执行器节点
SparkContext对象代表对该计算集群的一个连接
驱动器程序通过SparkContext对象来访问Spark
向Spark传递函数


Directed Acyclic Graph DAG有向无环图
Stage 属于同一个TaskSet的任务,Stage的划分和调度是由DAGScheduler来负责的
Stage分为非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方
Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
TASKSedulter
HA(High available)
Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD).
After Spark 2.0, RDDs are replaced by Dataset
SparkContext—-》SparkSession
Spark内存管理
https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral
堆内 规划式管理,不精确
堆外 精确,降低了管理难度。
第三章 RDD编程
RRD 不可变的分布式对象集合
RDD默认不进行持久化,RDD.persist
RDD的基本操作:创建、转换(Transformation)和行动(Action)
转换操作返回的是RDD,行动操作返回的其他数据类型
RDD的惰性求值机制
MapReduce貌似没有惰性求值机制?
RDD:记录如何计算数据的指令列表,而不是存放特定数据的数据集
RDD在任何时候都能重算
3.2 创建RDD
将已有集合传递给parallelize()方法 val lines = sc.parallelize(List(“pandas”,”i like paddas”))
从外部存储读取 val lines = sc.textFile(“READ.md”)
* 3.3 RDD操作
转化操作fliter map union distinct 转化操作都是惰性求值的
RDD谱系图 lineage graph
行动操作take count collect
RDD的宽依赖和窄依赖
窄依赖的函数有:map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues
* 宽依赖的函数有:groupByKey, join(父RDD不是hash-partitioned ), partitionBy,reduceByKey、sortByKey
宽依赖一般都对应着shuffle(但是shuffle不一定对应着行动操作)

  1. persist 并不是action操作
  2. ** 3.4 Spark传递函数
  3. Scala:所传递的函数及其所引用的数据需要是可序列化的
  4. ** 3.5 常见的转化和行动函数
  5. 1、基本RDD
  6. a 针对各个元素的转化函数 filter map flatmap(每个输入元素生成多个输出元素,将迭代器拍扁)
  7. b 伪集合操作
  8. *RDD 并不是严格意义上的集合,不能保证元素的唯一性
  9. *RDD.distinct() 开销很大,shuffle混洗
  10. *union
  11. *interselection 返回两个RDD中都有的元素 shuffle
  12. *subtract
  13. *cartesian 求笛卡尔积
  14. c 行动操作
  15. *reduce 累加,操作两个相同类型RDD的数据,并返回一个同样类型的数据
  16. *folder 类似,但是要加上初始值
  17. *collect
  18. *takeSample(false,3) RDD中随机返回一些元素
  19. *foreach
  20. *aggragate 返回值类型可以和所操作的RDD数据类型不一致
  21. 2、在不同RDD类型间转换
  22. scala中的隐式转换
  23. import org.apache.spark.SparkContext._
  24. ** 3.6 持久化(缓存)
  25. 持久化级别
  26. persist
  27. unpersist

第四章 键值对操作
为分布式数据集选择正确的分区方式
4.1 动机
pair RDD
4.2 创建Pair RDD
map
4.3 Pair的转化RDD
reduceByKey 合并相同键的值
groupByKey
sortByKey
mapValues
flatMapValues

  1. 针对两个pair RDD的转化操作
  2. val h= pairs.filter{case (k,v)=>v.length<20}
  3. 4.3.1 聚合操作
  4. rdd. mapValues( x => (x, 1)). reduceByKey(( x, y) => (x._ 1 + y._ 1, x._ 2 + y._ 2))
  5. 求每个键平均值
  6. 合成器,combiner
  7. //flatMap和mapValues来实现单词计数
  8. val rdd = sc.textFile("README.md")
  9. val sentences = rdd.filter(x=>x.contains(x)) //简化过滤
  10. val words = sentences.flatMap(x=>x.split(" ")) //变成单个单词
  11. val wordsMap = words.Map(x=>(x,1)) //变成键值对
  12. wordsMap.reduceByKey((x,y)=>x+y) //相同键的合并,但是这里可以这样用吗??
  13. words.countByValue() //更快更便捷的方法
  14. combineByKey这里没有太看明白,暂时放过。。
  15. *并行度调优
  16. 分区数决定了在RDD上执行操作的并行度
  17. words.partitions.size // 查看当前RDD的分区数
  18. repartition函数,重新分区,将数据通过网络混洗,并创建新的分区集合,这是一个代价较大的操作。
  19. 4.3.2 数据分组
  20. groupByKey
  21. cogroup
  22. 4.3.3 连接
  23. 右外连接
  24. 左外链接
  25. 交叉连接
  26. 内连接
  27. join

4.3.4 数据排序
sortByKey()

4.4 PairRDD的行动RDD
countByKey 对每个键对应元素分别计数
collectAsMap 将结果以映射表形式返回,以便查询
lookup(key) 返回指定键,对应的所有值

  1. 4.5 数据分区
  2. SPARK可以通过控制RDD分区方式来减少通信开销
  3. 但是分区并不是都是好的,只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才有帮助
  4. 跨节点数据混洗
  5. partitionBy() 将表转为哈希分区,这是一个转化操作,返回值总是一个新的RDD
  6. spark.HashPartitioner
  7. sortByKey GroupByKey会分别生成范围分区和哈希分区的RDD
  8. map这样的操作会导致新的RDD失去父RDD的分区信息
  9. 未看。。

第五章 数据读取和保存
5.2 文件格式
1/文本
textFile
wholeTextFile
通配符
saveAsTextFile
2/JSON
5.3 文件系统
5.4 SparkSQL中的结构化数据
1 Hive
结构化数据
2 Json
第六章 Spark编程进阶
累加器 accumulator 用来对信息进行聚合 (结果聚合通信)
将工作节点的值,聚合到驱动器程序中的简单语法
累加器只有在驱动器程序汇总才能访问,工作节点不能访问
累加器是一个只写变量(除了驱动器可以访问)
如果想要在失败或需要重复计算的累加器,可将其放在foreach这样的行动操作中
转化操作中的累加器,可能会有异常
自定义累加器
*广播变量 broadcastvariable 高效分发较大的对象 (广播通信)

  1. 类似于MR中的分布式缓存
  2. 高效地向所有工作节点,发送一个较大的只读值
  3. 采用一种类似于Bittorrent的通信机制
  4. 变量只会被发到各节点一次,应当作为只读值处理
  5. *为了达到只读,可以使用广播基本类型的值或者引用不可变对象
  6. 也可以类似于数组的可变对象,但是要自己维护只读的条件-->不要手动去修改这个数组
  7. (工作节点手动修改了这个数组,但是只会对这个节点的本地副本有效,不会影响全局的广播变量)
  8. 广播的优化:选择合适的序列化库(原生的低效,换用Kyro)
  • 基于分区进行操作
    mapPartitions
    mapPartitionsWithIndex
    foreachPartitions

第七章 在集群上运行Spark
7.2 SPARK运行时框架
有向无环图DAG Directed Acyclic Graph
Driver
执行main方法的进程
DAGScheduler 将用户程序转为任务,程序-步骤Stage-任务Task,将DAG的逻辑图转化为执行计划并提交集群
task是Spark中最小的工作单元
TaskScheduler 为执行器节点调度任务
TaskScheduler 为执行器节点调度任务
Executor
运行任务,结果反馈
通过自身的块管理器,提供RDD的缓存服务
*本地模式中,驱动器和执行器程序,是在一个Java进程中运行的
集群管理器
Spark中的可插拔组件
Spark依赖他来启动执行器,有时也依赖他来启动驱动器
主节点和工作节点都是集群管理器中的概念
spark-submit,将应用提交到集群管理器
===================================================
DAG中的两种任务
1、shuffle map任务 不会运行在最后阶段。会将输出写入一组新的分区中,供后面的阶段取用。返回的是可以让下一个阶段检索其输出分区的信息
2、result任务 运行在最终阶段,将所在分区的运算结果发送回Driver汇集。
===================================================
DAG调度程序—>任务调度程序,
考虑位置偏好,分配Task

推测任务:是现有任务的副本,发现现有任务比预期的慢太多,Spark会酌情启动预测任务(设置启用情况下)

7.5 Spark应用内与应用间调度
多用户集群
长期运行应用,如JDBC服务器—>公平调度器
7.6 集群管理器
本地模式
独立模式
Mesos模式
YARN模式
YARN客户端模式 driver在客户端
YARN集群模式 driver在YARN的application master集群上运行 —>适合生成作业,易于保留日志文件
第八章 Spark调优与调试
8.1 通过SparkConf配置Spark
SparkConf
spark-submit —conf 命令提交时也可以动态设置配置项
spark-submit —properties-File 自定义配置文件的位置
8.2 Spark执行的组成部分,作业任务和步骤
Spark将RDD的逻辑表示翻译为物理执行计划
toDebugString() 查看RDD的谱系
如果RDD需要混洗数据从父节点计算出来时,那么就需要新建新的stage。
一个物理步骤stage中的计算,是流水线执行的
优化:
1、流水线执行优化
2、RDD谱系图的截短
1/RDD 已经缓存缓存—>短路求值
2/RDD在数据混洗中作为副产品物化出来—>短路求值(原因是spark数据混洗操作的输出均被写入磁盘)
步骤的集合 —> 作业(作业Job—>步骤Stage—>任务Task)
8.3查找信息
1、Spark 网页用户界面
Spark中性能问题的常见原因:数据倾斜~
执行器页面Executor,Thread Dump线程转存按钮,收集执行器进程低效的栈跟踪信息,检测比较低效的用户代码
环境页面Environmenmt,调试Spark配置项
2、驱动器和执行器进程的日志
yarn logs — applicationId
8.4 关键性能考量
1 并行度
输入RDD一般根据其底层的存储系统来选择并行度
repartition 会shuffle
coalesce 不一定shuffle
2 序列化格式
序列化 —> 数据变为二进制
序列化会在数据混洗时发生,可能会造成大量的网络数据传输量
Java序列化
Kryo序列化
3 内存管理
执行器进程中,内存的三个用处
1、RDD缓存(cache或persist)
2、数据混洗与聚合的缓存区(保存中间数据)
3、用户代码(数组或其他对象等)
默认60% RDD缓存,20% 混洗缓存,20%用户代码
4 硬件供给
-executor-memory 3g 都有
—driver-memory 15g
—num-executors 20 Yarn才有
—executor-cores 3 Yarn才有
第九章 SparkSQL
SchemaRDD(在1.3之后被DataFrame取代)
是存放Row对象的RDD
是否有Hive支持?
HiveContext or SQLContext
推荐使用HiveQL作为SparkSQL的查询语言
第10章 Spark Streaming
使用离散化流(discretized stream) DStream来抽象表示,即随时间推移而收到的数据序列
是RDD组成的序列。
DStream支持两种操作,一种是转化操作,一种是输出操作
检查点机制,将数据存储到可靠文件系统里面的机制
第11章 基于MLlib的机器学习
MLlib中只包含能够在集群上运行良好的并行算法
单节点机器学习库
WeKa,SciKit-Learn
分类:已知种类,有监督学习
聚类:未知种类,无监督学习 Kmeans
11.3 机器学习基础
11.4 数据类型
Vector 向量(稠密向量和稀疏向量)
LabeledPoint 监督式学习中带标签的数据点
Rating
Model类
*操作向量:
1、
稠密向量 将向量的每一位都存下来
稀疏向量 只存储非零位以节省空间(当只有10%的为非零值时,倾向于用稀疏向量)
2、
11.5 算法
降维
Spark调优
数据倾斜
https://blog.csdn.net/u011317245/article/details/53542570 Spark性能调优—调度与分区优化
repartition 会shuffle
coalesce 不一定会shuffle
如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDDde分区数变多的
分区要变多,一定要用repartition
Dataset a strongly typed collection of objects 有类型的,需要一个明确的Encoder

DataFrame A distributed collection of data organized into named columns. 无类型的,按照命名栏位组织的分布式数据集
DataFrame引入了schema和off-heap
DataFrame带有schema,而DataSet没有schema。schema定义了每行数据的“数据结构”,就像关系型数据库中的“列”,schema指定了某个DataFrame有多少列。
http://www.cnblogs.com/seaspring/p/5804178.html Spark RDD、DataFrame和DataSet的区别
Some feature transformers implemented in MLlib are inspired by those implemented in scikit-learn.
The major difference is that most scikit-learn feature transformers operate eagerly on the entire input dataset,
while MLlib’s feature transformers operate lazily on individual columns, which is more efficient and flexible to handle large and complex datasets
由于Spark2.0起,SQLContext、HiveContext已经不再推荐使用,改以SparkSession
task —>stage—> job
===========================================================================
map和mapPartition的区别
map:map操作发送到每一份数据
mapPartition:map操作发送到每一个分区
Hadoop的map操作后面一定会接reduce
spark不一定
spark 的repartition,是在哪一个阶段呢? reduce前还是后?