创建

  // Create RDDs from an in-memory collection; both calls below receive the same eight Ints.
  val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8)
  val rdd: RDD[Int] = sc.parallelize(numbers)
  val rdd1: RDD[Int] = sc.makeRDD(numbers)
  // parallelize vs. makeRDD: makeRDD has one extra overload. Only when the argument has
  // type Seq[(T, Seq[String])] is the number of partitions equal to seq.size.
  // Relevant SparkContext signatures:
  //   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  //   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  //   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

  // Create an RDD from an external storage system (a text file).
  val inputRDD: RDD[String] = sc.textFile("input/1.txt")
  ************************************ MySQL 读取（需在 pom.xml 中引入 MySQL JDBC 驱动依赖）
  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
  </dependency>
  // Read rows from MySQL into an RDD using JdbcRDD.
  import java.sql.DriverManager
  import org.apache.spark.rdd.JdbcRDD

  // MySQL connection parameters.
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop102:3306/rdd"
  val userName = "root"
  val passWd = "000000"

  // JdbcRDD fills the two `?` placeholders with the lower/upper id bounds of each partition's
  // slice of [lowerBound, upperBound], so the query must contain exactly those two parameters.
  val rdd = new JdbcRDD(
    sc,
    () => {
      // Called to open a connection for each partition: load the driver class, then connect.
      Class.forName(driver)
      DriverManager.getConnection(url, userName, passWd)
    },
    "select * from `rddtable` where `id`>=? and `id`<=?;",
    1,  // lowerBound
    10, // upperBound
    2,  // numPartitions: required positional argument that comes before mapRow
    r => (r.getInt(1), r.getString(2)) // mapRow: turn each ResultSet row into (id, name)
  )

  // Print the final result.
  println(rdd.count())
  // Collect to the driver first; rdd.foreach(println) would print on the executors instead.
  rdd.collect().foreach(println)
  ************************************ MySQL 写入
  // Write an RDD of (name, age) pairs into MySQL, using one JDBC connection per partition.
  import java.sql.{Connection, DriverManager, PreparedStatement}

  // MySQL connection parameters.
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop202:3306/test"
  val userName = "root"
  val passWd = "123456"

  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("qiaofeng", 18), ("duanyu", 20), ("xuzhu", 21)))

  rdd.foreachPartition { datas =>
    // Skip empty partitions so they do not open a database connection for nothing.
    if (datas.nonEmpty) {
      // Register the JDBC driver, then open the connection.
      Class.forName(driver)
      val conn: Connection = DriverManager.getConnection(url, userName, passWd)
      try {
        val sql: String = "insert into user(name,age) values(?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        try {
          datas.foreach { case (name, age) =>
            // Bind the two placeholders and run one insert per element.
            ps.setString(1, name)
            ps.setInt(2, age)
            ps.executeUpdate()
          }
        } finally {
          // Release the statement even if an insert throws.
          ps.close()
        }
      } finally {
        // Release the connection even if preparing the statement or an insert throws.
        conn.close()
      }
    }
  }

保存

// Save the contents of inputRDD as text under the "output" path.
inputRDD.saveAsTextFile("output")

转换

DF → RDD

// Convert the DataFrame `df` into an RDD via its `rdd` accessor.
val dfToRDD = df.rdd

DS → RDD

// toDS() needs the SparkSession implicits in scope (spark-shell imports them automatically).
// NOTE(review): assumes a SparkSession value named `spark` — confirm for your entry point.
import spark.implicits._

case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("wangyuyan", 2)).toDS()

// Convert the Dataset back to an RDD (of Person) via its `rdd` accessor.
val dsToRDD = caseClassDS.rdd