1. build sql

Build an `INSERT ... ON DUPLICATE KEY UPDATE` statement from the DataFrame schema and execute it per partition over JDBC:

```scala
import java.sql.{DriverManager, PreparedStatement, Timestamp}

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

def upsert(df: DataFrame, tableName: String, pks: Seq[String],
           url: String, user: String, password: String): Unit = {
  val fieldNames      = df.schema.fieldNames
  val nonPkFieldNames = fieldNames.filterNot(c => pks.contains(c))
  val size            = fieldNames.length

  // Non-key columns are overwritten with the incoming values whenever
  // the primary/unique key already exists.
  val upsertSql =
    s"INSERT INTO $tableName (${fieldNames.mkString(",")}) " +
      s"VALUES (${Seq.fill(size)("?").mkString(",")}) " +
      "ON DUPLICATE KEY UPDATE " +
      nonPkFieldNames.map(f => s"$f=VALUES($f)").mkString(",")

  df.foreachPartition { partition: Iterator[Row] =>
    Class.forName("com.mysql.cj.jdbc.Driver")
    val conn = DriverManager.getConnection(url, user, password)
    try {
      conn.setAutoCommit(false)
      implicit val stmt: PreparedStatement = conn.prepareStatement(upsertSql)
      // Send rows in batches of 2000 and commit per batch. `grouped` also
      // yields the final, smaller batch, so no trailing flush is needed.
      partition.grouped(2000).foreach { group =>
        group.foreach { record =>
          record.schema.zip(record.toSeq).zipWithIndex.foreach {
            case ((field, value), idx) =>
              dataTypeAdaptor(field.dataType, idx + 1, value) // JDBC parameters are 1-based
          }
          stmt.addBatch()
        }
        stmt.executeBatch()
        conn.commit()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    } finally {
      if (conn != null) conn.close()
    }
  }
}

// Bind one value to the prepared statement according to its Spark SQL type.
def dataTypeAdaptor(dataType: DataType, index: Int, value: Any)
                   (implicit stmt: PreparedStatement): Unit = {
  dataType match {
    case DateType      => stmt.setDate(index, value.asInstanceOf[java.sql.Date])
    case IntegerType   => stmt.setInt(index, value.asInstanceOf[Int])
    case LongType      => stmt.setLong(index, value.asInstanceOf[Long])
    case DoubleType    => stmt.setDouble(index, value.asInstanceOf[Double])
    case StringType    => stmt.setString(index, if (value == null) null else value.toString)
    case BinaryType    => stmt.setBytes(index, if (value == null) null else value.asInstanceOf[Array[Byte]])
    case TimestampType => stmt.setTimestamp(index, value.asInstanceOf[Timestamp]) // was writing the current time instead of the value, a bug
    case NullType      => stmt.setNull(index, java.sql.Types.NULL)
  }
}
```
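A hypothetical call site, for reference; `snapshotDf`, the table, the key columns, and the connection string below are made-up placeholders:

```scala
// Hypothetical usage: table, key columns, and URL are placeholders.
upsert(
  df        = snapshotDf,                 // any DataFrame whose columns match the table
  tableName = "ADS_OV_JMT_RAW_DATA_MONTHLY",
  pks       = Seq("month", "id"),         // key columns, excluded from the UPDATE clause
  url       = "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true",
  user      = "root",
  password  = ""
)
```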
2. use tispark

Follow the TiSpark User Guide, in particular datasource_api_userguide.md. Writing with the replace=true option gives upsert semantics: rows that collide on a unique key are replaced. The job config must also point Spark at the cluster's Placement Driver (PD) addresses.
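A minimal sketch following the datasource API guide; the addresses, database, and table names here are placeholders:

```scala
// Options per the TiSpark datasource API guide; all values are placeholders.
val tidbOptions: Map[String, String] = Map(
  "tidb.addr" -> "127.0.0.1",
  "tidb.port" -> "4000",
  "tidb.user" -> "root",
  "tidb.password" -> "",
  "replace" -> "true",                              // upsert semantics on unique-key collisions
  "spark.tispark.pd.addresses" -> "127.0.0.1:2379"  // Placement Driver (PD)
)

df.write
  .format("tidb")
  .options(tidbOptions)
  .option("database", "test")
  .option("table", "ADS_OV_JMT_RAW_DATA_MONTHLY")
  .mode("append")
  .save()
```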
3. batch delete first + insert

Deleting a large number of rows in one statement makes TiDB report OOM on my side; some extra configuration may be needed. The statement:

delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401

A possible workaround (see the sketch after this list):

- after opening a new JDBC connection, first execute
- set @@session.tidb_batch_delete=1;
- then run the DELETE statement
- this may let the server split the delete into chunks

Once the delete succeeds, write the new data back, e.g.:

```scala
val tidbConf = Map[String, String](/* JDBC connection options */)
df.write
  .format("jdbc")
  .mode(mode)
  .options(tidbConf)
  .save()
```

Relevant session variables:

- tidb_batch_delete
- tidb_dml_batch_size
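A minimal sketch of the chunked delete over plain JDBC, assuming the two session variables above; the URL, credentials, and chunk size are placeholders:

```scala
import java.sql.DriverManager

// Placeholders: adjust URL, credentials, and chunk size for your cluster.
val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:4000/test", "root", "")
try {
  val stmt = conn.createStatement()
  // Ask TiDB to split the big DELETE into server-side chunks.
  // Batched DML takes effect with autocommit on, which is the JDBC default.
  stmt.execute("set @@session.tidb_batch_delete = 1")
  stmt.execute("set @@session.tidb_dml_batch_size = 20000")
  stmt.executeUpdate("delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401")
} finally {
  conn.close()
}
```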
4. batch delete limit + insert

delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401 limit 10000

Loop this statement until it affects zero rows, at which point the delete is complete; then insert the new data.
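A minimal sketch of the loop over plain JDBC; the connection details are placeholders:

```scala
import java.sql.DriverManager

// Placeholders: adjust URL and credentials for your cluster.
val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:4000/test", "root", "")
try {
  val stmt = conn.createStatement()
  var deleted = -1
  // executeUpdate returns the affected row count; stop once a pass deletes nothing.
  while (deleted != 0) {
    deleted = stmt.executeUpdate(
      "delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401 limit 10000")
  }
} finally {
  conn.close()
}
```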