1. build sql
```scala
import java.sql.{DriverManager, PreparedStatement, Timestamp}

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

def upsert(df: DataFrame, tableName: String, pks: Seq[String],
           url: String, user: String, password: String): Unit = {
  val fieldNames = df.schema.fieldNames
  val nonPkFieldNames = fieldNames.filterNot(pks.contains)
  val size = df.schema.size
  val upsertSql =
    s"INSERT INTO $tableName (${fieldNames.mkString(",")}) " +
      s"VALUES (${Seq.fill(size)("?").mkString(",")}) " +
      "ON DUPLICATE KEY UPDATE " +
      nonPkFieldNames.map(f => s"$f=VALUES($f)").mkString(",")

  df.foreachPartition { partition: Iterator[Row] =>
    Class.forName("com.mysql.cj.jdbc.Driver")
    val conn = DriverManager.getConnection(url, user, password)
    try {
      implicit val stmt: PreparedStatement = conn.prepareStatement(upsertSql)
      conn.setAutoCommit(false)
      // flush every 2000 rows so each batch/transaction stays small
      partition.grouped(2000).foreach { group =>
        group.foreach { record =>
          record.schema.zip(record.toSeq).zipWithIndex.foreach {
            case ((field, value), idx) =>
              dataTypeAdaptor(field.dataType, idx + 1, value) // JDBC indexes start at 1
          }
          stmt.addBatch()
        }
        stmt.executeBatch()
        conn.commit()
      }
      stmt.close()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    } finally {
      if (conn != null) conn.close()
    }
  }
}

def dataTypeAdaptor(dataType: DataType, index: Int, value: Any)
                   (implicit stmt: PreparedStatement): Unit = {
  dataType match {
    case DateType      => stmt.setDate(index, value.asInstanceOf[java.sql.Date])
    case IntegerType   => stmt.setInt(index, value.asInstanceOf[Int])
    case LongType      => stmt.setLong(index, value.asInstanceOf[Long])
    case DoubleType    => stmt.setDouble(index, value.asInstanceOf[Double])
    case StringType    => stmt.setString(index, if (value == null) null else value.toString)
    case BinaryType    => stmt.setBytes(index, if (value == null) null else value.asInstanceOf[Array[Byte]])
    case TimestampType => stmt.setTimestamp(index, value.asInstanceOf[Timestamp]) // was hard-coded to "now"
    case NullType      => stmt.setNull(index, java.sql.Types.NULL)
    case _             => stmt.setObject(index, value)
  }
}
```
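To see the shape of the generated statement, here is a standalone sketch of just the string-building step for a hypothetical table `t` with columns `a, b, c` and primary key `a`:

```scala
// Standalone sketch of the SQL-building step for a hypothetical
// table "t" (columns a, b, c; primary key a).
val fieldNames = Seq("a", "b", "c")
val pks = Seq("a")
val nonPk = fieldNames.filterNot(pks.contains)
val upsertSql =
  s"INSERT INTO t (${fieldNames.mkString(",")}) " +
    s"VALUES (${Seq.fill(fieldNames.size)("?").mkString(",")}) " +
    "ON DUPLICATE KEY UPDATE " +
    nonPk.map(f => s"$f=VALUES($f)").mkString(",")
println(upsertSql)
// INSERT INTO t (a,b,c) VALUES (?,?,?) ON DUPLICATE KEY UPDATE b=VALUES(b),c=VALUES(c)
```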
2. use tispark
- TiSpark User Guide: `datasource_api_userguide.md`
- use the `replace=true` config for upsert semantics
- the datasource connects to the cluster through the Placement Driver (PD)
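A minimal write through the TiSpark datasource might look like the sketch below. The host, database, and table values are placeholders, and the exact option names should be checked against the version of `datasource_api_userguide.md` in use:

```scala
// Sketch only: assumes TiSpark is on the classpath and
// spark.tispark.pd.addresses points at the PD endpoints.
df.write
  .format("tidb")
  .option("tidb.addr", "127.0.0.1")   // TiDB server address, placeholder
  .option("tidb.port", "4000")
  .option("tidb.user", "root")
  .option("tidb.password", "")
  .option("database", "ads")          // placeholder database name
  .option("table", "ADS_OV_JMT_RAW_DATA_MONTHLY")
  .option("replace", "true")          // REPLACE semantics, i.e. upsert
  .mode("append")
  .save()
```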
3. batch delete first + insert
Deleting a large amount of data this way makes TiDB report OOM; extra configuration may be needed.
delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month= 20220401
Possible solutions:
- After opening a new JDBC connection, execute `set @@session.tidb_batch_delete=1;`, then run the delete statement
- This may make the server split the delete into chunks on its side; the insert step afterwards can be a plain JDBC write:
```scala
val tidbConf = Map[String, String]( /* connection options elided in the original notes */ )
df.write
  .format("jdbc")
  .mode(mode)
  .options(tidbConf)
  .save()
```
- tidb_batch_delete
- tidb_dml_batch_size
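The session-variable approach above can be sketched over plain JDBC. The batch size value and the connection parameters are placeholders; the table and filter come from the notes above:

```scala
import java.sql.DriverManager

// Statements to run, in order, on a single session.
val deleteStatements: Seq[String] = Seq(
  "set @@session.tidb_batch_delete=1",
  "set @@session.tidb_dml_batch_size=20000", // batch size is a guess; tune it
  "delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401"
)

// Sketch: execute them on one JDBC connection (url/user/password are placeholders).
def runBatchDelete(url: String, user: String, password: String): Unit = {
  val conn = DriverManager.getConnection(url, user, password)
  try {
    val stmt = conn.createStatement()
    deleteStatements.foreach(stmt.execute)
    stmt.close()
  } finally conn.close()
}
```

The session variables only take effect for statements executed on that same connection, which is why everything goes through one `Statement`.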
4. batch delete limit + insert
delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month= 20220401 limit 10000
Loop this several times until the deletion completes.
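The limit-loop can be sketched as a small helper that repeats the bounded delete until a round removes fewer rows than the limit. Here `deleteBatch` stands in for the real `executeUpdate` call, which returns the affected-row count:

```scala
// Sketch: repeat "delete ... limit N" until a round deletes fewer than N
// rows. `deleteBatch` abstracts the JDBC call, e.g.
//   () => stmt.executeUpdate(
//     "delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401 limit 10000")
def loopDelete(deleteBatch: () => Int, limit: Int): Long = {
  var total = 0L
  var affected = limit
  while (affected == limit) {   // a short round means nothing (or little) is left
    affected = deleteBatch()
    total += affected
  }
  total
}
```

Each round is its own small transaction, which is what keeps TiDB's memory usage bounded compared to one giant delete.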
