创建
// ---------------------------------------------------------------------------
// Creating RDDs
// ---------------------------------------------------------------------------
// NOTE(review): assumes `sc` is an existing SparkContext (e.g. spark-shell) and
// that org.apache.spark.rdd.{JdbcRDD, RDD} and
// java.sql.{Connection, DriverManager, PreparedStatement} are imported elsewhere.

// 1) From an in-memory collection.
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))

// parallelize vs makeRDD: makeRDD has one more overload than parallelize.
// The (seq, numSlices) overload of makeRDD simply delegates to parallelize.
// The extra overload takes (element, preferred host locations) pairs and
// creates one partition per element, so its partition count is seq.size.
//
//   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
//   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
//   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

// 2) From an external storage system (local FS, HDFS, ...): one element per line.
val inputRDD: RDD[String] = sc.textFile("input/1.txt")

// ---------------------------------------------------------------------------
// Reading from MySQL with JdbcRDD
// ---------------------------------------------------------------------------
// Required dependency (Maven):
//   <dependency>
//     <groupId>mysql</groupId>
//     <artifactId>mysql-connector-java</artifactId>
//     <version>5.1.27</version>
//   </dependency>
//
// Each MySQL example lives in its own block so its vals (rdd, driver, url, ...)
// do not clash with the ones defined above or in the other example.
{
  // Connection parameters.
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop102:3306/rdd"
  val userName = "root"
  val passWd = "000000"

  // JdbcRDD needs numPartitions between upperBound and mapRow.
  // The query must contain exactly two `?` placeholders:
  //   - JdbcRDD splits [lowerBound, upperBound] into numPartitions sub-ranges;
  //   - each partition binds its own sub-range to those placeholders.
  val rdd = new JdbcRDD(
    sc,
    () => {
      // Runs on the executors, so the driver class is loaded there.
      Class.forName(driver)
      DriverManager.getConnection(url, userName, passWd)
    },
    "select * from `rddtable` where `id`>=? and `id`<=?;",
    1,  // lowerBound
    10, // upperBound
    2,  // numPartitions: ids 1..5 and 6..10
    r => (r.getInt(1), r.getString(2))
  )

  // Print the results.
  println(rdd.count())
  // rdd.foreach(println) would print on the executors (executor logs in cluster
  // mode). collect() brings this small result back to the driver first.
  rdd.collect().foreach(println)
}

// ---------------------------------------------------------------------------
// Writing to MySQL
// ---------------------------------------------------------------------------
{
  // Connection parameters.
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop202:3306/test"
  val userName = "root"
  val passWd = "123456"

  val rdd: RDD[(String, Int)] =
    sc.makeRDD(List(("qiaofeng", 18), ("duanyu", 20), ("xuzhu", 21)))

  // Open one connection per partition on the executor, rather than one per
  // record or one on the driver.
  rdd.foreachPartition { datas =>
    // Register the driver.
    Class.forName(driver)
    val conn: Connection = DriverManager.getConnection(url, userName, passWd)
    try {
      val sql: String = "insert into user(name,age) values(?,?)"
      val ps: PreparedStatement = conn.prepareStatement(sql)
      try {
        datas.foreach { case (name, age) =>
          ps.setString(1, name)
          ps.setInt(2, age)
          // Queue the row; the whole partition is sent by executeBatch below.
          ps.addBatch()
        }
        ps.executeBatch()
      } finally ps.close() // released even if an insert fails
    } finally conn.close()
  }
}
保存
// Save the data: writes the string form of each element of inputRDD as text
// files under the "output" directory.
// NOTE(review): Hadoop output formats usually refuse to write into an existing
// directory; confirm "output" does not exist before running.
inputRDD.saveAsTextFile(path = "output")
转换
DataFrame → RDD
// DataFrame -> RDD: a DataFrame is a Dataset[Row], so `.rdd` yields an RDD of
// untyped Row objects.
val dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = df.rdd
Dataset → RDD
// Dataset -> RDD: `.rdd` on a typed Dataset[Person] yields an RDD[Person],
// so the case-class element type is preserved.
// NOTE(review): toDS() needs the session implicits in scope
// (`import spark.implicits._`, auto-imported in spark-shell). In compiled code
// the case class should be declared at top level so an Encoder can be derived.
final case class Person(name: String, age: Long)

val caseClassDS = Seq(Person("wangyuyan", 2)).toDS()

// Convert the Dataset built above.
val dsToRDD = caseClassDS.rdd
