1. Reading from MySQL
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
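If only part of the table is needed, the filter can be pushed down to MySQL by passing a subquery (with an alias) as dbtable instead of the table name. A minimal sketch reusing the placeholder connection settings above; the column names and filter condition are assumptions:

// Push the projection and filter down to MySQL instead of loading the whole table.
val subsetDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "(SELECT id, name FROM schema.tablename WHERE id > 100) AS t")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()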
2. Writing to MySQL
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .save()
import java.sql.PreparedStatement

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

trait Base {

  val appName: String

  lazy val spark = SparkSession
    .builder()
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()

  def run(): Unit
  /** Read a MySQL table into a DataFrame via JDBC. */
  def fetchFromMysql(dbtable: String): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
  }
  /** Append a DataFrame to a MySQL table via JDBC. */
  def loadIntoMysql(df: DataFrame, dbtable: String): Unit = {
    // Overwrite mode would drop and recreate the whole table!!!
    // (see the truncate sketch after this trait)
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", MysqlUtil.url)
      .option("dbtable", dbtable)
      .option("user", MysqlUtil.user)
      .option("password", MysqlUtil.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
  }
  /** Insert rows through a raw JDBC connection to MySQL, so the DAO's
    * "INSERT ... WHERE NOT EXISTS" statement can prevent duplicate inserts. */
  def insertIntoMysql(df: DataFrame, dao: DaoBase): Unit = {
    df.coalesce(1).foreachPartition { partitionOfRecords =>
      val conn = MysqlUtil.getConn
      val ps = conn.prepareStatement(dao.sqlText)
      try {
        partitionOfRecords.foreach { row =>
          dao.fun(row, ps)
          ps.executeUpdate()
        }
      } finally {
        ps.close()
        conn.close()
      }
    }
  }
  /** Write a DataFrame into a Hive table (works for partitioned and
    * non-partitioned tables). insertInto is position-based: the column order
    * of df must exactly match the target table, including partition columns. */
  def writeToHive(df: DataFrame, tableName: String): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    df.coalesce(1)
      .write
      .mode("overwrite")
      .format("hive")
      .insertInto(tableName)
  }
}
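When a MySQL table really does need to be replaced rather than appended to, the JDBC writer's truncate option can be combined with overwrite mode so the existing table is truncated instead of dropped and recreated (keeping its indexes and grants). A hedged sketch reusing the MysqlUtil placeholders above; whether TRUNCATE is actually used still depends on the JDBC dialect:

def replaceMysqlTable(df: DataFrame, dbtable: String): Unit = {
  df.write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "true") // truncate the table instead of dropping and recreating it
    .option("url", MysqlUtil.url)
    .option("dbtable", dbtable)
    .option("user", MysqlUtil.user)
    .option("password", MysqlUtil.password)
    .option("driver", "com.mysql.jdbc.Driver")
    .save()
}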
trait DaoBase {

  /**
    * An "INSERT ... SELECT ... WHERE NOT EXISTS" statement,
    * used to prevent duplicate inserts.
    */
  val sqlText: String

  /**
    * Extract the entity to insert into the MySQL table from a DataFrame row
    * and fill the PreparedStatement's parameters.
    *
    * @param row a row of the DataFrame
    * @param ps  the PreparedStatement built from sqlText
    */
  def fun(row: Row, ps: PreparedStatement): Unit
}
/** Placeholder entity for the car_info table; only the name field is used here. */
case class Entity(name: String)

object CarInfoDao extends DaoBase {

  // Insert a row only when no row with the same name already exists.
  val sqlText =
    """
      |INSERT INTO car_info (name)
      |SELECT ?
      |FROM DUAL
      |WHERE NOT EXISTS (
      |  SELECT id FROM car_info WHERE name = ?
      |)
      |""".stripMargin

  def fun(row: Row, ps: PreparedStatement): Unit = {
    val entity = rowToEntity(row)
    ps.setString(1, entity.name) // value to insert
    ps.setString(2, entity.name) // duplicate check in the NOT EXISTS subquery
  }

  def rowToEntity(row: Row): Entity = {
    // Stub implementation; a real one would read the fields from the Row.
    Entity("ok")
  }
}
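A minimal sketch of how the pieces above might be wired together; the job name, source table, and transformation are hypothetical:

// Hypothetical job: read a source table from MySQL, keep distinct names,
// and insert them through the deduplicating DAO defined above.
object CarInfoJob extends Base {

  override val appName: String = "CarInfoJob"

  override def run(): Unit = {
    val src = fetchFromMysql("car_source") // assumed source table
    val names = src.select("name").distinct()
    insertIntoMysql(names, CarInfoDao)
  }

  def main(args: Array[String]): Unit = run()
}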
3. Spark official documentation
JDBC To Other Databases
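That page also documents the JDBC options most useful for large tables, in particular the partitioning options that let Spark open several parallel connections. A hedged sketch reusing the placeholders above; the partition column and bounds are assumptions:

// Read a large MySQL table over several parallel JDBC connections.
// partitionColumn must be a numeric, date, or timestamp column; lowerBound and
// upperBound only control how the partitions are sliced, they do not filter rows.
val parallelDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/dbname")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("partitionColumn", "id") // assumed numeric primary key
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .option("fetchsize", "1000")     // rows fetched per round trip
  .load()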