创建

文件

  1. *parquet读取(api方式)
  2. scala> spark.read.
  3. csv format jdbc json load option options orc parquet schema table text textFile
  4. val userDF=spark.read.format("parquet").load("data/users.parquet")
  5. *parquet读取(spark-sql方式)
  6. CREATE TEMPORARY VIEW parquetTable
  7. USING org.apache.spark.sql.parquet
  8. OPTIONS (
  9. path "file:///opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
  10. );
  11. select * from parquetTable;

mysql

  1. *mysql读取(api方式)
  2. val jdbcDF = spark.read
  3. .format("jdbc")
  4. .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  5. .option("dbtable", "rddtable")
  6. .option("user", "root")
  7. .option("password", "000000")
  8. .option("driver", "com.mysql.jdbc.Driver")
  9. .load()
  10. 或者: val studentFrame: DataFrame = spark.read.jdbc(url,student,properties)
  11. *mysql读取(spark-sql方式)
  12. CREATE TEMPORARY VIEW jdbcTable
  13. USING org.apache.spark.sql.jdbc
  14. OPTIONS (
  15. url "jdbc:mysql://hadoop:3306",
  16. dbtable "hive.TBLS",
  17. user 'root',
  18. password '123456',
  19. driver 'com.mysql.jdbc.Driver'
  20. );
  21. select * from jdbcTable;
  22. *mysql写入
  23. jdbcDF.write
  24. .format("jdbc")
  25. .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  26. .option("dbtable", "dftable")
  27. .option("user", "root")
  28. .option("password", "000000")
  29. .save()
  30. 或者:
  31. val result: DataFrame = spark.sql("select * from user where age > 30")
  32. result.write.mode("append").jdbc(url,"result",properties)

hive

  1. * 确定原有Hive是正常工作的
  2. * 需要把hive-site.xml拷贝到Spark的conf/目录下
  3. * 如果以前hive-site.xml文件中,配置过Tez相关信息,注释掉
  4. * 需要把MySQL的connector(jar包)复制到Spark的jars/目录下
  5. * 需要提前启动hive服务,hive/bin/hiveservices.sh start
  6. * 如果访问不到hdfs,则需把core-site.xml和hdfs-site.xml拷贝到conf/目录
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-hive_2.11</artifactId>
  10. <version>2.1.1</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.hive</groupId>
  14. <artifactId>hive-exec</artifactId>
  15. <version>1.2.1</version>
  16. </dependency>
  17. 1. 拷贝hive-site.xml、core-site.xml、hdfs-site.xml到resources目录
  18. 2.代码实现
  19. def main(args: Array[String]): Unit = {
  20. //创建上下文环境配置对象
  21. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
  22. val spark: SparkSession = SparkSession
  23. .builder()
  24. .config(conf)
  25. .enableHiveSupport()
  26. .getOrCreate()
  27. spark.sql("show tables").show()
  28. //释放资源
  29. spark.stop()
  30. }

json

指定schema读取json

好处:1、避免额外的扫描判断 2、不指定的话会按照字典顺序 
坏处:1、对json要求严格,int 不能指定成string。 2、如果一行解析失败  会变成null
val structType = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)
      .add("address", StringType)
      .add("location", ArrayType(DoubleType))
val df = spark.read.schema(structType).json(路径) # scala
spark.read.json(self, path, schema=None, ...)   # python     在DataFrameReader 源码下

csv

指定schema读取csv


val pathCSV_2="file:///E:\\taxi.csv"
val schemaS=StructType(Array(
    StructField("tid",LongType),
    StructField("lat",StringType,nullable = true),
    StructField("lon",StringType,nullable = true),
    StructField("time",StringType)
  ))
//读取
sqlContext.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header","false")
      .schema(schemaS)
      .load(pathCSV_2)
      .show(5)
//写入
resultDataFrame.coalesce(1).write
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header","true")
    .partitionBy("hour")
    .mode(SaveMode.Overwrite)
    .save("file:///E:\\out")

保存

#通用方式
-spark.read.format("类型").load("路径")
-df.write.format("类型").save("路径")


val writeOptions = Map("header" -> "true", 
                        "delimiter" -> "\t", 
                        "path" -> "hdfs://hadoop102:9000/test")
val readOptions = Map("header" -> "true", 
                    "delimiter" -> "\t", 
                    "path" -> "hdfs://hadoop102:9000/test")
# 读取
val dataDF= spark.read.format("csv").options(readOptions).load()
# 保存
             ds.write.format("csv").options(writeOptions).mode(SaveMode.Overwrite).save(路径)

转换

RDD 》》》 DF

import spark.implicits._ #需要加入隐式转换 支持。这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称
val peopleRDD = sc.textFile("/opt/module/spark-local/people.txt") #qiaofeng,18
# 方法1
peopleRDD.map{x=> val fields=x.split(",");(fields(0),fields(1).trim.toInt)}.toDF("name","age").show
#方法2 
case class People(name:String,age:Int)
peopleRDD.map{x=> var fields=x.split(",");People(fields(0),fields(1).toInt)}.toDF.show
    val userRdd: RDD[User] = lines.map(e => {
          val split = e.split(",")
          val name = split(0)
          val age = split(1).toInt
          val fv = split(2).toDouble
          User(name, age, fv)
        })
    import spark.implicits._
    val df1: DataFrame = userRdd.toDF()

#方法3 使用少
//通过可编程接口,定义Schema,并应用到RDD上 太复杂,不重要
    val testRDD = spark.sparkContext.textFile("/Users/zhibinma/data/test.txt")#qiaofeng,18
val schema = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
    val rowRDD = testRDD.map(_.split(",")).map(x => Row(x(0),x(1)))
    val testDF = spark.createDataFrame(rowRDD,schema)
    testDF.show()

DS》》》DF

#创建DS
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("wangyuyan",2)).toDS()

var df = ds.toDF