一、数据源

1. API结构

Spark采用了的统一规范的API使我们访问与写入数据源中,对去读取数据我们采用如下格式。

  1. DataFrameReader.format().option("key", "value").schema().load()

format事可选的,默认情况下Spark将使用Parquet格式,options使你能配置键值对来参数化读取数据的方式。最后,如果数据源包含某种schema或你想
使用模式推理,则可以选择制定schema。对于option存在通用的参数,即读取模式,其默认采用permissive模式,其他模式介绍如下。

  • permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中;
  • dropMalformed:删除包含错误格式记录的行;
  • failFast:遇到错误格式的记录后立即返回失败;

介绍完读取的API结构,我们接着介绍写入的API格式。

  1. DataFrameReader.format(...).options(...).partitionBy(...).bucketBy(...).sortBy(...).save()

format是可选的,默认情况下Spark将使用Parquet格式,option仍用于配置写出数据的方法,PartitionBy,bucketBy和sortBy
仅适用基于文件的数据源,你可以使用这些方法来控制写出目标文件的具体结构。其也有通用的参数,即写入模式,其默认采
用errorIfExists模式。其他模式的介绍如下。

  • append:将输出文件追加到目标路径已存在的文件上或目录的文件列表;
  • overwrite:将完全覆盖目标路径中已存在的任何数据;
  • errorIfExists:如果目标路径已存在数据或文件,将抛出错误并返回写入操作失败;
  • ignore:如果目标路径已存在数据文件,则不执行任何操作;

介绍完基本API后下面将根据具体读取的格式介绍其参数类型以及对应的API使用方式。

2. CSV文件

首先介绍的就是其可用的选项参数。

模式 参数名 参数类型 默认值 说明
读写 sep string , 用作每个字段和值的分隔符
读写 header bool false 申明文件中第一行是否为列的标题
读写 escape string \ 用于转义的字符
读写 inferSchema bool false 读取文件时Spark是否自动推断列类型
读写 ignoreLeadingWhiteSpace bool false 是否应跳过读取值中的前导空格
读写 ignoreTrailingWhiteSpace bool false 是否应跳过读取值中的尾部空格
读写 nullValue string “” 声明文件中什么字符表示null值
读写 nanValue string NaN 声明什么字符表示NaN或缺失字符
读写 positiveInf string Inf 声明什么字符表示正无穷大
读写 negativeInf string -Inf 声明什么字符表示负无穷大
读写 Compression None,uncompressed,bzip2,deflate,gzip,lz4,snappy none 压缩算法


读取操作方式如下:

  1. sparkSession.read().format("csv").option("header","true").option("mode","FAILFAST").load("/a.csv");

写入操作如下:

  1. csvfil.write().format("csv").option("header","true").option("mode","overwrite").save("save.csv");

3. JSON文件

模式 参数名 参数类型 默认值 说明
读写 Compression none,uncompressed,bzip2,deflate,gzip,lz4,snappy none 压缩格式
读写 dateFromat string yyyy-MM-dd 日期格式
读写 timestampFormat string yyyy-MM-dd’T’HH:mm:ss.SSSZZ 时间格式
读写 primitiveAsString bool false 将原始值推断为字符串类型

读取操作方式如下:

  1. sparkSession.read().format("json").option("mode","failfast").load("data.json");

写入操作方式如下:

  1. jsonFile.write().format("json").option("mode","overwrite").save("output.json");

4. Parquet文件

Parquet是一种开源的面向列的数据存储格式,它提供了各种存储优化,尤其适合数据分析。做为默认的格式,其可供的选项非常少,基本都需要进行配置即可读取。

  1. sparkSession.read().format("parquet").load("data.parquet");
  2. parquetFile.write().format("parquet").save("ouput.parquet");

5. ORC文件

ORC是为Hadoop作业而设计的描述,类型感知的列存储文件格式。具体的使用方式如下。

  1. spartSession.read().format("orc").load("data.orc");
  2. orcFile.write().format("orc").mode("overwrite").save("output.orc");

6. SQL数据库

由于其采用了基于JDBC的方式连接数据库,所以只要是兼容JDBC均可使用该方式进行数据读取。首先在Spark类路径
中为指定的数据库包含JDBC驱动,并为连接驱动器提供合视的JAR包。

  1. ./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

为了能够访问数据库,需要进行选项设置。

