注意:读写文件可以分为

  • 读本地文件和hdfs文件,区别在于使用的协议不同,读取的路径也不同,容易混淆。
  • 写文件可以分为保存为多个文件和一个文件。spark是分布式的,一般都是写成多个文件(因此路径参数是目录路径,而不是文件名),特殊情况需要写单个文件。

    读写

    读写本地文件

    使用 file:/// ,三个斜杠表示自动搜索路径。对分布式友好,否则要记住ip和端口了。后面提到的hdfs:///和hdfs://ip:port是等价的,但显然hdfs:///更好用。 ```python

    line=sc.textFile(“file:///home/data/wordcount/word.txt”)

保存,注意是路径名而不是文件名。

textFile.saveAsTextFile(“file:///home/data/wordcount/output”)

  1. <a name="oI0qo"></a>
  2. ### 读写HDFS文件
  3. 文件路径写法很多,但不推荐简写,在此不做介绍。
  4. ```python
  5. # 读 得到 RDD
  6. textFile = sc.textFile("hdfs:///user/data/word.txt")
  7. textFile = sc.textFile("/user/data/word.txt")
  8. textFile = sc.textFile("hdfs://192.168.200.72:9090/user/data/word.txt")
  9. # 读 得到 DataFrame
  10. textFile = spark.read.text("hdfs:///user/data/word.txt")
  11. # 保存
  12. textFile.saveAsTextFile("hdfs:///user/data/output")

txt文件

  • 读 sc.textFile(path)
  • 写 textFile.saveAsTextFile(path)
  • 也可以读写 csv和json为rdd,再转为dataframe

    csv

    ```python

    读 1

    df = spark.read.csv(‘hdfs:///python/test_support/sql/ages.csv’)

读 2

rdd = sc.textFile(‘hdfs:///python/test_support/sql/ages.csv’) df2 = spark.read.csv(rdd)

spark.read.json()其他部分参数

encoding=None sep=None 分隔符默认 ‘,’ header=None 使用第一行作为列名。 如果设置为None,则使用默认值false。 还可以对数据做格式化,包括时间、空值、特殊符号 …其余参数看官方文档 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

df.write.csv(path)

  1. <a name="deSX0"></a>
  2. ### json
  3. ```python
  4. # 读 1
  5. df1 = spark.read.json('hdfs:///python/test_support/sql/people.json')
  6. # 读 2
  7. rdd = sc.textFile('hdfs:///python/test_support/sql/people.json')
  8. df2 = spark.read.json(rdd)
  9. # spark.read.json()其他部分参数
  10. encoding=None
  11. 还可以对数据做格式化,包括时间、空值、特殊符号
  12. ...其余参数看官方文档
  13. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
  14. # 写
  15. df.write.json(path)

parquet

  1. # 读
  2. df = spark.read.parquet('hdfs:///python/test_support/sql/parquet_partitioned')
  3. # 写
  4. df.write.parquet(path)

统一读写的API

读 : spark.read.format(source).load()

  1. df = spark.read.format('json').load('hdfs:///python/test_support/sql/people.json')
  2. df = spark.read.format('parquet').load('hdfs:///python/test_support/sql/people')
  3. df = spark.read.format('csv').load('hdfs:///python/test_support/sql/people.csv')

写:

  1. df.write
  2. .format('parquet')
  3. .mode("append")
  4. .save(os.path.join(path)

保存为单文件

经常会有需要保存为一个单独的文件,而不是目录。但spark的保存格式只有目录,幸运的是,当我们merge所有数据到一个文件保存后,该文件的命名一定是以 part-00000开头的,所以可以利用这个特征来移动文件,达到保存为单文件的目的。使用coalesce(1)即可

  1. data.coalesce(1).write.format('json').save('file:///user/data')
  2. spark没有办法直接操作hadoop的目录,但可以使用shell在外头进行hdfs操作。