通用的load和save操作

  1. 对于Spark SQLDataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的loadsave操作。load操作主要用于加载数据,创建出DataFramesave操作,主要用于将DataFrame中的数据保存到文件中。
  2. Java版本
  3. DataFrame df = sqlContext.read().load("users.parquet");
  4. df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
  5. Scala版本
  6. val df = sqlContext.read.load("users.parquet")
  7. df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定数据源类型

  1. 也可以手动指定用来操作的数据源类型。数据源通常需要使用其全限定名来指定,比如parquetorg.apache.spark.sql.parquet。但是Spark SQL内置了一些数据源类型,比如jsonparquetjdbc等等。实际上,通过这个功能,就可以在不同类型的数据源之间进行转换了。比如将json文件中的数据保存到parquet文件中。默认情况下,如果不指定数据源类型,那么就是parquet
  2. Java版本
  3. DataFrame df = sqlContext.read().format("json").load("people.json");
  4. df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
  5. Scala版本
  6. val df = sqlContext.read.format("json").load("people.json")
  7. df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Save Mode

  1. Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。
Save Mode 意义
SaveMode.ErrorIfExists(默认) 如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append 如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite 如果目标位置已经存在数据,那么将已经存在的数据删除,用新数据进行覆盖
SaveMode.Igoew 如果目标位置已经存在数据,那么就忽略,不做任何操作

通用版
java版

  1. /**
  2. * 通用的load和save操作
  3. * @author Administrator
  4. *
  5. */
  6. public class GenericLoadSave {
  7. public static void main(String[] args) {
  8. SparkConf conf = new SparkConf()
  9. .setAppName("GenericLoadSave");
  10. JavaSparkContext sc = new JavaSparkContext(conf);
  11. SQLContext sqlContext = new SQLContext(sc);
  12. DataFrame usersDF = sqlContext.read().load(
  13. "hdfs://spark1:9000/users.parquet");
  14. usersDF.select("name", "favorite_color").write()
  15. .save("hdfs://spark1:9000/namesAndFavColors.parquet");
  16. }
  17. }

scala 版

  1. /**
  2. * @author Administrator
  3. */
  4. object GenericLoadSave {
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf()
  7. .setAppName("GenericLoadSave")
  8. val sc = new SparkContext(conf)
  9. val sqlContext = new SQLContext(sc)
  10. val usersDF = sqlContext.read.load("hdfs://spark1:9000/users.parquet")
  11. usersDF.write.save("hdfs://spark1:9000/namesAndFavColors_scala")
  12. }
  13. }

手动指定格式

java 版

  1. /**
  2. * 手动指定数据源类型
  3. * @author Administrator
  4. *
  5. */
  6. public class ManuallySpecifyOptions {
  7. public static void main(String[] args) {
  8. SparkConf conf = new SparkConf()
  9. .setAppName("ManuallySpecifyOptions");
  10. JavaSparkContext sc = new JavaSparkContext(conf);
  11. SQLContext sqlContext = new SQLContext(sc);
  12. DataFrame peopleDF = sqlContext.read().format("json")
  13. .load("hdfs://spark1:9000/people.json");
  14. peopleDF.select("name").write().format("parquet")
  15. .save("hdfs://spark1:9000/peopleName_java");
  16. }
  17. }

Scala 版

  1. /**
  2. * @author Administrator
  3. */
  4. object ManuallySpecifyOptions {
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf()
  7. .setAppName("ManuallySpecifyOptions")
  8. val sc = new SparkContext(conf)
  9. val sqlContext = new SQLContext(sc)
  10. val peopleDF = sqlContext.read.format("json").load("hdfs://spark1:9000/people.json")
  11. peopleDF.select("name").write.format("parquet").save("hdfs://spark1:9000/peopleName_scala")
  12. }
  13. }

savemode实例

  1. /**
  2. * SaveModel示例
  3. * @author Administrator
  4. *
  5. */
  6. public class SaveModeTest {
  7. @SuppressWarnings("deprecation")
  8. public static void main(String[] args) {
  9. SparkConf conf = new SparkConf()
  10. .setAppName("SaveModeTest");
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. SQLContext sqlContext = new SQLContext(sc);
  13. DataFrame peopleDF = sqlContext.read().format("json")
  14. .load("hdfs://spark1:9000/people.json");
  15. peopleDF.save("hdfs://spark1:9000/people_savemode_test", "json", SaveMode.Append);
  16. }
  17. }