通用方法写
df1.write.format("json、csv、...").mode("overwrite").save("output/persons2")
Json读写
/
注意:默认一行只读取一个json
设置读取的参数: DataFrameReader.385行
lineSep: 每行的分隔符,默认 \r
, \r\n
and \n
mode(写股模式):
append : 追加写
ignore : 输出目录存在,就不写
error(默认) : 输出目录存在,就报错
overwrite : 覆盖写
/
@Test
def testReadAndWriteJson():Unit={
// 专门读json
val dataFrame1 = session.read.json("input/people.json")
// 通用的读取方法 通过format(文件格式)不同,读取不同格式的数据
val dataFrame2 = session.read
.format("json")
.option("lineSep"," ") //设置读取的参数
.load("input/people.json")
dataFrame2.show()
// 写JSON
var datas:java.util.List[Row]=new util.ArrayList[Row]();
datas.add(Row("jack",20))
datas.add(Row("jack1",30))
datas.add(Row("jack2",40))
// 表结构
var schema: StructType=StructType( StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
val df: DataFrame = session.createDataFrame(datas, schema)
val rdd: RDD[Row] = df.rdd.coalesce(1)
// rdd.toDS()
val df1: DataFrame = session.createDataFrame(rdd, schema)
// 通用方法写json
df1.write.format("json").mode("overwrite").save("output/persons2")
// 专门写json格式
df.write.mode("ignore").json("output/persons1.json")
}
csv读写
/
.csv 是一种特殊的文件格式,在这种格式中保存的数据,必须以一种指定的分隔符分割
c(comma)s(seperate)v(value): 逗号分割字段
:sv :割字段
asv : a符号分割字段
注意: 默认只认,分隔符
设置读取的参数: DataFrameReader.608行
sep
(default ,
)
header
(default false
)
/
@Test
def testCSV():Unit={
val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")
df.write
.option("sep",",")
.option("header","true")
.csv("output/csv")
}
jdbc读写
/*
def jdbc(
url: String,
table: String,
columnName: String, : 只能传入numeric, date, or timestamp,用来分区<br /> lowerBound: Long, : 分区列的最小范围<br /> upperBound: Long, : 分区列的最大范围<br /> numPartitions: Int : 分几个区
connectionProperties: Properties : <br /> username,<br />password,<br />driverClass<br />)<br /> 参数名参考: JDBCOptions<br /> */
jdbc读
@Test
def testJDBCRead():Unit= {
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "000000")
properties.put("driverClass", "com.mysql.jdbc.Driver")
val df: DataFrame = session.read.jdbc("jdbc:mysql://hadoop104:3306/gmall_report",
"ads_user_total",
properties
)
//全表显示,默认显示前20行
df.show()
// 按需查询
df.createTempView("ads_user_total")
session.sql("select dt,no_order_user_count from ads_user_total where dt >= '2021-05-08' ").show()
}
jdbc写
@Test
def testJDBCWrite():Unit={
val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("input/people.csv")
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "000000")
properties.put("driverClass", "com.mysql.jdbc.Driver")
// 写入jdbc的数据是默认text格式,如果要用idea导数据最好先在mysql中创建好表。
df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop104:3306/gmall_report","people",properties)
}
不指定输出格式默认parquet
/
不指定输出文件的格式(json\csv等),默认以parquet格式写 ,默认读也是读parquet文件
Avro SequnceFile : 早期
ORC(Hive研发): spark2.4.x以上才支持orc
Parquet(Hadoop生态圈通用) : phoenix,impala,presto
textfile : 一直使用,简单,方便查看
textfile + 压缩格式
/
@Test
def testDefault():Unit={
val df: DataFrame = session.read.option("sep", ";").option("header", "true").csv("datas/people.csv")
// df.write.save("output/default")
session.read.load("datas/users.parquet").show()
}