2.1 程序执行入口 SparkContext对象
Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)
只有构建出SparkContext, 基于它才能执行后续的API调用和计算
本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来
2.2RDD的创建
RDD的创建主要有2种方式
通过并行化集合parallelize创建 ( 本地对象转分布式RDD )
# coding:utf8# 导入Spark的相关包from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 演示通过并行化集合的方式去创建RDD, 本地集合 -> 分布式对象(RDD)rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法, 没有给定 分区数, 默认分区数是多少? 根据CPU核心来定print("默认分区数: ", rdd.getNumPartitions())rdd = sc.parallelize([1, 2, 3], 3)print("分区数: ", rdd.getNumPartitions())# collect方法, 是将RDD(分布式对象)中每个分区的数据, 都发送到Driver中, 形成一个Python List对象# collect: 分布式 转 -> 本地集合print("rdd的内容是: ", rdd.collect())
默认分区数: 12分区数: 3rdd的内容是: [1, 2, 3]
读取外部数据源 (读取文件)textFile
# 读取本地文件数据file_rdd1 = sc.textFile("../data/input/words.txt")print("默认读取分区数: ", file_rdd1.getNumPartitions())print("file_rdd1 内容:", file_rdd1.collect())# 加最小分区数参数的测试file_rdd2 = sc.textFile("../data/input/words.txt", 3)# 最小分区数是参考值, Spark有自己的判断, 你给的太大Spark不会理会file_rdd3 = sc.textFile("../data/input/words.txt", 100)print("file_rdd2 分区数:", file_rdd2.getNumPartitions())print("file_rdd3 分区数:", file_rdd3.getNumPartitions())# 读取HDFS文件数据测试hdfs_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")print("hdfs_rdd 内容:", hdfs_rdd.collect())
wholeTextFile
读取文件的API, 有个适用场景: 适合读取一堆小文件
这个API是小文件读取专用
用法:
sparkcontext.wholeTextFiLes(参数1,参数2)
# 参数1,必填,文件路径 支持本地文件 支持HDFS 1
持一些比如S3协议
# 参数2,可选,表示最小分区数量。
# 注意:参数2 话语权不足,这个API 分区数量最多
能开到文件数量
这个API偏向于少量分区读取数据
因为,这个API表明了自己是小文件读取专用,那么文件的数据很小
分区很多,导致shuffle的几率更高.所以尽量少分区读取数据.
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 读取小文件文件夹rdd= sc.wholeTextFiles("../data/input/tiny_files")print(rdd.map(lambda x:x[1]).collect())
