试卷题型

单选题 20题 0.5分
多选题 10题 1分
判断题 10题 1分
填空题 20个空 1分
编程题 4题 10分 python基础 RDD SparkSQL SparkStreaming
简答/论述 1题 10分

Spark生态

  1. Spark 生态系统以**Spark Core** 为核心,利用StandaloneYARN Mesos 等资源调度管理,完成应用程序分析与处理。这些应用程序来自Spark 的不同组件,如Spark Shell Spark Submit 交互式批处理、**Spark Streaming** 实时流处理、**Spark SQL** 快速查询、**MLlib** 机器学习、**GraphX** 图处理等。

Spark架构

image.png

(1)Cluster Manager:Spark的集群管理器。主要负责资源的分配与管理。集群管理器分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给应用程序,但是并不分配Executor的资源。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。
(2)Worker:Spark的工作节点。对Spark应用程序来说,由集群管理器分配得到资源的Worker节点主要负责以下工作:创建Executor,将资源和任务进一步分配给Executor,然后同步资源信息给Cluster Manager。
(3)Executor:Spark任务的执行单元。主要负责任务的执行以及与Worker、Driver App的信息同步。
(4)Driver App:客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换为RDD和DAG,并与Cluster Manager进行通信与调度。

Spark与Hadoop的区别与联系

spark与hadoop的不同点:
1 应用场景不同
Hadoop和Spark两者都是大数据框架,但是各自应用场景是不同的。Hadoop是一个分布式数据存储架构,它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,降低了硬件的成本。Spark是一个专门用来对那些分布式存储的大数据进行处理的工具,它要借助hdfs的数据存储。
2 处理速度不同
hadoop的MapReduce是分步对数据进行处理的,从磁盘中读取数据,进行一次处理,将结果写到磁盘,然后从磁盘中读取更新后的数据,再次进行的处理,最后再将结果存入磁盘,这存取磁盘的过程会影响处理速度。spark从磁盘中读取数据,把中间数据放到内存中,完成所有必须的分析处理,将结果写回集群,所以spark更快。
3 容错性不同
Hadoop将每次处理后的数据都写入到磁盘上,基本谈不上断电或者出错数据丢失的情况。Spark的数据对象存储在弹性分布式数据集 RDD,RDD是分布在一组节点中的只读对象集合,如果数据集一部分丢失,则可以根据数据衍生过程对它们进行重建。而且RDD 计算时可以通过 CheckPoint 来实现容错。
spark与hadoop的联系:
Hadoop提供分布式数据存储功能HDFS,还提供了用于数据处理的MapReduce。 MapReduce是可以不依靠spark数据的处理的。当然spark也可以不依靠HDFS进行运作,它可以依靠其它的分布式文件系统。但是两者完全可以结合在一起,hadoop提供分布式集群和分布式文件系统,spark可以依附在hadoop的HDFS代替MapReduce弥补MapReduce计算能力不足的问题。

Spark相对于Hadoop MapRedue的优势

高效(比MapReduce快10~100倍)
内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
DAG引擎,减少多次计算之间中间结果写到HDFS的开销
使用多线程池模型来减少task启动开稍,shuffle过程中避免 不必要的sort操作以及减少磁盘IO操作
易用
提供了丰富的API,支持Java,Scala,Python和R四种语言
代码量比MapReduce少2~5倍
与Hadoop集成 读写HDFS/Hbase 与YARN集成

Spark环境搭建

  1. Spark使用**Scala语言**进行开发,Scala运行在**Java平台**之上,因此需要下载并安装JDKScala。值得注意的是ScalaJavaSpark三者之间是有版本**搭配限制**的,可以根据官方文档提供的组合进行下载,否则会出现启动异常。<br /> Spark在使用之前,需要进行一定的配置。主要包括**安装SSH,实现免密码登录,修改环境变量,修改Spark文件夹的访问权限,节点参数配置**等。

