3.24增量数据抽取
1.题目要求
1.使用Spark抽取MySQL指定数据表中的新增的商品数据到ODS层的指定的分区表中 2.使用Spark抽取MySQL指定数据表中的新增的用户数据到ODS层的指定的分区表中 3.使用Spark抽取MySQL指定数据表中的新增的订单数据到ODS层的指定的分区表中
2.题目分析
通过时间进行建立分区表以data为关键字,考察连接数据库,数据库的数据筛选后写入hive分区表里面,一般为增量数据,就是昨天增加的数据。
3.实现步骤
编写spark-sql程序
package work_Testimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport java.text.SimpleDateFormatimport java.util.{Calendar, Properties}object MySQLToHive { def main(args: Array[String]): Unit = { //初始化 val conf = new SparkConf().setMaster("local").setAppName("MySQLTohive") val spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate() //连接数据库 val url = "jdbc:mysql://master/test" val prop = new Properties() prop.put("user","root") prop.put("password","123456") prop.put("driver","com.mysql.jdbc.Driver") //spark读取MySQL val df = spark.read.jdbc(url, "t_oder", prop) df.createOrReplaceTempView("v_oder") //获取昨天的日期,进行增量抽取 val day = Calendar.getInstance day.add(Calendar.DATE, -1) val sdf = new SimpleDateFormat("yyyy-MM-dd") val yesDate = sdf.format(day.getTime) //把读进来的数据写入hive //patition根据oserdata建立分区表 spark.sql("insert overwrite table test_waimai.oder_part patition(oderdata='2022-02-23') select oserkey,custkey,oderparice from v_oder where oderdata='2022-03-23'") spark.stop()
打包上传到spark集群运行
/root/local/src/spark-2.4.8-bin-hadoop2.6/bin/spark-submit (spark,bin目录下的spark-submit)\--class work_Test.HiveToMySQL (包名.类名)\--master spark://master:7077 \/root/local/src/spark-2.4.8-bin-hadoop2.6/data/graphx/spark.jar 10(架包存放位置加10)
重点: mysql语句筛选新增的数据写入hive分区表里面
insert overwrite table test_waimai.oder_part patition(oderdata='2022-02-23') select oserkey,custkey,oderparice from v_oder where oderdata='2022-03-23'
日期的获取(获取昨天的-1,前天-2,明天+1以此类推)
package work_Testimport java.text.SimpleDateFormatimport java.util.Calendarobject yesDate_Test { def main(args: Array[String]): Unit = { val day = Calendar.getInstance day.add(Calendar.DATE, -1) val sdf = new SimpleDateFormat("yyyy-MM-dd") val yesDate = sdf.format(day.getTime) println(yesDate) }}