一、 基本使用

首先需要从官网 下载我们需要的程序,并进行解压。解压完成后我们进入目录下可以通过如下指令进入到对应语言
的控制台中进行操作。

  • Python:./bin/pyspark
  • Python:./bin/pyspark2
  • Scala:./bin/spark-shell
  • SQL:./bin/spark-sql

如果是在Windows系统下则可以将该目录放置在Path下以便临时的进行调用。为了演示其具体的使用这里我们通过模拟
的数据进行基本的代码编写,且统一使用Python语言。

  1. myRange = spark.range(1000).toDF("number")
  2. divisBy2 = myRange.where("number % 2 = 0")
  3. divisBy2

如果我们需要将应用进行提交则可以通过以下方式提交Scala与Python程序,具体如下。

  1. spark-submit --class org.apache.spark.examples.SparkPi --master local ./examples/jars/spark-examples_2.11-2.2.0.jar 10
  2. spark-submit --master local ./examples/src/main/python/pi.py 10

1. 基本操作

为了能够在本地进行开发此处就通过手动设置 SparkConf 配置模拟本地独立节点模式,并通过其配置打开对应的 SparkSession
对象以便进行操作。

  1. SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local"); //.setMaster("spark://ip:port");
  2. SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();

为了能够便于后续的操作,我们将使用本地数据文件进行操作,其数据文件内容如下。

  1. {"name":"Michael", "salary":3000}
  2. {"name":"Andy", "salary":4500}
  3. {"name":"Justin", "salary":3500}
  4. {"name":"Berta", "salary":4000}

完成以上数据准备工作后我们就可以进行读取加载,并同时输出所有数据。

  1. Dataset<Row> df = session.read().json("data/employees.json");
  2. df.show();
  3. session.stop();

用于其通过运行时识别具体的数据模型,读者可以通过 df.printSchema() 显示其数据模型。如果读者希望在运行时只读取
符合预先要求的数据模型也可以手动指定对应的数据模型。

  1. StructType structType = DataTypes.createStructType(Arrays.asList(
  2. DataTypes.createStructField("name", DataTypes.StringType, true),
  3. DataTypes.createStructField("salary", DataTypes.LongType, true)
  4. ));
  5. Dataset<Row> df = session.read().schema(structType).json("data/employees.json");

上述代码我们可以看到其具体类型主要通过 DataTypes 下指定,最后的Boolean则是用来约束数据是否可为Nullable。做好上述
所有工作后我们就可以进入到正式章节,将介绍其各类API的使用方式。

列表达式

在介绍具体 select 前我们需要了解其实际类 Column 类,其主要用于表示列以及各类相关的计算操作等,具体函数读者可以通过
阅读 functions 下的各种方法进行学习,这里我们主要通过几个简单的例子来进行介绍。

  1. Column col = functions.col("name");
  2. Column expr = functions.expr("concat(name, '_ed')");
  3. df.select(functions.concat(col,functions.lit("_ed"))).show();
  4. df.select(expr).show();

上述代码的最终效果是一致的,均是在实际输出的数据后面追加_ed字符串。这里我们可以看到 expr 类似于我们平时使用SQL的方
式而另一种则是通过具体的函数进行操作最后,最后通过 select 进行调用即可。

select与selectExpr函数

通过上述的例子我们可以看到关于如何读取需要的列数据,如果每次读取都需要实例化 Column 必然是不现实的,所以其函数也提
供了更加便携的方式,让我们直接进行使用,下述将介绍具体如何使用。

  1. df.select("name").show(2); // 等同于 select name from dfTable limit 2
  2. df.select(functions.expr("name as sname")).show(2); // 等同于 select name as sname from dfTable limit 2
  3. df.selectExpr("name as sname").show(2); // 同上
  4. df.selectExpr("avg(salary)", "count(distinct(name))").show(); // 等同select avg(salary),count(distinct name) from dfTable
  5. df.select(functions.expr("name").equalTo("Andy")).show(); // 等同判断name是否为Andy

上述代码我们多进行了几个相关联例子的演示,就是为了体现其两者的差距,以及如何使用具体函数实现相同的最终效果。

  • lit函数

如果读者希望在结果中输出自己自定义的值,那么就需要特殊的手段进行进行输出,所以这里我们就需要使用到 lit 函数,具体的
适合用如下。

  1. df.select(functions.expr("*"), functions.lit("true").as("existed")).show();

列配置

除了通过SQL以及functions函数针对列进行配置,其 Dataset 对象也提供了相关函数以便我们针对列进行配置,下述我们将通过
一个完整的例子进行演示如何使用相关函数实现我们的目标。

  1. // 判断salary列,并将结果做为greater列(true/false)
  2. df.withColumn("greater", functions.expr("salary > 3500")).show();
  3. // 将name列重命名为subname列,同时过滤掉小于3500的数据
  4. df.withColumnRenamed("name", "subname").filter(x -> x.getLong(1) > 3500).show();
  5. // 删除name列,将其他列进行显示
  6. df.drop("name").show();
  7. // 将salary列强制转换为string类型,同时重命名为salarystr
  8. df.withColumn("salarystr", functions.col("salary").cast(DataTypes.StringType)).show();

