通用方法写

  1. df1.write.format("json、csv、...").mode("overwrite").save("output/persons2")

Json读写

/
注意:默认一行只读取一个json
设置读取的参数: DataFrameReader.385行
lineSep: 每行的分隔符,默认 \r, \r\n and \n
mode(写股模式):
append : 追加写
ignore : 输出目录存在,就不写
error(默认) : 输出目录存在,就报错
overwrite : 覆盖写
/

  1. @Test
  2. def testReadAndWriteJson():Unit={
  3. // 专门读json
  4. val dataFrame1 = session.read.json("input/people.json")
  5. // 通用的读取方法 通过format(文件格式)不同,读取不同格式的数据
  6. val dataFrame2 = session.read
  7. .format("json")
  8. .option("lineSep"," ") //设置读取的参数
  9. .load("input/people.json")
  10. dataFrame2.show()
  11. // 写JSON
  12. var datas:java.util.List[Row]=new util.ArrayList[Row]();
  13. datas.add(Row("jack",20))
  14. datas.add(Row("jack1",30))
  15. datas.add(Row("jack2",40))
  16. // 表结构
  17. var schema: StructType=StructType( StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
  18. val df: DataFrame = session.createDataFrame(datas, schema)
  19. val rdd: RDD[Row] = df.rdd.coalesce(1)
  20. // rdd.toDS()
  21. val df1: DataFrame = session.createDataFrame(rdd, schema)
  22. // 通用方法写json
  23. df1.write.format("json").mode("overwrite").save("output/persons2")
  24. // 专门写json格式
  25. df.write.mode("ignore").json("output/persons1.json")
  26. }

csv读写

/
.csv 是一种特殊的文件格式,在这种格式中保存的数据,必须以一种指定的分隔符分割
c(comma)s(seperate)v(value): 逗号分割字段
:sv :割字段
asv : a符号分割字段
注意: 默认只认,分隔符
设置读取的参数: DataFrameReader.608行
sep (default ,)
header (default false)
/

  1. @Test
  2. def testCSV():Unit={
  3. val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")
  4. df.write
  5. .option("sep",",")
  6. .option("header","true")
  7. .csv("output/csv")
  8. }

jdbc读写

/*
def jdbc(
url: String,
table: String,

  1. columnName: String, : 只能传入numeric, date, or timestamp,用来分区<br /> lowerBound: Long, 分区列的最小范围<br /> upperBound: Long, : 分区列的最大范围<br /> numPartitions: Int 分几个区
  2. connectionProperties: Properties <br /> username,<br />password,<br />driverClass<br />)<br /> 参数名参考: JDBCOptions<br /> */

jdbc读

  1. @Test
  2. def testJDBCRead():Unit= {
  3. val properties = new Properties()
  4. properties.put("user", "root")
  5. properties.put("password", "000000")
  6. properties.put("driverClass", "com.mysql.jdbc.Driver")
  7. val df: DataFrame = session.read.jdbc("jdbc:mysql://hadoop104:3306/gmall_report",
  8. "ads_user_total",
  9. properties
  10. )
  11. //全表显示,默认显示前20行
  12. df.show()
  13. // 按需查询
  14. df.createTempView("ads_user_total")
  15. session.sql("select dt,no_order_user_count from ads_user_total where dt >= '2021-05-08' ").show()
  16. }

jdbc写

  1. @Test
  2. def testJDBCWrite():Unit={
  3. val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")
  4. val properties = new Properties()
  5. properties.put("user", "root")
  6. properties.put("password", "000000")
  7. properties.put("driverClass", "com.mysql.jdbc.Driver")
  8. // 写入jdbc的数据是默认text格式,如果要用idea导数据最好先在mysql中创建好表。
  9. df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop104:3306/gmall_report","people",properties)
  10. }

不指定输出格式默认parquet

/
不指定输出文件的格式(json\csv等),默认以parquet格式写 ,默认读也是读parquet文件
Avro SequnceFile : 早期
ORC(Hive研发): spark2.4.x以上才支持orc
Parquet(Hadoop生态圈通用) : phoenix,impala,presto
textfile : 一直使用,简单,方便查看
textfile + 压缩格式
/

  1. @Test
  2. def testDefault():Unit={
  3. val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("datas/people.csv")
  4. // df.write.save("output/default")
  5. session.read.load("datas/users.parquet").show()
  6. }