1. Reading from MySQL

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbserver:3306/dbname")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
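For large tables the read can be parallelized by slicing on a numeric column. A minimal sketch, assuming an integer column `id` (the column name and bounds are placeholders, not part of the original snippet):

    val partitionedDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbserver:3306/dbname")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("driver", "com.mysql.jdbc.Driver")
      // Spark issues one query per partition, slicing [lowerBound, upperBound] on "id".
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")
      .load()

All four partitioning options must be set together; the bounds only control the partition stride, they do not filter rows.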

2. Writing to MySQL

    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://dbserver:3306/dbname")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
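The save mode matters here: the default (errorifexists) fails when the table already exists, and plain overwrite drops and recreates the table. With the truncate option, overwrite empties the rows instead, keeping the table's schema, indexes, and grants. A minimal sketch:

    jdbcDF.write
      .format("jdbc")
      .mode("overwrite")
      // truncate is only honored together with overwrite mode.
      .option("truncate", "true")
      .option("url", "jdbc:mysql://dbserver:3306/dbname")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("driver", "com.mysql.jdbc.Driver")
      .save()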
A fuller pattern wraps both directions in a reusable base trait (MysqlUtil is an in-house helper holding the connection settings; a sketch of it appears at the end of this section):

    import java.sql.PreparedStatement

    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    trait Base {
      val appName: String

      lazy val spark = SparkSession
        .builder()
        .appName(appName)
        .enableHiveSupport()
        .getOrCreate()

      def run(): Unit

      def fetchFromMysql(dbtable: String): DataFrame = {
        spark.read
          .format("jdbc")
          .option("url", MysqlUtil.url)
          .option("dbtable", dbtable)
          .option("user", MysqlUtil.user)
          .option("password", MysqlUtil.password)
          .option("driver", "com.mysql.jdbc.Driver")
          .load()
      }

      def loadIntoMysql(df: DataFrame, dbtable: String): Unit = {
        // Careful: overwrite mode would drop the whole table!!!
        df.write
          .format("jdbc")
          .mode("append")
          .option("url", MysqlUtil.url)
          .option("dbtable", dbtable)
          .option("user", MysqlUtil.user)
          .option("password", MysqlUtil.password)
          .option("driver", "com.mysql.jdbc.Driver")
          .save()
      }

      def insertIntoMysql(df: DataFrame, dao: DaoBase): Unit = {
        // Use a raw JDBC connection to MySQL so duplicate inserts can be prevented.
        df.coalesce(1).foreachPartition { partitionOfRecords: Iterator[Row] =>
          val conn = MysqlUtil.getConn
          val ps = conn.prepareStatement(dao.sqlText)
          try {
            partitionOfRecords.foreach { row =>
              dao.fun(row, ps)
              ps.executeUpdate()
            }
          } finally {
            ps.close()
            conn.close()
          }
        }
      }

      def writeToHive(df: DataFrame, tableName: String): Unit = {
        // Works for partitioned and non-partitioned tables alike.
        // The DataFrame's column order must exactly match the target table's,
        // including partition columns.
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        df.coalesce(1)
          .write
          .mode("overwrite")
          .format("hive")
          .insertInto(tableName)
      }
    }
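Calling executeUpdate per row makes one database round trip per record. Where volume justifies it, a batched variant of insertIntoMysql could be added to the trait using standard JDBC batching (a sketch under the same DaoBase contract; the batch size is an arbitrary choice):

    // Drop-in addition to trait Base: same contract, fewer round trips.
    def insertIntoMysqlBatched(df: DataFrame, dao: DaoBase, batchSize: Int = 500): Unit = {
      df.coalesce(1).foreachPartition { partitionOfRecords: Iterator[Row] =>
        val conn = MysqlUtil.getConn
        val ps = conn.prepareStatement(dao.sqlText)
        try {
          var pending = 0
          partitionOfRecords.foreach { row =>
            dao.fun(row, ps)
            ps.addBatch()
            pending += 1
            if (pending == batchSize) { ps.executeBatch(); pending = 0 }
          }
          if (pending > 0) ps.executeBatch() // flush the final partial batch
        } finally {
          ps.close()
          conn.close()
        }
      }
    }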
The DAO contract the trait relies on:

    trait DaoBase {
      /**
       * An "insert … select … where not exists" statement,
       * so that re-runs do not insert duplicates.
       */
      val sqlText: String

      /**
       * Extract the entity to insert from a DataFrame row
       * and bind its fields to the PreparedStatement.
       *
       * @param row source row
       * @param ps  statement whose placeholders get filled
       */
      def fun(row: Row, ps: PreparedStatement): Unit
    }
And a concrete DAO:

    case class Entity(id: Long, name: String)

    object carInfoDao extends DaoBase {
      val sqlText =
        """
          |INSERT INTO car_info (name)
          |SELECT ?
          |FROM DUAL
          |WHERE NOT EXISTS (
          |  SELECT id FROM car_info WHERE id = ?
          |)
          |""".stripMargin

      def fun(row: Row, ps: PreparedStatement): Unit = {
        val entity = rowToEntity(row)
        ps.setString(1, entity.name)
        ps.setLong(2, entity.id) // both placeholders must be bound before executeUpdate
      }

      def rowToEntity(row: Row): Entity = {
        Entity(0L, "ok") // placeholder mapping from Row to Entity
      }
    }
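MysqlUtil itself is not shown above; here is a minimal sketch of what it might look like, plus a job wiring the trait together. Everything in this block (the object name, connection values, and source table) is an illustrative assumption, not the original code:

    import java.sql.{Connection, DriverManager}

    // Hypothetical stand-in for the in-house MysqlUtil referenced above;
    // a real version would read these settings from configuration.
    object MysqlUtil {
      val url = "jdbc:mysql://dbserver:3306/dbname"
      val user = "username"
      val password = "password"

      def getConn: Connection = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection(url, user, password)
      }
    }

    // Hypothetical job using the trait: read from MySQL, insert without duplicates.
    object CarInfoJob extends Base {
      val appName = "car-info-job"

      def run(): Unit = {
        val df = fetchFromMysql("car_info_source")
        insertIntoMysql(df, carInfoDao)
      }
    }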

3. Spark official documentation

JDBC To Other Databases: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html