Chapter 3

RDD,最基础的抽象

三个重要特征(vital characteristics)

  • Dependencies
  • Partitions (with some locality information)
  • Compute function: Partition => Iterator[T]

    DataFrame

    可以理解成数据库的表,每个column都有具体的类型,integer, string, array, map, real, date, timestamp, etc

    DataSet

    DataFrame是没有严格类型的,然而DataSet要求类型(获得编译时的类型安全,只对scala/java有效:case class, JavaBean)。

Spark SQL是底层基础

At the core of the Spark SQL engine are the Catalyst optimizer and Project Tungsten

优化器的四个阶段(当前的理解能力只能到字面意思):

  • Analysis
  • Logical optimization
  • Physical planning
  • Code generation

不论开发者使用哪种应用语音,一个spark query都会经历一样的优化流程(如上4步)。

Chapter 4

Spark SQL includes SQL, the DataFrame API, and the Dataset API

DataFrameReader

只能通过spark.read/readStream引用到,而不能创建一个DataFrameReader实例。官方文档

  1. DataFrameReader.format(args).option("key", "value").schema(args).load()
  2. // 示例
  3. val df = spark.read
  4. .format("csv")
  5. .option("header", "true")
  6. .schema(schema)
  7. .load(csvFile)

DataFrameWriter

只能通过DataFrame的实例来引用到。

  1. DataFrameWriter.format(args)
  2. .option(args)
  3. .bucketBy(args)
  4. .partitionBy(args)
  5. .save(path)
  6. DataFrameWriter.format(args)
  7. .option(args)
  8. .sortBy(args)
  9. .saveAsTable(table)
  10. // 举例
  11. val location = "file path"
  12. df.write.format("json").mode("overwrite").save(location)

疑问

哪里可以执行SQL还没有搞明白?
回答:spark-sql

  1. -- In SQL
  2. CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING json
  3. OPTIONS (
  4. path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
  5. )

Chapter 5

UDF(user define functions)

自定义函数(session scope)。典型的场景是封装model,使用这不需要理解model的内部逻辑。

  1. scala> val cubed = (s: Long) => s*s*s
  2. scala> spark.udf.register("cubed", cubed)
  3. scala> spark.range(1, 9).createOrReplaceTempView("udf_test")
  4. scala> spark.sql("select id, cubed(id) as id_cubed from udf_test").show()

UDF不保证子表达式的执行顺序,例如下面的片段strlen(s) > 1执行时不保证s不为null

  1. spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")
  • UDF自己内部来处理null的情况
  • 使用IF/CASE来判断之后调用UDF
  • 针对Python有个专门的库做性能优化:[Pandas UDFs](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)

    几个查询工具

    1,内置的spark-sql,不用多说
    2,内置的beeline,介绍说是参考hive,可以连远程 ```scala 需要启动服务 ./sbin/start-thriftserver.sh

连接 ./bin/beeline

!connect jdbc:hive2://localhost:10000 show tables; any sql…

停止服务 ./sbin/stop-thriftserver.sh

  1. 3Tableau/power BI 商用,不多说
  2. <a name="ewpj3"></a>
  3. ### 常见的数据库读写
  4. 我实操了一下`mysql`的,提供的api都比较简洁,其他数据库的用法差不多,步骤:<br />1,需要驱动,我下载的是`mysql-connector-java-8.0.29.jar`,下载的地方有两个
  5. > [https://downloads.mysql.com/archives/c-j/](https://downloads.mysql.com/archives/c-j/) 下载压缩包,解压后只需要jar文件
  6. > [https://mvnrepository.com/artifact/mysql/mysql-connector-java](https://mvnrepository.com/artifact/mysql/mysql-connector-java) 下载jar文件,版本随便
  7. 2,把jar文件放到Spark的目录(就是安装spark时解压的目录,也叫Spark Home目录),我是放在了`$spark_home/jars`目录(因为我操作时还不确定spark home目录在那里,就试了下这个目录)<br />3,上代码操作
  8. ```scala
  9. cd 到Spark Home目录
  10. # 注意使用的./jars/**.jar,因为我放到了jars目录
  11. $> ./bin/spark-shell --jars ./jars/mysql-connector-java-8.0.29.jar
  12. scala> val jdbcDF = spark.
  13. | read.
  14. | format("jdbc").
  15. | option("url", "jdbc:mysql://localhost:3306/RAP2_DELOS_APP").
  16. | option("driver", "com.mysql.jdbc.Driver").
  17. | option("dbtable", "Users").
  18. | option("user", "root").
  19. | option("password", "your-pass").
  20. | load()
  21. scala> jdbcDF.show(3, false)
  22. # 成功显示数据库里的内容
  23. # 写库没有操作,应该是一样的

