Inferring an RDD Schema Using Reflection

When inferring an RDD schema through reflection, you must first define a case class, because only case classes can be implicitly converted to a DataFrame by Spark.

    package cn.bx.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Only a case class lets Spark infer the schema (name: String, age: Int) by reflection
    case class People(name: String, age: Int)

    object DataFrameNote {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        // Read the text file and map each line onto the case class
        val fileRDD: RDD[String] = spark.sparkContext.textFile("input/people.txt")
        val mapRDD: RDD[Array[String]] = fileRDD.map(_.split(","))
        val peopleRDD: RDD[People] = mapRDD.map(attr => People(attr(0), attr(1).trim.toInt))
        // The implicits provide the RDD-to-DataFrame conversion
        import spark.implicits._
        val peopleDF: DataFrame = peopleRDD.toDF()
        peopleDF.show()
        peopleDF.filter(peopleDF.col("age") > 20).show()
        // Register a temporary view so the DataFrame can be queried with SQL
        peopleDF.createOrReplaceTempView("people")
        val peopleFrame: DataFrame = spark.sql("select name, age from people where age > 20")
        peopleFrame.show()
        peopleFrame.map(t => "Name:" + t(0) + "," + "Age: " + t(1)).show()
        spark.stop()
      }
    }
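
For reference, input/people.txt is not shown in these notes; it is assumed here to follow the format of the standard Spark sample file, one comma-separated name/age pair per line (the values below are illustrative):

    Michael, 29
    Andy, 30
    Justin, 19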

Defining an RDD Schema Programmatically

When a case class cannot be defined in advance, the RDD schema has to be defined programmatically.

    package cn.bx.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    object DataFrameNote {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        // Step 1: build an RDD of Rows from the raw text
        val peopleRDD: RDD[String] = spark.sparkContext.textFile("input/people.txt")
        val rowRDD: RDD[Row] = peopleRDD.map(_.split(",")).map(attr => Row(attr(0), attr(1).trim))
        // Step 2: build the schema programmatically; every field is a nullable string here
        val schemaString = "name,age"
        val fields: Array[StructField] = schemaString.split(",")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
        // Step 3: apply the schema to the RDD of Rows
        val peopleDF: DataFrame = spark.createDataFrame(rowRDD, schema)
        peopleDF.createOrReplaceTempView("people")
        val results = spark.sql("SELECT name,age FROM people where age > 20")
        results.map(attributes => "name: " + attributes(0) + "," + "age:" + attributes(1)).show()
        // Persist the query result as JSON and as plain text
        results.map(attributes => "name: " + attributes(0) + "," + "age:" + attributes(1))
          .write.format("json").save("input/people_result.json")
        results.map(attributes => "name: " + attributes(0) + "," + "age:" + attributes(1))
          .rdd.saveAsTextFile("input/newpeople.txt")
        spark.stop()
      }
    }
  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a `StructType` matching the structure of Rows in the RDD created in Step 1;
  3. Apply the schema to the RDD of Rows via the `createDataFrame` method provided by `SparkSession`.
    package cn.bx.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    object DataFrameSchemaApp {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
          .appName("DataFrameSchemaApp")
          .master("local[*]")
          .getOrCreate()
        val mapRDD: RDD[Array[String]] = spark.sparkContext.textFile("resources/people.txt").map(_.split(","))
        val rowRDD: RDD[Row] = mapRDD.map(lines => Row(lines(0), lines(1).trim.toInt))
        // Declare the schema explicitly: name is a string, age is an integer
        val structType = StructType(Array(
          StructField("name", StringType, nullable = true),
          StructField("age", IntegerType, nullable = true)))
        val dataFrame: DataFrame = spark.createDataFrame(rowRDD, structType)
        dataFrame.printSchema()
        dataFrame.show(1)
        spark.stop()
      }
    }
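
With the schema declared above, printSchema() should print something along these lines (the nullable flags follow the StructField definitions):

    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)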

Student Information

Test data

    1|Burke|1-300-746-8446|ullamcorper.velit.in@ametnullaDonec.co.uk
    2|Kamal|1-668-571-5046|pede.Suspendisse@interdumenim.edu
    3|Olga|1-956-311-1686|Aenean.eget.metus@dictumcursusNunc.edu
    4|Belle|1-246-894-6340|vitae.aliquet.nec@neque.co.uk
    5|Trevor|1-300-527-4967|dapibus.id@acturpisegestas.net
    6|Laurel|1-691-379-9921|adipiscing@consectetueripsum.edu
    7|Sara|1-608-140-1995|Donec.nibh@enimEtiamimperdiet.edu
    8|Kaseem|1-881-586-2689|cursus.et.magna@euismod.org
    9|Lev|1-916-367-5608|Vivamus.nisi@ipsumdolor.com
    10|Maya|1-271-683-2698|accumsan.convallis@ornarelectusjusto.edu
    11|Emi|1-467-270-1337|est@nunc.com
    12|Caleb|1-683-212-0896|Suspendisse@Quisque.edu
    13|Florence|1-603-575-2444|sit.amet.dapibus@lacusAliquamrutrum.ca
    14|Anika|1-856-828-7883|euismod@ligulaelit.co.uk
    15|Tarik|1-398-171-2268|turpis@felisorci.com
    16|Amena|1-878-250-3129|lorem.luctus.ut@scelerisque.com
    17|Blossom|1-154-406-9596|Nunc.commodo.auctor@eratSed.co.uk
    18|Guy|1-869-521-3230|senectus.et.netus@lectusrutrum.com
    19|Malachi|1-608-637-2772|Proin.mi.Aliquam@estarcu.net
    20|Edward|1-711-710-6552|lectus@aliquetlibero.co.uk
    21||1-711-710-6552|lectus@aliquetlibero.co.uk
    22||1-711-710-6552|lectus@aliquetlibero.co.uk
    23|NULL|1-711-710-6552|lectus@aliquetlibero.co.uk
    package cn.bx.spark

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object StudentApp {

      case class Student(id: Int, name: String, phone: String, email: String)

      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
          .appName("StudentApp")
          .master("local[*]")
          .getOrCreate()
        // Fields are separated by "|", which must be escaped in the split regex
        val mapRDD: RDD[Array[String]] = spark.sparkContext.textFile("input/student.txt")
          .map(_.split("\\|"))
        import spark.implicits._
        val studentDF: DataFrame = mapRDD.map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
        studentDF.createOrReplaceTempView("student")
        spark.sql("select * from student where id = 2").show(false)    // show(false): do not truncate column values
        spark.sql("select * from student where name = ''").show(false) // rows whose name field is empty
        spark.stop()
      }
    }
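
Run against the test data above, the first query should return the row with id 2, and the second should return rows 21 and 22, whose name field is empty (row 23 contains the literal string NULL rather than an empty name, so it is not matched). The output looks roughly like this, with column widths depending on the data:

    +---+-----+--------------+---------------------------------+
    |id |name |phone         |email                            |
    +---+-----+--------------+---------------------------------+
    |2  |Kamal|1-668-571-5046|pede.Suspendisse@interdumenim.edu|
    +---+-----+--------------+---------------------------------+

    +---+----+--------------+--------------------------+
    |id |name|phone         |email                     |
    +---+----+--------------+--------------------------+
    |21 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
    |22 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
    +---+----+--------------+--------------------------+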

Parquet is a popular columnar storage format that can efficiently store records with nested fields. Parquet is language-independent and is not tied to any single data-processing framework; it works with many languages and components, including:

  • Query engines: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
  • Computation frameworks: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
  • Data models: Avro, Thrift, Protocol Buffers, POJOs

Spark already ships with sample Parquet data under /usr/local/spark/examples/src/main/resources/, which contains a users.parquet file. The format is binary: if you open the file with vim or dump it with cat, the contents look like unreadable garbage. Only after the file is loaded into a program does Spark parse the format and make the data intelligible.
    package cn.bx.spark

    import org.apache.spark.sql.SparkSession

    object Parquetpp {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        // Load a JSON file and save it again in Parquet format
        val peopleDF = spark.read.json("input/people.json")
        peopleDF.write.parquet("input/newpeople.parquet")
        // Read the Parquet data back and query it through a temporary view
        val peopleFileDF = spark.read.parquet("input/newpeople.parquet")
        peopleFileDF.createOrReplaceTempView("parquetUser")
        val userDF = spark.sql("SELECT name,age FROM parquetUser")
        userDF.foreach(attributes => println("Name: " + attributes(0) + " Age:" + attributes(1)))
        spark.stop()
      }
    }
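
The bundled users.parquet mentioned earlier can be inspected the same way. A minimal sketch, run inside the same main method and assuming the default installation path from the text above:

    // Sketch: load Spark's sample users.parquet (path assumed from a standard install location)
    val usersDF = spark.read.parquet("/usr/local/spark/examples/src/main/resources/users.parquet")
    usersDF.printSchema()
    usersDF.show()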

MySQL

Where to download the driver

    wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz

Or add the Maven dependency:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>
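
If the project is built with sbt rather than Maven (an assumption about the build setup), the equivalent dependency would be:

    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.48"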
Create the spark database and the student table, and insert two initial rows:

    mysql root@aliyun:(none)> CREATE DATABASE spark
    Query OK, 1 row affected
    Time: 0.020s
    mysql root@aliyun:(none)> use spark
    You are now connected to database "spark" as user "root"
    Time: 0.047s
    mysql root@aliyun:spark> create table student (id int(4), name char(20), gender char(4), age int(4));
    Query OK, 0 rows affected
    Time: 0.034s
    mysql root@aliyun:spark> insert into student values(1,'Xiaoming','F',13);
    Query OK, 1 row affected
    Time: 0.030s
    mysql root@aliyun:spark> insert into student values(2,'xiaohong','M',14);
    Query OK, 1 row affected
    Time: 0.018s
    mysql root@aliyun:spark>
    package cn.bx.spark

    import java.util.Properties

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    object JDBCNote {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        // Two new student records to be written to MySQL
        val studentRDD: RDD[Array[String]] = spark.sparkContext
          .parallelize(Array("3 XiaoZhang F 15", "4 XiaoLi M 17"))
          .map(_.split(" "))
        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("gender", StringType, true),
          StructField("age", IntegerType, true)))
        val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
        val studentDF = spark.createDataFrame(rowRDD, schema)
        // JDBC connection properties
        val prop = new Properties()
        prop.put("user", "root")                    // user name
        prop.put("password", "baxiang")             // password
        prop.put("driver", "com.mysql.jdbc.Driver") // JDBC driver class
        // Append mode adds the records to the student table in the spark database
        studentDF.write.mode("append").jdbc("jdbc:mysql://aliyun:3306/spark", "spark.student", prop)
        spark.stop()
      }
    }
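
If the target table should be replaced rather than extended, the same write can be issued with overwrite mode instead of append; note that overwrite drops and recreates the table by default, so use it with care:

    // Sketch: replace the existing contents of spark.student instead of appending
    studentDF.write.mode("overwrite").jdbc("jdbc:mysql://aliyun:3306/spark", "spark.student", prop)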

Query result

    mysql root@aliyun:spark> select * from student
    +------+-----------+----------+-------+
    |   id | name      | gender   |   age |
    |------+-----------+----------+-------|
    |    1 | Xiaoming  | F        |    13 |
    |    2 | xiaohong  | M        |    14 |
    |    3 | XiaoZhang | F        |    15 |
    |    4 | XiaoLi    | M        |    17 |
    +------+-----------+----------+-------+
    4 rows in set
    Time: 0.017s

Reading data

    package cn.bx.spark

    import org.apache.spark.sql.SparkSession

    object JDBCNote {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        // Load the MySQL table through the JDBC data source
        val jdbcDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://aliyun:3306/spark")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "student")
          .option("user", "root")
          .option("password", "baxiang")
          .load()
        jdbcDF.createOrReplaceTempView("student")
        val results = spark.sql("SELECT name,age FROM student where age > 15")
        results.map(attributes => "name: " + attributes(0) + "," + "age:" + attributes(1)).show()
        spark.stop()
      }
    }
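
With the table contents shown in the query result above, only XiaoLi (age 17) satisfies age > 15, so show() should print roughly:

    +-------------------+
    |              value|
    +-------------------+
    |name: XiaoLi,age:17|
    +-------------------+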