上述代码中我们展示了各种针对列的操作,同时还额外介绍了其他函数的使用方式。

过滤数据

如果我们需要过滤我们不需要的数据,可以通过其提供的 where 函数实现对应功能,这部分可以采用类SQL的语法编写,也可以
通过functions的函数实现相同的功能,所以下面的例子我们将展示利用SQL的语法与函数的方式对数据进行过滤。

  1. df.where("salary > 3500").show();
  2. df.where(functions.col("name").equalTo("Andy")).show();
  3. df.where("name = 'Andy'").show();

去重

如果读者希望针对结果数据的组合进行去重,则可以利用本章的函数实现。比如针对目的地与起始地组合,我们只希望看到不同的
组合即可,就可以利用该函数实现我们希望的功能。

  1. df.select("salary").distinct().show();

随机抽样&分割

提供了可从数据集中随机抽取一个数据或根据比率拆分按比的数据分片,对于抽样数据可通过 withReplacement 决定是否放回样
本,以决定是否会抽取到重复样本。

  1. // 0.2表示随机抽取样本数据中的20%的样本
  2. df.sample(false, 0.2).show();
  3. // 将数据按照 25% 与 75% 比例拆分
  4. Dataset<Row>[] dts = df.randomSplit(new double[] { 0.25, 0.75 });
  5. dts[0].show();

联合操作

有时候我们需要将多个结果数据进行联合,当然联合的前提就是这些数据的模式必须一致,否则将无法联合。而该过程我们仅仅
需要使用 union 函数即可实现。

  1. dts[0].union(dts[1]).show();

排序

通过上述的几个函数我们可以得知使用SQL或函数就可以实现我们期望的功能,当然这里需要额外提出的就是考虑到存在 NULL
这种情况,所以Spark也提供了诸如 asc_nulls_firstasc_nulls_lastdesc_nulls_firstdesc_nulls_last 之类
函数使NULL排在前排或后排。

  1. df.sort("salary").show();
  2. df.orderBy(functions.col("salary").desc()).show();
  3. df.orderBy(functions.desc("salary")).show();
  4. df.orderBy(functions.desc_nulls_first("salary")).show();

出于性能优化的目的,最好是在进行别的转换之前,先对每个分区进行内部排序。可以使用 sortWithinPartitions 方式实现这一目的。

limit函数

如果读者需要获取数据的前面若干条,可以通过该函数实现数据的抓取。

  1. df.limit(4).show();

划分和合并

为了提高检索性能,有时候需要根据一些经常过滤的列对数据进行分区,控制跨集群数据的物理布局,包括分区方案和分区数。不管是否
有必要,重新分区都会导致数据的全面洗牌。如果将来的分区数大于当前的分区数,或者当你想要基于某一组特定列来进行分区时,通常
只能重新分区。

  1. df.rdd().getNumPartitions();
  2. df.repartition(5);
  3. df.repartition(functions.col("name"));

2. 根据数据类型处理

数值类型

接下来的章节我们将针对不同数据类型的处理方式进行针对性的介绍,保障在实际开发过程中能够根据目标字段的类型选择更佳的方式
进行任务开发。

  1. // 等同于 select pow(salary, 2) + 4 as powSalary from dfTable
  2. Column col = functions.pow(functions.col("salary"), 2).plus(4).as("powSalary");
  3. df.select(col).show();
  4. df.selectExpr("pow(salary, 2) + 4 as pwSalary").show(); // 效果同上
  5. // round为向上取整,bround为向下取整
  6. df.select(functions.round(functions.lit(2.5)), functions.bround(functions.lit(2.5))).show();
  7. // monotonicallyIncreasingId将可以按照数据顺序从0开始自增
  8. df.select(functions.monotonicallyIncreasingId()).show();
  9. df.selectExpr("monotonically_increasing_id()", "*").show();

字符串类型

其提供的主要函数说明如下。

  • functions.initcap: 遇到首字母大写则增加空格拆分
  • functions.lower: 将字符串小写
  • functions.upper: 将字符串大写
  • functions.lpad: 字符串左侧增加空格
  • functions.ltrim: 删除字符串左侧空格
  • functions.rpad: 字符串右侧增加空格
  • functions.rtrim: 删除字符串右侧空格
  • functions.trim: 删除字符串两侧空格

注意,如果lpad或rpad方法输入的数值参数小于字符串长度,它将从字符串的右侧删除字符。

正则表达式

其提供的主要函数如下。

  • functions.regexp_extract: 提取匹配的值
  • functions.regexp_replace: 替换匹配的值
  • functions.translate: 按字符级别进行替换
  • functions.col(“name”).contains: 判断列是否包含指定字符串

