1. 读取 MySQL
// Read a MySQL table into a DataFrame through Spark's JDBC data source.
// The URL scheme has to match the configured driver class: the MySQL driver
// cannot open a `jdbc:postgresql:` URL, so a MySQL URL is used here.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
2. 写入 MySQL
// Write the DataFrame to a MySQL table through Spark's JDBC data source.
// The URL scheme has to match the configured driver class: the MySQL driver
// cannot open a `jdbc:postgresql:` URL, so a MySQL URL is used here.
// No save mode is set, so Spark's default (fail if the table already exists) applies.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .save()
/**
 * Shared scaffolding for Spark jobs: a lazily created, Hive-enabled session plus
 * helpers for moving DataFrames between MySQL and Hive.
 */
trait Base {

  /** Application name handed to the SparkSession builder. */
  val appName: String

  /** Session built (or reused via `getOrCreate`) on first access, with Hive support enabled. */
  lazy val spark: SparkSession = SparkSession
    .builder()
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()

  /** Job entry point, supplied by the concrete job. */
  def run(): Unit

  /**
   * Loads a MySQL table as a DataFrame through the JDBC data source.
   *
   * @param dbtable value passed to the JDBC `dbtable` option
   * @return the DataFrame produced by the JDBC reader
   */
  def fetchFromMysql(dbtable: String): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
  }

  /**
   * Appends `df` to a MySQL table through the JDBC data source.
   *
   * @param df      rows to write
   * @param dbtable value passed to the JDBC `dbtable` option
   */
  def loadIntoMysql(df: DataFrame, dbtable: String): Unit = {
    // "overwrite" mode would drop the whole table, so always append.
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
  }

  /**
   * Inserts every row of `df` into MySQL with a plain JDBC prepared statement, so the
   * DAO's SQL (rather than Spark's JDBC writer) decides how duplicates are avoided.
   *
   * @param df  rows to insert
   * @param dao supplies the SQL text and binds each row to the statement; it is captured
   *            by the partition closure, which is why [[DaoBase]] is `Serializable`
   */
  def insertIntoMysql(df: DataFrame, dao: DaoBase): Unit = {
    // coalesce(1) collapses the data to a single partition before the inserts run.
    // The explicit parameter type pins the Scala-function overload of foreachPartition.
    df.coalesce(1).foreachPartition { (partitionOfRecords: Iterator[Row]) =>
      // NOTE(review): the connection returned by MysqlUtil.getConn is not closed here
      // because its ownership (shared vs. per-call) is not visible from this file — confirm.
      val ps = MysqlUtil.getConn.prepareStatement(dao.sqlText)
      try {
        partitionOfRecords.foreach { row =>
          dao.fun(row, ps)
          ps.executeUpdate()
        }
      } finally {
        // Release the statement even when binding or executing a row fails.
        ps.close()
      }
    }
  }

  /**
   * Inserts `df` into an existing Hive table using overwrite mode.
   * Works for both partitioned and non-partitioned tables.
   * The column order of `df` must exactly match the column order of `tableName`,
   * including the partition columns.
   *
   * @param df        rows to write
   * @param tableName target Hive table
   */
  def writeToHive(df: DataFrame, tableName: String): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    df.coalesce(1)
      .write
      .mode("overwrite")
      .format("hive")
      .insertInto(tableName)
  }
}

/**
 * Contract for a row-level MySQL insert used by `Base.insertIntoMysql`.
 *
 * Extends `Serializable` because implementations are captured by the closure that
 * Spark ships to executors.
 */
trait DaoBase extends Serializable {

  /**
   * SQL of the form `insert ... select ... where not exists`,
   * written that way to prevent duplicate inserts.
   */
  val sqlText: String

  /**
   * Extracts the entity to insert from a DataFrame row and binds it to `ps`.
   *
   * @param row source row from the DataFrame
   * @param ps  prepared statement created from [[sqlText]]
   */
  def fun(row: Row, ps: PreparedStatement): Unit
}

/** DAO that inserts into `car_info` unless a matching row already exists. */
object carInfoDao extends DaoBase {

  // NOTE(review): this statement has two `?` placeholders (name, id) but `fun` binds
  // only the first one; the second must be bound before executeUpdate() can succeed.
  // No id value is available in the visible code — confirm the intended key.
  val sqlText: String =
    """
      |INSERT INTO car_info (name
      |      ) SELECT
      |    ?
      |FROM
      |    DUAL
      |WHERE
      |    NOT EXISTS (
      |        SELECT
      |            id
      |        FROM
      |            car_info
      |        WHERE
      |            id = ?
      |    )
      |""".stripMargin

  /** Binds the entity's name to the first placeholder of [[sqlText]]. */
  def fun(row: Row, ps: PreparedStatement): Unit = {
    val entity = rowToEntity(row)
    ps.setString(1, entity.name)
  }

  /** Placeholder mapping: ignores `row` and always returns `Entity("ok")`. */
  def rowToEntity(row: Row): Entity = {
    Entity("ok")
  }
}
3. Spark 官网资料
JDBC To Other Databases