Creation
Files
*parquet read (API style)
scala> spark.read.    (tab completion lists the available read methods)
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

val userDF = spark.read.format("parquet").load("data/users.parquet")

*parquet read (spark-sql style)
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (path "file:/opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet");

select * from parquetTable;
mysql
*mysql read (API style)
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  .option("dbtable", "rddtable")
  .option("user", "root")
  .option("password", "000000")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
Or:
val studentFrame: DataFrame = spark.read.jdbc(url, "student", properties)

*mysql read (spark-sql style)
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://hadoop:3306",
  dbtable "hive.TBLS",
  user 'root',
  password '123456',
  driver 'com.mysql.jdbc.Driver'
);

select * from jdbcTable;

*mysql write
jdbcDF.write.format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  .option("dbtable", "dftable")
  .option("user", "root")
  .option("password", "000000")
  .save()
Or:
val result: DataFrame = spark.sql("select * from user where age > 30")
result.write.mode("append").jdbc(url, "result", properties)
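The `properties` object passed to `spark.read.jdbc(url, "student", properties)` and `result.write.mode("append").jdbc(...)` above is a plain `java.util.Properties`. A minimal Spark-free sketch, reusing the host, database and credentials from the notes above (placeholders, adjust to your cluster):

```scala
import java.util.Properties

// JDBC URL as used in the notes above (placeholder host/database)
val url = "jdbc:mysql://hadoop102:3306/rdd"

// Connection settings consumed by spark.read.jdbc / DataFrame.write.jdbc
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "000000")
properties.setProperty("driver", "com.mysql.jdbc.Driver")

println(properties.getProperty("driver"))
```

The same `properties` instance works for both the read and the write side, so it is usually defined once and shared.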
hive
* Make sure the existing Hive installation works.
* Copy hive-site.xml into Spark's conf/ directory.
* If hive-site.xml contains Tez-related settings, comment them out.
* Copy the MySQL connector jar into Spark's jars/ directory.
* Start the Hive services first: hive/bin/hiveservices.sh start
* If HDFS is not reachable, also copy core-site.xml and hdfs-site.xml into conf/.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.2.1</version>
</dependency>

1. Copy hive-site.xml, core-site.xml and hdfs-site.xml into the resources directory.
2. Code:
def main(args: Array[String]): Unit = {
  // Create the context configuration object
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
  val spark: SparkSession = SparkSession.builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  spark.sql("show tables").show()
  // Release resources
  spark.stop()
}
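The hive-site.xml copied in the steps above must at minimum tell Spark where the Hive metastore is. A hedged sketch of such a file (the host and port are placeholders; match them to your metastore service):

```xml
<configuration>
  <!-- Address of the Hive metastore thrift service (placeholder host/port) -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop102:9083</value>
  </property>
</configuration>
```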
json
Reading JSON with a specified schema
- https://blog.csdn.net/weixin_46034893/article/details/113619372 : flatMap needs to be passed RowEncoder(schema)
Pros: 1. avoids an extra scan to infer the types; 2. without an explicit schema, the inferred columns come out in dictionary (alphabetical) order.
Cons: 1. strict about the JSON types (an int field cannot be declared as string); 2. if a row fails to parse, it becomes null.
val structType = new StructType()
.add("name", StringType)
.add("age", IntegerType)
.add("address", StringType)
.add("location", ArrayType(DoubleType))
val df = spark.read.schema(structType).json(path) # Scala
spark.read.json(self, path, schema=Non…) # Python, see the DataFrameReader source
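By analogy with the parquet and JDBC sections above, a JSON file can also be exposed as a temporary view through spark-sql; an untested sketch (the path is a placeholder):

```sql
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (path "file:/path/to/people.json");

select * from jsonTable;
```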
csv
Reading CSV with a specified schema
val pathCSV_2="file:///E:\\taxi.csv"
val schemaS=StructType(Array(
StructField("tid",LongType),
StructField("lat",StringType,nullable = true),
StructField("lon",StringType,nullable = true),
StructField("time",StringType)
))
// read
sqlContext.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header","false")
.schema(schemaS)
.load(pathCSV_2)
.show(5)
// write
resultDataFrame.coalesce(1).write
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header","true")
.partitionBy("hour")
.mode(SaveMode.Overwrite)
.save("file:///E:\\out")
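The `.partitionBy("hour")` call in the write above groups rows by the value of that column and writes each group under an `hour=<value>/` subdirectory. A minimal Spark-free sketch of that grouping (the `Trip` case class and the sample rows are made up for illustration):

```scala
// Hypothetical rows standing in for the taxi DataFrame above
case class Trip(tid: Long, hour: Int)

val rows = Seq(Trip(1L, 9), Trip(2L, 9), Trip(3L, 10))

// partitionBy("hour") yields one directory per distinct value: hour=9/, hour=10/
val partitionDirs = rows.groupBy(_.hour).keys.map(h => s"hour=$h").toSet

println(partitionDirs)
```

Queries that filter on `hour` can then skip whole directories, which is the usual reason for partitioning on a low-cardinality column like this.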
Saving
# Generic approach
- spark.read.format("type").load("path")
- df.write.format("type").save("path")
val writeOptions = Map("header" -> "true",
"delimiter" -> "\t",
"path" -> "hdfs://hadoop102:9000/test")
val readOptions = Map("header" -> "true",
"delimiter" -> "\t",
"path" -> "hdfs://hadoop102:9000/test")
# read
val dataDF = spark.read.format("csv").options(readOptions).load()
# save
ds.write.format("csv").options(writeOptions).mode(SaveMode.Overwrite).save(path)
Conversion
RDD >>> DF
import spark.implicits._ # the implicit conversions must be in scope; `spark` here is not a Scala package name but the variable name of the SparkSession object
val peopleRDD = sc.textFile("/opt/module/spark-local/people.txt") # file content e.g.: qiaofeng,18
# Method 1
peopleRDD.map{x=> val fields=x.split(",");(fields(0),fields(1).trim.toInt)}.toDF("name","age").show
# Method 2
case class People(name:String,age:Int)
peopleRDD.map{x=> val fields=x.split(",");People(fields(0),fields(1).toInt)}.toDF.show
val userRdd: RDD[User] = lines.map(e => {
val split = e.split(",")
val name = split(0)
val age = split(1).toInt
val fv = split(2).toDouble
User(name, age, fv)
})
import spark.implicits._
val df1: DataFrame = userRdd.toDF()
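The split-and-convert step shared by the methods above can be checked without a SparkSession. A small sketch using the `People` case class and the sample line `qiaofeng,18` from the notes:

```scala
case class People(name: String, age: Int)

// Same parsing as the map functions above: split on comma, trim, convert age to Int
def parseLine(line: String): People = {
  val fields = line.split(",")
  People(fields(0), fields(1).trim.toInt)
}

println(parseLine("qiaofeng,18"))
```

Once such an RDD of case-class instances exists, `.toDF` (with `spark.implicits._` in scope) derives the column names from the case-class fields.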
# Method 3 (rarely used)
// Define a schema through the programmatic interface and apply it to the RDD (verbose, not important)
val testRDD = spark.sparkContext.textFile("/Users/zhibinma/data/test.txt") # file content: qiaofeng,18
val schema = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
val rowRDD = testRDD.map(_.split(",")).map(x => Row(x(0), x(1).trim.toInt)) // age must be an Int to match IntegerType in the schema
val testDF = spark.createDataFrame(rowRDD,schema)
testDF.show()
DS >>> DF
# Create a DS
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("wangyuyan",2)).toDS()
val df = caseClassDS.toDF