日期时间类型

其提供的主要函数如下。

  • functions.current_date: 当前日期
  • functions.current_timestamp: 当前时间戳
  • functions.date_sub: 减少天数
  • functions.date_add: 添加天数
  • functions.datediff: 计算两个日期相差天数
  • functions.months_between: 计算两个日期相差的月份
  • functions.to_date: 将指定字符串转换为日期格式
  • functions.to_timestamp: 将指定字符串转换为时间格式

处理空值

为了处理NULL这种特殊的空值类型,首先我们通过 coalesce 选择待选列中第一个不为NULL的列值,具体使用
方式如下。

  1. df.select(functions.coalesce(functions.col("name"), functions.col("salary"))).show();

除了根据列是否为NULL输出对应值以外,还可以通过 drop 过滤对应列。若指定”any”做为参数,当存在一个值
是NULL时,就删除该行;若指定”all”为参数,只有当所有值为NULL或者Nan时才能删除该行,当然也可以指定对
应列。

  1. df.na().drop("all");
  2. df.na().drop("all", new String[] {"name", "salary"});

当然也可以根据类型、列名进行替换。

  1. df.na().drop("all");
  2. df.na().drop("all", new String[] {"name", "salary"});
  3. df.na().fill("null str");
  4. df.na().fill(3, new String[] { "salary" });

复杂类型

在实际处理中也会遇到结构体,结构体等于在DataFrame中的DataFrame,我们通过普通的数据创建一个
结构体。并从结构体中获取我们所需要的数据。

  1. df.selectExpr("(name, salary) as complex").select("complex.name").show();
  2. df.select(functions.struct(functions.col("name"), functions.col("salary")).alias("complex")).select("complex.name").show();

上述代码我们通过两种方式列举了具体的操作,最终我们都是通过”.”来访问结构体中的具体字段,当然读者也可以获
取该结构体下的所有数据,具体写法如下。

  1. df.select("complex.*").show();

处理完结构体后我们还可以根据分隔符将字符串拆分为数组类型并进行访问,我们可以先通过 split 拆分然后通过
下标索引进行访问。

  1. df.select(functions.split(functions.col("name"), " ").alias("array_col")).selectExpr("array_col[0]").show();
  2. df.selectExpr("split(name, ' ') as array_col").selectExpr("array_col[0]").show();

除了访问数组具体数据以外,还可以查询数组的长度以及判断对应数据是否存在于数组中。

  1. df.selectExpr("size(split(name, ' ')) as array_size").show();
  2. df.selectExpr("array_contains(split(name, ' '), 'B')").show();

与数组对应的就是字段类型,这可以通过 map 实现,并且通过下标的方式进行访问,对于不存在Key的数据则返回NULL。

  1. df.select(functions.map(functions.col("name"), functions.col("salary")).alias("map_col")).selectExpr("map_col['Berta D']").show();
  2. df.selectExpr("map(name, salary) as map_col").selectExpr("map_col['Berta D']").show();

最后我们需要处理的常用结构就是数据为JSON字符串的背景下,如果将JSON字符串转换为对应类型从而可以获取其中我
们所需要的具体值,下面我们用模拟的数据进行访问,并提供如何从结构体转换为JSON字符串格式。

  1. Dataset<Row> jsonDF = session.range(1).selectExpr("\'{\"myJSONKey\": {\"myJSONValue\": [1, 2, 3]}}\' as jsonString");
  2. jsonDF.select(functions.get_json_object(functions.col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),
  3. functions.json_tuple(functions.col("jsonString"), "myJSONKey")).show();

上述我们通过 get_json_object 直接查询JSON对象,该方式可以通用于很多数据结构。如果JSON数据仅有一层嵌套则
可以通过 json_tuple 进行访问。紧接着就是如果从结构体转换为JSON字符串。

  1. df.selectExpr("(name, salary) as map_col").select(functions.to_json(functions.col("map_col"))).show();

用户自定义函数

很多时候提供的函数并不能满足我们所有的需求,那么就需要通过用户扩展的自定义函数进行各类常用算法的增加。下述
我们将采用两种注册机制的方式来进行讲述,主要是通过函数调用以及字符串表达式调用时注册的方式存在差异性。

  1. UDF1<Long, Long> fnc = x -> x * x;
  2. UserDefinedFunction udf = functions.udf(fnc, DataTypes.LongType);
  3. df.select(udf.apply(functions.col("salary"))).show();
  4. session.udf().register("power2", udf);
  5. df.selectExpr("power2(salary)").show();

DataSet强类型

如果用户需要使用DataSet强类型以保证编译时的正确性,则可以采用如下的方式。

  1. session.read().schema(structType).json("sparkdemo/data/employees.json").as(Encoders.javaSerialization(DataSourceSpark.class));