通用方法写
df1.write.format("json、csv、...").mode("overwrite").save("output/persons2")
Json读写
/
注意:默认一行只读取一个json
设置读取的参数: DataFrameReader.385行
lineSep: 每行的分隔符,默认 \r, \r\n and \n
mode(写股模式):
append : 追加写
ignore : 输出目录存在,就不写
error(默认) : 输出目录存在,就报错
overwrite : 覆盖写
/
@Testdef testReadAndWriteJson():Unit={// 专门读jsonval dataFrame1 = session.read.json("input/people.json")// 通用的读取方法 通过format(文件格式)不同,读取不同格式的数据val dataFrame2 = session.read.format("json").option("lineSep"," ") //设置读取的参数.load("input/people.json")dataFrame2.show()// 写JSONvar datas:java.util.List[Row]=new util.ArrayList[Row]();datas.add(Row("jack",20))datas.add(Row("jack1",30))datas.add(Row("jack2",40))// 表结构var schema: StructType=StructType( StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)val df: DataFrame = session.createDataFrame(datas, schema)val rdd: RDD[Row] = df.rdd.coalesce(1)// rdd.toDS()val df1: DataFrame = session.createDataFrame(rdd, schema)// 通用方法写jsondf1.write.format("json").mode("overwrite").save("output/persons2")// 专门写json格式df.write.mode("ignore").json("output/persons1.json")}
csv读写
/
.csv 是一种特殊的文件格式,在这种格式中保存的数据,必须以一种指定的分隔符分割
c(comma)s(seperate)v(value): 逗号分割字段
:sv :割字段
asv : a符号分割字段
注意: 默认只认,分隔符
设置读取的参数: DataFrameReader.608行
sep (default ,)
header (default false)
/
@Testdef testCSV():Unit={val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")df.write.option("sep",",").option("header","true").csv("output/csv")}
jdbc读写
/*
def jdbc(
url: String,
table: String,
columnName: String, : 只能传入numeric, date, or timestamp,用来分区<br /> lowerBound: Long, : 分区列的最小范围<br /> upperBound: Long, : 分区列的最大范围<br /> numPartitions: Int : 分几个区connectionProperties: Properties : <br /> username,<br />password,<br />driverClass<br />)<br /> 参数名参考: JDBCOptions<br /> */
jdbc读
@Testdef testJDBCRead():Unit= {val properties = new Properties()properties.put("user", "root")properties.put("password", "000000")properties.put("driverClass", "com.mysql.jdbc.Driver")val df: DataFrame = session.read.jdbc("jdbc:mysql://hadoop104:3306/gmall_report","ads_user_total",properties)//全表显示,默认显示前20行df.show()// 按需查询df.createTempView("ads_user_total")session.sql("select dt,no_order_user_count from ads_user_total where dt >= '2021-05-08' ").show()}
jdbc写
@Testdef testJDBCWrite():Unit={val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")val properties = new Properties()properties.put("user", "root")properties.put("password", "000000")properties.put("driverClass", "com.mysql.jdbc.Driver")// 写入jdbc的数据是默认text格式,如果要用idea导数据最好先在mysql中创建好表。df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop104:3306/gmall_report","people",properties)}
不指定输出格式默认parquet
/
不指定输出文件的格式(json\csv等),默认以parquet格式写 ,默认读也是读parquet文件
Avro SequnceFile : 早期
ORC(Hive研发): spark2.4.x以上才支持orc
Parquet(Hadoop生态圈通用) : phoenix,impala,presto
textfile : 一直使用,简单,方便查看
textfile + 压缩格式
/
@Testdef testDefault():Unit={val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("datas/people.csv")// df.write.save("output/default")session.read.load("datas/users.parquet").show()}