属性名 说明
Url 表示要连接的JDBC URL
dbtable 表示要读取的表
driver JDBC驱动器的类名
partitionColumn 分区的列
lowerBound,upperBound 分区跨度
numPartitions 最大分区数
fetchsize 每次读取多少条记录
batchsize 批处理大小,指定每次写入多少记录
isolationLevel 数据库的事务隔离级别,可选NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE
createTableOptions 设定数据库的表和分区选项
createTableColumnTypes 表示创建表时使用的数据库列数据类型

下面我们以连接Postgresql为例说明。

  1. sparkSession.read().format("jdbc").option("driver","org.postgresql.Driver").option("url","jdbc:postgresql://server").option("dbtable","schema.tablename").option("user","username"),option("password","my-password").load();
  2. data.write().mode("overwrite").jdbc("jdbc:sqlite://new-sqlite.db", tablename, props);

当然为了提高查询效率,Spark将会采用查询下推的方式,在数据库层面过滤数据。当然读者也可以自行手动设置。

  1. String pushdownQuery = "(SELECT * FROM table) aAS table";
  2. sparkSession.read().format("jdbc").option("url", url).option("dbtable", pushdownQuery).option("driver", "driver").load();

7. 并行写数据

写数据涉及的文件数量取决于DataFrame的分区数。默认情况是每个数据分片都会有一定的数据写入,这意味着虽然我们指定的是一个“文件”,但实际上它是由一个文件夹中的多个文件组成,每个文件对应着一个数据分片。

  1. data.repartition(5).write().format("csv").save("multiple.csv");

不仅仅根据以上分区数进行划分还可以支持写入数据时控制存储数据以及存储数据的位置。将文件写出时,可以将列编码做为文件夹,
这使得你在之后读取时可跳过大量数据,只读入与问题相关的列数据而不必扫描整个数据集。

  1. data.limit(10).write().mode("overwrite").partitionBy("DEST_COUNTRY_NAME").save("data.parquet");

最后一种写入方式为数据分桶,可以使用该方法控制写入每个文件的数据。具有相同桶ID的数据将放置到一个物理分区中,这样就可以避免在稍后读取数据时进行洗牌。

  1. data.write().format("parquet").mode("overwrite").bucketBy(numberBuckets, columnToBucketBy).saveAsTable("tableName");

二、RDD操作

这里建议用户使用结构化API,尽量避免使用低级API,特别是弹性分布式数据集(RDD),建议在仅有以下特殊情况下使用这类API。

  • 当在高级API中找不到所需的功能时,例如要对集群中数据的物理放置进行非常严格的控制时;
  • 当需要维护一些使用RDD编写的遗留代码库时;
  • 当需要执行一些自定义共享变量操作时;

在开始进入正题前我们需要准备进行举例的数据。

  1. SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local");
  2. SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
  3. String[] myCollection = "Spark the definitive guide : big data processing made simple".split(" ");
  4. JavaRDD<String> words = JavaRDD.fromRDD(session.sparkContext().parallelize(JavaConverters.asScalaIteratorConverter(Arrays.asList(myCollection).iterator()).asScala().toSeq(), 2,
  5. ClassManifestFactory.classType(String.class)), ClassManifestFactory.classType(String.class));

完成基本的数据准备工作之后我们就可以进入到整体,如何利用低级API完成分析任务。

1. 转换操作

  • distinct

    1. long len = words.distinct().count();
  • filter

    1. List<String> coll = words.filter((x) -> x.startsWith("S")).collect();
  • map

    1. List<Integer> lens = words.map((x) -> x.length()).collect();
  • flatmap

    1. List<char[]> chars = words.flatMap((x) -> Arrays.asList(x.toCharArray()).iterator()).collect();
  • sort

    1. List<String> coll = words.sortBy((x) -> x.length(), true, 2).collect();
  • random split

通过该函数可以将一个RDD随机却分成若干哥RDD,这些RDD组成一个RDD的数组返回。

  1. JavaRDD<String>[] rdds = words.randomSplit(new double[]{ 0.5, 0.5 });

2. 动作操作

  • reduce

该方法指定一个函数将RDD中的任何类型的值规约为一个值。

  1. String result = words.reduce((x, y) -> x + y);
  • count

    1. words.countApprox(400, 0.95);
    2. words.countApproxDistinct(0.05);
    3. words.countByValue();
    4. words.countByValueApprox(400, 0.95);
  • othre

    1. String first = words.first();
    2. List<String> take = words.take(3);

