整体感受

这边书最大的优点是对Java非常友好,全书示例都是用Java实现来讲述。另外一个优点是几乎每章都有真实数据的示例练习。缺点是理论讲的少(不深入浅出),不适合成为学Spark的第一本书。对我有个好处就是,我用scala独自实现了书中所有的练习(不参照作者github里面的实现),一来巩固之前学的Spark知识,二来也是作为Scala的练习。

Ch1,源码

摘要

读取一个本地csv文件,然后输出,相当于走通hello, world

Ch 2,源码

摘要

Spark输出到第三方存储,这里的示例是输出到mysql(原书的例子是postgresql),需要数据库驱动的jar包

注意点

1,使用$"name"这种获取列的语法,需要手动引入隐式转换import spark.implicits._
2,手动建了数据CREATE DATABASE spark_labs character set utf8mb4;

Ch 3,源码

摘要

1,操作Dataset,使用case class得到schema
2,读取不同数据源csv/json,转换统一scheme后,将两个dataframe拼接起来(union)

注意点

1,打平嵌套的属性withColumn("datasetId", $"fields.id")
2,读取数组元素withColumn("type", split($"fields.type_description", " - ").getItem(1))
3,拼接不同列withColumn("id", concat($"state", lit("_"), $"county", lit("_"), $"datasetId"))
4,上面这些处理函数都来之org.apache.spark.sql.functions
5,Dataset处理允许null的属性

  1. case class Book(id: Int,
  2. authorId: Option[Int],
  3. title: String,
  4. releaseDate: Option[Date],
  5. link: String
  6. )

Ch 4,源码

摘要

打印跟踪了一下spark执行的步骤,使用System.currentTimeMillis计时

Ch 5/6,源码

摘要

1,一个大数据应用:使用统计的方法计算π值,落在圆内的点/落在正方形内的点
2,介绍如何启动一个Spark集群

  1. $ cd SPARK_HOME/sbin
  2. $ ./start-master.sh
  3. // 另外的而机器或者本机
  4. $ ./start-worker.sh spark://changzhi.local:7077

Ch 7,源码

摘要

读取各种文件格式(Avro, Orc, Parquet, Json, xml, csv, text),得到DataFrame

注意点

有些格式需要第三方库支持(如xml),需要额外下载jar包,https://mvnrepository.com/

Ch 8,源码

摘要

1,从不同数据库中获取数据(mysql、sqlite、es),得到DataFrame
2,获取数据库执行sql后的结果
3,获取数据库内容,同时指定分区数

    props.put("partitionColumn", "film_id")
    props.put("lowerBound", "1")     // film_id 最小值
    props.put("upperBound", "1000")  // film_id 最大值
    props.put("numPartitions", "10")

4,使用sqlite示例了如何自定义实现数据库方言JdbcDialects.registerDialect(dialect)

注意点

1,使用es的jar需要和scala的版本匹配上,例如elasticsearch-spark-30_2.13-8.2.2.jar
2,es访问远程节点需要设置选项option("es.nodes.wan.only", "true")

Ch 9,源码

摘要

这个练习是整本书,最复杂的一个,也是深入底层API最很的一个。
实现自定义的数据格式,示例是获取图片元信息(拍摄日期,尺寸、地点等),注册自定义驱动

// 位置:src/main/resources/META-INF/services/
// 配置文件:org.apache.spark.sql.sources.DataSourceRegister
// 内容:
net.jgp.books.spark.ch09.x.ds.exif.ExifDirectoryDataSourceShortnameAdvertiser

注意点

1,实现了递归遍历文件夹,获取所有/过滤文件,支持最大文件获取数

Ch 10,源码

摘要

1,处理流式数据
2,读取文件流,这里自定义实现了一个产生文件流的程序,辅助实验过程
3,读取网络流,使用了Unix自带的服务nc -lk 9999
4,读取多个文件流,并在流上注册了一个foreach检查器ForeachWriter[Row],这个东西可以实现旁路
5,实现了Fat Jar

Ch 11,源码

摘要

1,使用spark.sql操作数据,需要创建视图
2,createOrReplaceTempView在Session级别共享
3,createOrReplaceGlobalTempView在Application级别共享
4,DataFrame不可以delete,只能filter
5,特殊传参方式

// Just use yr1980 and yr2010
val noUsedCols = (1981 to 2009).map(y => s"yr${y}")
// Ref: def drop(colNames: String*): DataFrame
val cleanDF = df.drop(noUsedCols: _*).withColumn("evolution",
  expr("round( (yr2010 - yr1980) * 1000000)")
)

Ch 12,源码

摘要

