注意:读写文件可以分为
- 读本地文件和hdfs文件,区别在于使用的协议不同,读取的路径也不同,容易混淆。
- 写文件可以分为保存为多个文件和一个文件。spark是分布式的,一般都是写成多个文件(因此路径参数是目录路径,而不是文件名),特殊情况需要写单个文件。
读写
读写本地文件
使用 file:/// ,三个斜杠表示自动搜索路径。对分布式友好,否则要记住ip和端口了。后面提到的hdfs:///和hdfs://ip:port是等价的,但显然hdfs:///更好用。 ```python读
line=sc.textFile(“file:///home/data/wordcount/word.txt”)
保存,注意是路径名而不是文件名。
textFile.saveAsTextFile(“file:///home/data/wordcount/output”)
<a name="oI0qo"></a>
### 读写HDFS文件
文件路径写法很多,但不推荐简写,在此不做介绍。
```python
# 读 得到 RDD
textFile = sc.textFile("hdfs:///user/data/word.txt")
textFile = sc.textFile("/user/data/word.txt")
textFile = sc.textFile("hdfs://192.168.200.72:9090/user/data/word.txt")
# 读 得到 DataFrame
textFile = spark.read.text("hdfs:///user/data/word.txt")
# 保存
textFile.saveAsTextFile("hdfs:///user/data/output")
txt文件
- 读 sc.textFile(path)
- 写 textFile.saveAsTextFile(path)
- 也可以读写 csv和json为rdd,再转为dataframe
csv
```python读 1
df = spark.read.csv(‘hdfs:///python/test_support/sql/ages.csv’)
读 2
rdd = sc.textFile(‘hdfs:///python/test_support/sql/ages.csv’) df2 = spark.read.csv(rdd)
spark.read.json()其他部分参数
encoding=None sep=None 分隔符默认 ‘,’ header=None 使用第一行作为列名。 如果设置为None,则使用默认值false。 还可以对数据做格式化,包括时间、空值、特殊符号 …其余参数看官方文档 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
写
df.write.csv(path)
<a name="deSX0"></a>
### json
```python
# 读 1
df1 = spark.read.json('hdfs:///python/test_support/sql/people.json')
# 读 2
rdd = sc.textFile('hdfs:///python/test_support/sql/people.json')
df2 = spark.read.json(rdd)
# spark.read.json()其他部分参数
encoding=None
还可以对数据做格式化,包括时间、空值、特殊符号
...其余参数看官方文档
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
# 写
df.write.json(path)
parquet
# 读
df = spark.read.parquet('hdfs:///python/test_support/sql/parquet_partitioned')
# 写
df.write.parquet(path)
统一读写的API
读 : spark.read.format(source).load()
df = spark.read.format('json').load('hdfs:///python/test_support/sql/people.json')
df = spark.read.format('parquet').load('hdfs:///python/test_support/sql/people')
df = spark.read.format('csv').load('hdfs:///python/test_support/sql/people.csv')
写:
df.write
.format('parquet')
.mode("append")
.save(os.path.join(path)
保存为单文件
经常会有需要保存为一个单独的文件,而不是目录。但spark的保存格式只有目录,幸运的是,当我们merge所有数据到一个文件保存后,该文件的命名一定是以 part-00000开头的,所以可以利用这个特征来移动文件,达到保存为单文件的目的。使用coalesce(1)即可
data.coalesce(1).write.format('json').save('file:///user/data')
spark没有办法直接操作hadoop的目录,但可以使用shell在外头进行hdfs操作。