如何处理复杂的数据类型(深嵌套)

常见的做法有两种:

  • 把嵌套的数据打平,创建新表保持数据
  • 使用UDF封装处理复杂的数据类型,对上层透明

另外也内置和很多方法来操作arraymap,例如array_sortarray_removemap_concat。还有一招支持higher-order function,例如transform(values, value -> lambda expr)

常见的操作

内置函数官网:https://spark.apache.org/docs/latest/api/sql/index.html#day

window function

window: a range of input rows

  1. -- In SQL
  2. spark.sql("""
  3. SELECT origin, destination, TotalDelays, rank
  4. FROM (
  5. SELECT origin, destination, TotalDelays, dense_rank()
  6. OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
  7. FROM departureDelaysWindow )t
  8. WHERE rank <= 3 """).show()

理解不是很多,需要练习,@TODO

修改操作

DataFrame是不可修改的,应用的这些操作都会新创建DataFrame — Scala精髓

例如增加列,删除列

  1. val foo2 = foo.withColumn("status",
  2. expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
  3. )
  4. val foo3 = foo2.drop("delay")
  5. val foo4 = foo3.withColumnRenamed("status", "flight_status")

有个比较厉害的例子:没有使用group by,但对相同的destination进行聚合计算了avg/max,并且筛选了month属于1和2的(值得实操一下!!!)。

  1. -- In SQL
  2. SELECT * FROM (
  3. SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
  4. FROM departureDelays
  5. WHERE origin = 'SEA'
  6. )
  7. PIVOT (
  8. CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
  9. FOR month IN (1 JAN, 2 FEB)
  10. )
  11. ORDER BY destination

Chapter 6

注意:case class定义的属性顺序要和数据格式里的顺序一样

Tungsten

Spark是内存消耗性应用,在1.0的时候性能很差,后面引入Tungsten来解决这个内存消耗问题,主要的特性:row-base formatmemory off-heap减少GC
image.png
image.png

DataSet有代价

most good things in life come at a price

主要的问题是需要序列化和反序列化(Tungsten <-> JVM),有两个优化建议:
1,不要在higher-order function中过渡使用匿名lambdas参数
2,链式调用,减少序列化的次数(不要穿插使用lambdas和DSL)
image.png

Chapter 7

Three way set Spark properties

  • configuration file, in Spark Home directory
  • 应用启动时代码指定或者命令行参数,spark-submit --conf
  • programmatic interface via the Spark shell ```scala scala> spark.conf.isModifiable(“spark.sql.shuffle.partitions”) val res1: Boolean = true

scala> spark.conf.get(“spark.sql.shuffle.partitions”) val res2: String = 200

scala> spark.conf.set(“spark.sql.shuffle.partitions”, 100)

scala> spark.conf.get(“spark.sql.shuffle.partitions”) val res3: String = 100

  1. **优先级**:配置文件 < 命令行 < 程序直接设置
  2. <a name="YlT4A"></a>
  3. ### 应对大规模作业场景的配置参考
  4. <a name="aWrNO"></a>
  5. #### 动态申请executor
  6. > 使用spark-submit命令行指定资源,是静态,不可改变的,不会动态调整executor数量。
  7. ```scala
  8. # 开启动态申请,最少2个最多20个executor
  9. # task积压1分钟就新开一个executor
  10. # executor超过2分钟闲置就回收
  11. spark.dynamicAllocation.enabled true
  12. spark.dynamicAllocation.minExecutors 2
  13. spark.dynamicAllocation.schedulerBacklogTimeout 1m
  14. spark.dynamicAllocation.maxExecutors 20
  15. spark.dynamicAllocation.executorIdleTimeout 2min