3. 映射

下面我们将介绍基于键值的处理方式,首先我们需要将上述准备的数据按照键进行拆分。

  1. JavaPairRDD<Character, String> kv = words.keyBy(x -> x.toLowerCase().charAt(0));

在有一组键值对之后,读者可以对他们进行操作。

  1. List<Tuple2<Character, String>> map = kv.mapValues(x -> x.toUpperCase()).collect();
  2. List<Tuple2<Character, char[]>> result = kv.flatMapValues((word) -> Arrays.asList(word.toCharArray())).collect();

当然我们还可以单独提取其中键或值。

  1. List<Character> keys = kv.keys().collect();
  2. List<String> Values = kv.values().collect();

或者跟胡字符查询其中的值。

  1. List<String> look = kv.lookup('s');

4. 聚合

  • countByKey

    1. Map<Character, Long> count = kv.countByKey();
  • reduceByKey

    1. Map<Character, String> result = kv.reduceByKey((x, y) -> x + y).collectAsMap();
  • aggregate

此函数需要一个null值或一个起始值,并需要你指定两个不同的函数,第一个函数执行分区内聚合,第二个执行分区间聚合。

  1. Long length = kv.aggregate(0l, (before, value) -> before + value._2().length() , (x1, x2) -> x1 + x2);

当然考虑到其可能出现的性能问题,可以通过下述函数实现相同的功能。

  1. Long length = kv.treeAggregate(0l, (before, value) -> before + value._2().length() , (x1, x2) -> x1 + x2, 3);
  • aggregateByKey

    1. Map<Character, Long> result = kv.aggregateByKey(0l, (before, value) -> before + value.length() , (x1, x2) -> x1 + x2).collectAsMap();
  • combineByKey

    1. Map<Character, char[]> map = kv.combineByKey(x -> x.toCharArray(), (col, v) -> ArrayUtils.addAll(col, v.toCharArray()), (x1, x2) -> ArrayUtils.addAll(x1, x2)).collectAsMap();

5. 连接

RDD的连接与结构化API中的连接有很多相同之处,他们都遵循相同的基本格式,包括执行连接操作的两个RDD,以及输出分区数或自定义分区函数。

  1. JavaPairRDD<Character, Double> keyedDou = kv.mapValues(x -> new Random().nextDouble());
  2. Map<Character, Tuple2<String, Double>> join = kv.join(keyedDou).collectAsMap();

6. 控制分区

coalesc有效的折叠同一工作节点上的分区,以便在重新分区时避免数据洗牌。而Repartition操作将对数据进行重新分区,跨节点的分区会执行洗牌操作。最后就是repartitionAndSortWithinPartitions操作将对数据重新分区,并指定每个输出分区的顺序。

如果读者需要自定义分区,可以先参考自身提供的两类分区即HashPartitioner基于哈希的分区以及RangePartitioner根据数值范围分区这两类的分区函数,如果上述的分区无法满足用户的特定需求则可以通过实现Partitioner实现自定义的分区函数。

7. 广播变量

通过广播变量可以在集群上有效地共享不变量,而不需要将其封装到函数中去。在驱动节点上逝用变量的一般方法是简单地在函数闭包中引用它,但
这种方式效率很低,尤其是对于大数据变量来说。原因在于,当在闭包中使用变量时,必须在工作节点上执行多次反序列化。为此需要下述将介绍如
何使用该方式。

  1. Map<String, Integer> map = new HashMap();
  2. session.sparkContext().broadcast(map, ClassManifestFactory.classType(map.getClass()));

8. 累加器

累加器提供一个累加用地变量,Spark集群可以以执行方式对其进行安全更新,你可以用它来进行调试或创建低级聚合。累加器仅支持由满足交换律喝集合律地操作进行累加地变量,因此对累加器地操作可以被高效并行,你可以使用累加器实现计数器或求和操作。具体使用方式如下。

  1. // 未命名累加器
  2. LongAccumulator accUnnamed = new LongAccumulator();
  3. session.sparkContext().register(accUnnamed);
  4. // 名命累加器
  5. LongAccumulator accNamed = session.sparkContext().longAccumulator("acc");
  6. LongAccumulator regNamed = new LongAccumulator();
  7. session.sparkContext().register(regNamed, "acc2");
  8. accNamed.add(2l);