1,练习一些transform操作。作者也提到现实世界里,原始数据往往是比较恶劣,通常不知道字段的含义,需要进行逆向工程
2,列举了所有join的类型:inner/outer/left/right/left-semi/left-anti/cross

注意点

1,在处理df之前可以记录原始的column名称,方便转换完成后一次性删除原始的列名

val cols = df.columns.toSeq

// 一次性删除列名
df.[withColumn | withColumnRenamed | drop]()
  .drop(cols: _*)

2,对于列是array的类型,可以通过下标索引;如果越界会直接赋值null,而不是报错

val transDF = cleanDF.
  withColumn("countyState", split(col("label"), ", ")).
  withColumn("state", col("countyState").getItem(1)).
  withColumn("county", col("countyState").getItem(0))

3,functions里面对于数组下标是从1开始的

df.withColumn("addressElements", split(col("Address"), " ")).
  withColumn("addressElementCount", size(col("addressElements"))).
  withColumn("zip9", 
     element_at(col("addressElements"), col("addressElementCount"))
  )

4,join操作经常出现的一个问题是重名的列,删除时需要用到df

val joinCamCountyDF = campusDF.join(countyDF, campusDF("zip").equalTo(countyDF("zip")), "inner").
  drop(campusDF("zip"))
showDF(joinCamCountyDF)

val joinCenCamDF = censusDF.join(joinCamCountyDF, censusDF("countyId").equalTo(joinCamCountyDF("county")), "left").
  drop(joinCamCountyDF("county")).
  distinct

Ch 13,源码

摘要

1,处理多行的json数据,multiline = true
2,将单行数组扩展成多行,总行数扩大,df.withColumn("items", explode($"books"))
3,将多行合并成一行,总行数减少,这其实agg的应用

// 先创造了一个新列,聚合需要列的值
val tempDF = joinDF.
  withColumn("temp_column", struct(getColumnsOfDataFrame(inspectionDF):_*))

val nestedDF = tempDF.
  groupBy(getColumnsOfDataFrame(businessDF):_*).
  agg( collect_list($"temp_column").as("inspections") )

4,介绍了functions库实现的函数的类别:数组操作、日期操作、数学函数、安全、字符串处理等

Ch 14,源码

摘要

1,实现udf,注册,调用
2,udf多列传参,返回单值、或者case class
3,这个练习提供的数据格式有点乱,处理日期是麻烦的时期(我实现的时候花了挺多时间)
4,集成了单元测试,对于工具类的函数,最应该进行有效的单元测试
5,udf一个典型的应用场景是:数据质量检查,毕竟大数据不能人工肉眼看
6,udf一个弱项:不能被优化器理解(如Catalyst),不建议大范围用

Ch 15,源码

摘要

1,介绍一些简单agg操作,求和、平均值
2,UDAF:自定义实现聚合函数(传参支持单列、多列),接口比udf复杂一点

// abstract class Aggregator[-IN, BUF, OUT] extends Serializable

case class Data(i: Int)

val customSummer =  new Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}.toColumn()

val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)

Ch 16,源码

摘要

1,cache/persist,内存或者磁盘,会保存DAG,丢失能重新计算
2,checkpoint(lazy | eager),磁盘,会截断DAG;需要手动设置checkpoint目录
3,示例为了验证缓存的作用,使用很多的转换、聚合(增多消耗)
4,提到了partition相关的优化:coalesce/repartition/repartitionByRange
5,实现了数据生成器程序,支持自定义Schema的数据:Name/Title/Rating/Year…

Ch 17,源码

摘要

1,介绍了Delta Lake,下一代分布式数据库(我觉得有戏)
2,列的处理会很频繁(繁琐),spark内置的支持也很多

// 条件处理
withColumn("confidence_level",
  when(col("confidence") <= low, "low").
  when(col("confidence") < high, "nominal").
  otherwise("high")
).
// 时间处理
withColumn("acq_time_hr", expr("int(acq_time / 100)")).
withColumn("acq_time_min", expr("acq_time % 100")).
withColumn("acq_time2", unix_timestamp(col("acq_date"), "yyyy-MM-dd")).
withColumn("acq_time3", expr("acq_time2 + acq_time_min * 60 + acq_time_hr * 3600")).
withColumn("acq_datetime", from_unixtime(col("acq_time3"), "yyyy-MM-dd HH:mm:ss")).

3,提及了各种云存储:s3/google cloud storage

Ch 18

摘要

1,提及了资源管理方案:Spark内置、yarn、k8s(2.3开始支持k8s)
2,提及了安全(DataFrame是Session隔离的),数据网络传输进行加密、UI用户认证。