executor的内存模型(下面写的是默认)

  • execution memory, 60%,主要用于shuffle/join/aggregation
  • storage memory,40%,主要用于缓存用户数据
  • reserved memory,保留300M,减轻OOM

优化点:map/shuffle这些操作会导致读写磁盘IO,可以做一些调整尽量使用缓存数据

  1. spark.driver.memory # 默认1G
  2. spark.shuffle.file.buffer # 默认32k,建议1M;使spark多利用缓存
  3. spark.shuffle.registration.timeout # 默认5000ms,建议120000ms

最大化并行的能力(it’s a trial-and-error approach)

spark任务模型:A job -> N stage; A stage -> N task
最理想的并发情况:一个线程 -> 一个task -> 一个core;一个task处理一个唯一的数据partition
数据partition(20220518的理解):

  • 文件系统中连续的块(block/chunk)的一个子集,能用独立(不同线程)读取和计算(并行)
  • 一般默认的大小(64M - 128M),例如hdfs/S3默认为128M
  • 可以显示指定创建数量,例如spark.read.textFile("a-large-file.txt").repartition(16)
  • shuffle stage会创建shuffle partitions,默认spark.sql.shuffle.partitions = 200,太小的分块可能导致数据在executor之间过多的传输(通过网络)
  • shuffle过程难免写磁盘,建议用SSD

    Cache数据

    DataFrame.cache()/persist()

使用cache()

用到的partition才会放入缓存,缓存的单元是partition(不可分隔0.5个partition),所以一个DataFrame的partitions可能只有部分缓存了。

使用persist(StorageLevel.LEVEL)

LEVEL = MEMORY_(ONLY | ONLY_SER | AND_DISK | AND_DISK_SER) | DISK_ONLY | OFF_HEAP

每个LEVEL其实还有个LEVEL_2的选项(如MEMORY_ONLY_2),这个选项要求数据放到两个不同的地方,作用主要是容错和让spark可以更好的调度executor。
除了DataFrame,view和table也可以被cache。

不可避免的join

join会触发数据在executor之间大量移动 5种策略:broadcast hash join(BHJ),shuffle hash join(SHJ),shuffle sort merge join(SMJ),broadcast nested loop join(BNLJ),shuffle-and-replicated nested loop join

Broadcast Hash Join

数据集大小小于10MB,Spark会默认使用这个策略。配置spark.sql.autoBroadcastJoinThreshold
适合的场景是一个大数据集+一个很小的数据集,小的程度是比较容易放到单独的executor内存中。
从名字可以容易理解,其实是会把这个完整的小数据集广播给每个executor,而大数集则partition。

Shuffle Sort Merge Join

