1. 读取 mysql
// Read a MySQL table over JDBC into a DataFrame.
// Fix: the URL must match the driver. com.mysql.jdbc.Driver requires a
// jdbc:mysql:// URL; the original mixed a PostgreSQL URL ("jdbc:postgresql:dbserver")
// with the MySQL driver, which fails at connect time.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
2. 写入 mysql
// Write the DataFrame into a MySQL table over JDBC.
// Fix: same URL/driver mismatch as the read side — the MySQL driver needs a
// jdbc:mysql:// URL, not a PostgreSQL one.
// Note: no .mode(...) is set, so the default ErrorIfExists applies — the save
// fails if the target table already exists (unchanged from the original).
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .save()
trait Base {

  /** Spark application name, supplied by each concrete job. */
  val appName: String

  // Lazy so the concrete job's appName is available before the session is built.
  lazy val spark = SparkSession
    .builder()
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()

  /** Job entry point, implemented by each concrete job. */
  def run(): Unit

  /**
   * Reads a MySQL table (or a subquery aliased as a table) into a DataFrame
   * using the connection settings from MysqlUtil.
   */
  def fetchFromMysql(dbtable: String): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
  }

  /**
   * Appends df into the given MySQL table.
   * Deliberately uses "append": "overwrite" mode would DROP and recreate the
   * whole table, destroying existing data.
   */
  def loadIntoMysql(df: DataFrame, dbtable: String): Unit = {
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
  }

  /**
   * Inserts rows through a raw JDBC connection, using the DAO's
   * INSERT ... WHERE NOT EXISTS statement to avoid duplicate inserts.
   *
   * Fix: the original never closed the connection or the PreparedStatement,
   * leaking both for every partition; they are now released in finally blocks
   * even when a row fails.
   */
  def insertIntoMysql(df: DataFrame, dao: DaoBase): Unit = {
    // coalesce(1): a single partition means a single connection and no
    // concurrent writers racing on the NOT EXISTS check.
    df.coalesce(1).foreachPartition { partitionOfRecords =>
      val conn = MysqlUtil.getConn
      try {
        val ps = conn.prepareStatement(dao.sqlText)
        try {
          partitionOfRecords.foreach { row =>
            dao.fun(row, ps)
            ps.executeUpdate()
          }
        } finally {
          ps.close()
        }
      } finally {
        conn.close()
      }
    }
  }

  /**
   * Overwrites tableName (partitioned or not) with df.
   * df's column order must exactly match the Hive table definition,
   * with partition columns last — insertInto matches by position, not name.
   */
  def writeToHive(df: DataFrame, tableName: String): Unit = {
    // Allow dynamic-partition inserts without a static partition spec.
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    df.coalesce(1)
      .write
      .mode("overwrite")
      .format("hive")
      .insertInto(tableName)
  }
}

trait DaoBase {

  /**
   * An "INSERT ... SELECT ... WHERE NOT EXISTS" statement,
   * used to prevent duplicate inserts.
   */
  val sqlText: String

  /**
   * Extracts the entity for one DataFrame row and binds EVERY '?' placeholder
   * of sqlText on ps (the statement is executed right after this call).
   *
   * @param row source DataFrame row
   * @param ps  prepared statement compiled from sqlText
   */
  def fun(row: Row, ps: PreparedStatement): Unit
}

object carInfoDao extends DaoBase {

  // Fix: the original statement had two '?' placeholders but fun() bound only
  // parameter 1, so every executeUpdate failed with "No value specified for
  // parameter 2". Since the row only carries a name, the duplicate check is
  // now done on name and both placeholders are bound below.
  val sqlText =
    """
      |INSERT INTO car_info (name
      |	) SELECT
      |	?
      |FROM
      |	DUAL
      |WHERE
      |	NOT EXISTS (
      |	SELECT
      |		id
      |	FROM
      |		car_info
      |	WHERE
      |		name = ?
      |	)
      |""".stripMargin

  def fun(row: Row, ps: PreparedStatement): Unit = {
    val entity = rowToEntity(row)
    ps.setString(1, entity.name) // value to insert
    ps.setString(2, entity.name) // NOT EXISTS duplicate check
  }

  def rowToEntity(row: Row): Entity = {
    // TODO: map the real columns out of `row`; currently a hard-coded placeholder.
    Entity("ok")
  }
}
3. spark 官网资料
JDBC To Other Databases