3.24增量数据抽取

1.题目要求

  1. 1.使用Spark抽取MySQL指定数据表中的新增的商品数据到ODS层的指定的分区表中
  2. 2.使用Spark抽取MySQL指定数据表中的新增的用户数据到ODS层的指定的分区表中
  3. 3.使用Spark抽取MySQL指定数据表中的新增的订单数据到ODS层的指定的分区表中

2.题目分析

  1. 通过时间进行建立分区表以data为关键字,考察连接数据库,数据库的数据筛选后写入hive分区表里面,一般为增量数据,就是昨天增加的数据。

3.实现步骤

  1. 编写spark-sql程序
  1. package work_Test
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.SparkSession
  4. import java.text.SimpleDateFormat
  5. import java.util.{Calendar, Properties}
  6. object MySQLToHive {
  7. def main(args: Array[String]): Unit = {
  8. //初始化
  9. val conf = new SparkConf().setMaster("local").setAppName("MySQLTohive")
  10. val spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
  11. //连接数据库
  12. val url = "jdbc:mysql://master/test"
  13. val prop = new Properties()
  14. prop.put("user","root")
  15. prop.put("password","123456")
  16. prop.put("driver","com.mysql.jdbc.Driver")
  17. //spark读取MySQL
  18. val df = spark.read.jdbc(url, "t_oder", prop)
  19. df.createOrReplaceTempView("v_oder")
  20. //获取昨天的日期,进行增量抽取
  21. val day = Calendar.getInstance
  22. day.add(Calendar.DATE, -1)
  23. val sdf = new SimpleDateFormat("yyyy-MM-dd")
  24. val yesDate = sdf.format(day.getTime)
  25. //把读进来的数据写入hive
  26. //patition根据oserdata建立分区表
  27. spark.sql("insert overwrite table test_waimai.oder_part patition(oderdata='2022-02-23') select oserkey,custkey,oderparice from v_oder where oderdata='2022-03-23'")
  28. spark.stop()
  1. 打包上传到spark集群运行
  1. /root/local/src/spark-2.4.8-bin-hadoop2.6/bin/spark-submit spark,bin目录下的spark-submit\
  2. --class work_Test.HiveToMySQL (包名.类名)\
  3. --master spark://master:7077 \
  4. /root/local/src/spark-2.4.8-bin-hadoop2.6/data/graphx/spark.jar 10(架包存放位置加10
  1. 重点:
  2. mysql语句筛选新增的数据写入hive分区表里面
  1. insert overwrite table test_waimai.oder_part patition(oderdata='2022-02-23') select oserkey,custkey,oderparice from v_oder where oderdata='2022-03-23'
  1. 日期的获取(获取昨天的-1,前天-2,明天+1以此类推)
  1. package work_Test
  2. import java.text.SimpleDateFormat
  3. import java.util.Calendar
  4. object yesDate_Test {
  5. def main(args: Array[String]): Unit = {
  6. val day = Calendar.getInstance
  7. day.add(Calendar.DATE, -1)
  8. val sdf = new SimpleDateFormat("yyyy-MM-dd")
  9. val yesDate = sdf.format(day.getTime)
  10. println(yesDate)
  11. }
  12. }