From Spark’s perspective, this means that all rows within each data set with the same key are hashed on the same partition on the same executor.

  1. import scala.util.Random
  2. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
  3. var states = scala.collection.mutable.Map[Int, String]()
  4. var items = scala.collection.mutable.Map[Int, String]()
  5. val rnd = new scala.util.Random(42)
  6. states += (0 -> "AZ", 1 -> "CO", 2-> "CA", 3-> "TX", 4 -> "NY", 5-> "MI")
  7. items += (0 -> "SKU-0", 1 -> "SKU-1", 2-> "SKU-2", 3-> "SKU-3", 4 -> "SKU-4", 5-> "SKU-5")
  8. val usersDF = (0 to 1000)
  9. .map(id => (id, s"user_$id", s"user_${id}@dod.com", states(rnd.nextInt(5))))
  10. .toDF("uid", "login", "email", "user_state")
  11. val ordersDF = (0 to 1000)
  12. .map(r => (r, r, rnd.nextInt(100), 10 * r * 0.2d, states(rnd.nextInt(5)), items(rnd.nextInt(5))))
  13. .toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")
  14. // 会大量移动数据,我的本上OOM
  15. val usersOrdersDF = ordersDF.join(usersDF, $"users_id" === $"uid")
  16. usersOrdersDF.show(false)
  17. // 优化,无OOM
  18. import org.apache.spark.sql.functions._
  19. import org.apache.spark.sql.SaveMode
  20. usersDF.orderBy(asc("uid")).write.format("parquet").bucketBy(4, "uid").mode(SaveMode.Overwrite).saveAsTable("UsersTbl")
  21. ordersDF.orderBy(asc("users_id")).write.format("parquet").bucketBy(4, "users_id").mode(SaveMode.Overwrite).saveAsTable("OrdersTbl")
  22. // Cache the tables
  23. spark.sql("CACHE TABLE UsersTbl")
  24. spark.sql("CACHE TABLE OrdersTbl")
  25. // Read them back in
  26. val usersBucketDF = spark.table("UsersTbl")
  27. val ordersBucketDF = spark.table("OrdersTbl")
  28. // Do the join and show the results
  29. val joinUsersOrdersBucketDF = ordersBucketDF.join(usersBucketDF, $"users_id" === $"uid")
  30. joinUsersOrdersBucketDF.show(false)

Chapter 8(todo: 实操)

Streaming Data Source

Unfortunately, as of Spark 3.0, the APIs to build custom streaming sources and sinks are still experimental. 大意是目前自定义的流式源还不稳定

Files

  • 文件目录可以作为stream的源
  • 多个文件都可以读,但micro-batch由于某些原因(如限流)只能选一部分,则earliest timespamps,并且选取的文件处理也是并行,无顺序处理的说法
  • 写文件只支持append mode,为了支持exactly-once guarantee会在_spark_metadata子目录维护log信息

    Kafka

  • 支持全部的3种写模式,但complete mode是不推荐的,会导致很多重复的数据

  • 细节介绍不多,只是表明容易使用,跟多参考官网Kafka Integration

    自定义写入逻辑

  • foreachBatch(),可以自定义一个回调函数(如:写到多处),每次处理一个micro-batch注意这个方法只保证at-least-once回调,需要自定义的逻辑来处理重复的batchId

  • foreach这个需要实现一个trait的三个方法:open/process/close,这个方法更底层,更多了解需要看官方文档

    Data Transformation

    The essence of streaming state is to retain summaries of past data.

流式处理涉及一个很关键的问题:有状态的数据,一直需要保持。
image.png

Stateful Streaming Aggregation(只涉及一个流)

聚合可以不同维度区分,如按时间来看:基于时间和不基于时间的聚合

不基于时间的聚合又可以分为:全局、局部

对于streaming DataFrame,不可以直接使用DataFrame.count()/reduce(),而要用groupBy().count()

  1. # 全局,不传参
  2. val runningCount = sensorReadings.groupBy().count()
  3. # 局部,传key
  4. val baselineValues = sensorReadings.groupBy("sensorId").mean("value")
  5. # 一次使用多个聚合函数,mean是求平均的函数
  6. import org.apache.spark.sql.functions.*
  7. val multipleAggs = sensorReadings
  8. .groupBy("sensorId")
  9. .agg(
  10. count("*"),
  11. mean("value").alias("baselineValue"),
  12. collect_set("errorCode").alias("allErrorCodes")
  13. )

支持聚合的函数有如下几类:
1,内置聚合函数:count()/sum()/mean()/stddev()/countDistinct()/collect_set(),更多
2,一次使用多个聚合函数
3,UDF,自定义

基于时间的聚合(window)

注意:区分两个概念的不同,事件发生的时间 和 接受到事件的时间