PySpark日志设置

  1. Spark程序运行时会产生大量的程序执行记录日志,其中有效的日志级别包括:DEBUG < INFO < WARN < ERROR < FATAL,控制日志输出内容的方式有两种:修改**log4j.properties**、代码中使用**setLogLevel(logLevel)**控制日志输出。

RDD主要属性

RDD 是一个数据集,不仅表示了数据集,还表示了这个数据集从哪来,如何计算。
主要属性包括:
1.数据分片:partitions。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
2.计算函数:compute。Spark中RDD的计算是以分片为单位的,每个RDD都可达到分片计算目的。
3.依赖关系:dependencies。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。
4.分区函数:partitioner。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。

RDD的创建

Spark的核心是弹性分布式数据集RDD,创建 RDD的方式有三种 :基于数据集合创建 parallelize、基于外部数据源创建textFile 和 基于父RDD转换得来

  1. from pyspark import SparkContext
  2. sc = SparkContext()
  3. # 使用已经存在的迭代器或者集合来创建RDD
  4. rdd = sc.parallelize([1,2,3,4,5])
  5. # 指定读取本地文件系统数据
  6. rdd1 = sc.textFile("file:///home/ubuntu/data.txt")
  7. # 指定读取HDFS数据
  8. rdd2 = sc.textFile("hdfs://localhost:8020/data.txt")
  9. # 按配置读取,spark-env.sh文件中若配置HADOOP_CONF_DIR则默认读取HDFS,否则默认读取本地文件系统
  10. rdd3 = sc.textFile("/data.txt")

RDD算子

RDD支持两种类型的操作: transformations(转换)和 actions(动作)。transformations操作会在一个已存在的 RDD上创建一个新的 RDD,但实际的计算并没有执行,仅仅记录操作过程,所有的计算都发生在actions环节。actions操作会执行记录的所有transformations操作并计算结果,结果可返回到 driver 程序,也可保存到相关存储系统中。

RDD场景算子的使用

RDD共享变量

累加器accumulator
一个全局共享变量,可以完成对信息进行聚合操作。当从文件中读取无线电的呼号列表的日志,同时也想知道输入文件中有多少空行,这里就可以用到累加器。需要注意一点,累加器是懒加载的,需要有action算子触发才会执行。
广播变量 Broadcast
Spark的算子逻辑是发送到Executor中运行的,数据是分区的,所以当Executor中需要引用外部变量时,需要使用广播变量。
累机器相当于统筹大变量,常用于计数。统计广播变量允许程序员缓存一个只读的变量在每台机器(worker)上面,而不是每个任务(Task)保存一份拷贝。利用广播变量能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。
广播变量通过两个方面提高数据共享效率:(1)集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;(2)广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。

宽依赖与窄依赖

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用。例如map、filter、union等操作都会产生窄依赖。宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖。读者可以这么理解,如果RDD之间的操作产生了shuffle,就是宽依赖。
image.png

DAG

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG。DAG 是一组顶点和边的组合,顶点代表了 RDD, 边代表了对 RDD 的一系列操作。在Spark里每一个操作生成一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成一个有向无环图。根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

image.png

RDD的持久化方法

  1. Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。RDD 可以使用 **persist() 方法或 cache() 方法**进行持久化。cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。

DataFrame与RDD的主要区别

在Spark中,DataFrame以RDD为基础,是一种的分布式数据集,与传统数据库中的二维表格相类似。DataFrame与RDD的主要区别如下,前者带有schema元素信息,即DataFrame所表示的二维表数据集,包含每一列的名称和类型。这使得Spark SQL得以学者可以洞察更多的结构信息,从而对隐藏于DataFrame背后的数据信息,以及对作用于DataFrame的转化,进行更有针对性的优化,最终达到大幅提升运行效率的目标。反观RDD,由于学者无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

image.png

SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习Spark的各项功能。

  • 在Spark的早期版本中,SparkContext是Spark的主要切入点,由于RDD是主要的API,我们通过SparkContext来创建和操作RDD。
  • 随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。
  • SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。

