注意:读写文件可以分为
- 读本地文件和hdfs文件,区别在于使用的协议不同,读取的路径也不同,容易混淆。
- 写文件可以分为保存为多个文件和一个文件。spark是分布式的,一般都是写成多个文件(因此路径参数是目录路径,而不是文件名),特殊情况需要写单个文件。
读写
读写本地文件
使用 file:/// ,三个斜杠表示自动搜索路径。对分布式友好,否则要记住ip和端口了。后面提到的hdfs:///和hdfs://ip:port是等价的,但显然hdfs:///更好用。 ```python读
line=sc.textFile(“file:///home/data/wordcount/word.txt”)
保存,注意是路径名而不是文件名。
textFile.saveAsTextFile(“file:///home/data/wordcount/output”)
<a name="oI0qo"></a>### 读写HDFS文件文件路径写法很多,但不推荐简写,在此不做介绍。```python# 读 得到 RDDtextFile = sc.textFile("hdfs:///user/data/word.txt")textFile = sc.textFile("/user/data/word.txt")textFile = sc.textFile("hdfs://192.168.200.72:9090/user/data/word.txt")# 读 得到 DataFrametextFile = spark.read.text("hdfs:///user/data/word.txt")# 保存textFile.saveAsTextFile("hdfs:///user/data/output")
txt文件
- 读 sc.textFile(path)
- 写 textFile.saveAsTextFile(path)
- 也可以读写 csv和json为rdd,再转为dataframe
csv
```python读 1
df = spark.read.csv(‘hdfs:///python/test_support/sql/ages.csv’)
读 2
rdd = sc.textFile(‘hdfs:///python/test_support/sql/ages.csv’) df2 = spark.read.csv(rdd)
spark.read.json()其他部分参数
encoding=None sep=None 分隔符默认 ‘,’ header=None 使用第一行作为列名。 如果设置为None,则使用默认值false。 还可以对数据做格式化,包括时间、空值、特殊符号 …其余参数看官方文档 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
写
df.write.csv(path)
<a name="deSX0"></a>### json```python# 读 1df1 = spark.read.json('hdfs:///python/test_support/sql/people.json')# 读 2rdd = sc.textFile('hdfs:///python/test_support/sql/people.json')df2 = spark.read.json(rdd)# spark.read.json()其他部分参数encoding=None还可以对数据做格式化,包括时间、空值、特殊符号...其余参数看官方文档https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json# 写df.write.json(path)
parquet
# 读df = spark.read.parquet('hdfs:///python/test_support/sql/parquet_partitioned')# 写df.write.parquet(path)
统一读写的API
读 : spark.read.format(source).load()
df = spark.read.format('json').load('hdfs:///python/test_support/sql/people.json')df = spark.read.format('parquet').load('hdfs:///python/test_support/sql/people')df = spark.read.format('csv').load('hdfs:///python/test_support/sql/people.csv')
写:
df.write.format('parquet').mode("append").save(os.path.join(path)
保存为单文件
经常会有需要保存为一个单独的文件,而不是目录。但spark的保存格式只有目录,幸运的是,当我们merge所有数据到一个文件保存后,该文件的命名一定是以 part-00000开头的,所以可以利用这个特征来移动文件,达到保存为单文件的目的。使用coalesce(1)即可
data.coalesce(1).write.format('json').save('file:///user/data')spark没有办法直接操作hadoop的目录,但可以使用shell在外头进行hdfs操作。