例举了时间窗口、以及滑动时间窗口的聚合;有点复杂,在于

  • 数据过来是无序的,收到和发生没有顺序保障
  • 收到很久以前的数据,要如何抉择:放弃 or 更新很久以前的记录?
  • 搞个折中的办法:设置一个超时窗口(watermarks)

image.png

Streaming Join(涉及两个流)

流 + 静态数据集的Join

这个比较简单,静态的数据集缓存到executor里面,干就完了。

流 + 流的Join

因为两个都是无限的集合,当某个key没有匹配到时,可能是这个key的另一半没到来;这就带来了问题:
1,需要等,就导致了要用缓存把数据暂存起来
2,等多久了?默认是无限等,这会导致消耗大量的内存
3,可以限定一下时空关系,这样让spark可以知道数据有过期这一说(过期可以不管),也就限制了内存的消耗
另外要执行out join的话,必须限定时空关系,不能无限等(这种输出是有延时的)

自定义有状态的计算(涉及一个流)

需要代码实现,所以3.0只支持Scala和Java

mapGroupsWithState()

输入:上一个版本的状态(保持的数据)、新的数据(by key 分好了的)
输出:更新后的状态、计算的结果

  1. def arbitraryStateUpdateFunction( key: K,
  2. newDataForKey: Iterator[V],
  3. previousStateForKey: GroupState[S]
  4. ):U
  5. val inputDataset: Dataset[V] = // input streaming Dataset
  6. inputDataset
  7. .groupByKey(keyFunction) // keyFunction() generates key from input
  8. .mapGroupsWithState(arbitraryStateUpdateFunction)

这个操作也有超时这一说法(很显然),spark的实现可以选择不同的时间基准:

  • 基于process time,就是以spark运行的时钟为准
  • 基于event time,这个是基于事件产生的时间为准,相当于第三方的时钟

    flatMapGroupsWithState()

    这个操作就太灵活了(太复杂),主要是解决上一个操作具有的局限(只能返回一个值,以及不能使用append mode);作者没有过多介绍,可能是介绍太细也没啥用,需要动手实操来理解。

    Chapter 9 Storage Solution

    前面的章节都是在说明如何计算,但真实的应用最终是需要把结果保存下来。

    Database

    两种类型:OLTP(online transaction processing) 和 OLAP(online analytical processing)
    显然Spark是属于OLAP
    主要有两个限制:数据规模的增长后不容易扩容;不支持no-sql的分析(machine learning)

    Data Lake

    a data lake is a distributed storage solution that runs on commodity hardware and easily scales out horizontally.

一种分布式的存储方案,使用很常见的硬件设备,很容易水平扩容。
建设Data Lake从三个维度,进行独立评估:

  • 存储系统,HDFS/S3/Azure Data Lake/Google Cloud
  • 文件格式,结构化(parquet)、半结构化(JSON)、无结构(text/image)
  • 处理引擎,Spark、Flink、Spark MLlib

Spark支持Hadoop’s FileSystem API,大部分都实现了这个接口标准。
Data Lake这种存储解决方案其实已经很牛逼了,现有的很多应用就是基于此;但还是有个比较大的缺陷:不支持事务

LakeHouse

Lakehouses are enabled by a new system design that provides data management features similar to databases directly on the low-cost, scalable storage used for data lakes.

它的出现,使得一种全新的存储设计架构成为可能:低成本、高伸缩、事务特性。
提到了三个开源的实现:Apache Hudi/Apache Iceberg/Delta Lake,They are all open data storage formats
Delta Lake是Spark原创团队搞起来的,和Spark集成的最好。
我理解(有点存疑):LakeHouse 只是一种文件格式,分布式的话,还需要结合分布式存储系统(如HDFS)。
推荐阅读:https://databricks.com/discover/data-lakes/introduction,里面提到的最佳实践:

  • Use the data lake as a landing zone for all of your data
  • Mask data containing private information before it enters your data lake(脱敏)
  • Secure your data lake with role- and view-based access controls(权限)
  • Build reliability and performance into your data lake by using Delta Lake
  • Catalog the data in your data lake

image.png