特点:

  • 为用户提供一个统一的切入点使用Spark 各项功能
  • 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
  • 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
  • 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

    1. from pyspark.sql import SparkSession
    2. spark = SparkSession.builder.getOrCreate()

    SparkSQL

  • Spark SQL是Spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame

  • 作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
  • 运行原理:将 Spark SQL 转化为 RDD, 然后提交到集群执行
  • 特性:
    • 集成。即:无缝地将SQL查询与Spark程序混合。
    • 统一数据访问。即:加载和查询来自各种来源的数据。包括Hive、Parquet、JSON和JDBC等。
    • Hive兼容性。即:在现有仓库上运行未修改的Hive查询。
    • 标准连接。即:通过JDBC或ODBC连接。
  • Spark SQL 是由DataFrame派生出来的,首先必须先出创建DataFrame,然后通过登录Spark SQL temp table就可以使用Spark SQL语句了。
  • 使用Spark SQL最简单,就是直接使用SQL语句,即使是非程序设计人员,只需要懂得SQL语句就可以使用 ```python from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()

data = [“zhangsan,18”,”lisi,19”,”wangwu,20”] rdd = spark.sparkContext.parallelize(data).map(lambda x:x.split(“,”)) rdd = rdd.map(lambda x:(x[0],int(x[1]))) df = rdd.toDF([‘name’,’age’])

方式一 SQL 通过createOrReplaceTempView注册为

df.createOrReplaceTempView(‘person1’) res = spark.sql(“select * from person1 where age > 18”) res.collect() “”” [Row(name=’lisi’, age=19), Row(name=’wangwu’, age=20)] “””

方式二 SQL 通过registerTempTable注册

df.registerTempTable(‘person2’) res = spark.sql(“select * from person2 where age > 18”) res.collect()

方式三 DSL

df.filter(“age > 18”).collect() ```

DSL与SQL操作DataFrame

Spark SQL特性

易整合。将SQL查询与Spark程序无缝混合;可以使用不同的语言进行代码开发。
统一的数据源访问。以相同的方式连接到任何数据源,sparksql后期可以采用一种统一的方式去对接任意的外部数据源,不需要使用不同的API。
兼容hive。sparksql可以支持hivesql这种语法 sparksql兼容hivesql。
支持标准的数据库连接。sparksql支持标准的数据库连接JDBC或者ODBC

流计算

  • 数据总体上可以分为静态数据流数据
  • 流数据是一组顺序、大量、快速、连续到达的数据序列,一般情况下,数据流可被视为一个随时间延续而无限增长的动态数据集合。
  • 对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算实时计算
  • 批量计算以“静态数据”为对象,可以在很充裕的时间内对海量数据进行批量处理,计算得到有价值的信息。Hadoop就是典型的批处理模型,由HDFS和HBase存放大量的静态数据,由MapReduce负责对海量数据执行批量计算。
  • 流数据必须采用实时计算,实时计算最重要的一个需求是能够实时得到计算结果,一般要求响应时间为秒级。
  • 在大数据时代,不仅数据格式复杂、来源众多,而且数据量巨大,这就对实时计算提出了很大的挑战。因此,针对流数据的实时计算——流计算,应运而生。
  • 总的来说,流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。

    流数据特点有哪些

    流数据特点:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息
    快速持续到达;
    来源多,格式复杂;
    数据量大,但不关心存储;
    注重整体价值;
    顺序颠倒或不完整;
    数据的价值随着时间的流逝而降低;

    SparkStreaming

    Spark Streaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。
    Spark Streaming是Spark的核心组件之一,为Spark提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。
    image.png
    Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。
    image.png
    Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。

    编写Spark Streaming程序的基本步骤

    1.通过创建输入DStream来定义输入源
    2.通过对DStream应用转换操作和输出操作来定义流计算。
    3.用streamingContext.start()来开始接收数据和处理流程。
    4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
    5.可以通过streamingContext.stop()来手动结束流计算进程。

    DStream转换操作

    DStream转换操作包括无状态转换有状态转换
    无状态转换:每个批次的处理不依赖于之前批次的数据。
    有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化的转换(updateStateByKey)