1. build sql

```scala
import java.sql.{DriverManager, PreparedStatement, Timestamp}

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

def upsert(df: DataFrame, tableName: String, pks: Seq[String],
           url: String, user: String, password: String): Unit = {
  val fieldNames      = df.schema.fieldNames
  val nonPkFieldNames = fieldNames.filterNot(pks.contains)

  // INSERT ... ON DUPLICATE KEY UPDATE: non-PK columns take the values of the incoming row
  val upsertSql =
    s"INSERT INTO $tableName (${fieldNames.mkString(",")}) " +
      s"VALUES (${fieldNames.map(_ => "?").mkString(",")}) " +
      s"ON DUPLICATE KEY UPDATE " +
      s"${nonPkFieldNames.map(f => s"$f=VALUES($f)").mkString(",")}"

  df.foreachPartition { partition: Iterator[Row] =>
    Class.forName("com.mysql.cj.jdbc.Driver")
    val conn = DriverManager.getConnection(url, user, password)
    try {
      implicit val stmt: PreparedStatement = conn.prepareStatement(upsertSql)
      conn.setAutoCommit(false)
      // Execute and commit every 2000 rows so each batch/transaction stays small.
      partition.grouped(2000).foreach { group =>
        group.foreach { record =>
          record.schema.zip(record.toSeq).zipWithIndex.foreach {
            case ((field, value), i) =>
              dataTypeAdaptor(field.dataType, i + 1, value) // JDBC parameter indexes are 1-based
          }
          stmt.addBatch()
        }
        stmt.executeBatch()
        conn.commit()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }
}

def dataTypeAdaptor(dataType: DataType, index: Int, value: Any)
                   (implicit stmt: PreparedStatement): Unit = {
  dataType match {
    case DateType      => stmt.setDate(index, value.asInstanceOf[java.sql.Date])
    case IntegerType   => stmt.setInt(index, value.asInstanceOf[Int])
    case LongType      => stmt.setLong(index, value.asInstanceOf[Long])
    case DoubleType    => stmt.setDouble(index, value.asInstanceOf[Double])
    case StringType    => stmt.setString(index, if (value == null) null else value.toString)
    case BinaryType    => stmt.setBytes(index, if (value == null) null else value.asInstanceOf[Array[Byte]])
    case TimestampType => stmt.setTimestamp(index, value.asInstanceOf[Timestamp])
    case NullType      => stmt.setNull(index, java.sql.Types.NULL)
    case _             => stmt.setObject(index, value.asInstanceOf[AnyRef]) // fallback for other Spark SQL types
  }
}
```
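
A hypothetical call for illustration, assuming an existing SparkSession, a staging DataFrame, and a MySQL-protocol endpoint for TiDB; all names and credentials below are placeholders:

```scala
// All endpoints, table names, and credentials here are placeholders.
val df = spark.table("ads.ADS_OV_JMT_RAW_DATA_MONTHLY_STG")

upsert(
  df,
  tableName = "ADS_OV_JMT_RAW_DATA_MONTHLY",
  pks       = Seq("month", "id"),
  url       = "jdbc:mysql://tidb-host:4000/ads?rewriteBatchedStatements=true",
  user      = "writer",
  password  = sys.env("TIDB_PASSWORD")
)
```

rewriteBatchedStatements=true is an optional Connector/J setting that rewrites the batched inserts into multi-row statements, which usually helps throughput more than tuning the 2000-row batch size.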

2. use tispark

Follow the TiSpark User Guide (datasource_api_userguide.md) and write through the TiSpark datasource with the replace=true config.

The Spark job also needs the Placement Driver (PD) addresses configured (spark.tispark.pd.addresses).
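
A minimal write sketch based on the TiSpark datasource API from that guide; the addresses, credentials, and database/table names are assumptions for illustration:

```scala
// Assumes spark.tispark.pd.addresses is already set on the Spark conf.
// All endpoints, credentials, and names below are placeholders.
df.write
  .format("tidb")
  .option("tidb.addr", "tidb-host")
  .option("tidb.port", "4000")
  .option("tidb.user", "writer")
  .option("tidb.password", sys.env("TIDB_PASSWORD"))
  .option("database", "ads")
  .option("table", "ADS_OV_JMT_RAW_DATA_MONTHLY")
  .option("replace", "true") // rows with a duplicate unique key replace the existing rows
  .mode("append")
  .save()
```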

3. batch delete first + insert

Deleting a large amount of data this way made TiDB report OOM on my side; additional configuration may be needed.

delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401

A possible workaround:

  • after opening a new JDBC connection, execute set @@session.tidb_batch_delete=1;
  • then run the delete statement;
  • this may make the server delete in chunks rather than in one huge transaction.

The insert part stays a plain JDBC write:

```scala
val tidbConf = Map[String, String](
  // JDBC url/user/password and any session options (contents elided in the original)
)

df.write
  .format(JDBC)
  .mode(mode)
  .options(tidbConf)
  .save()
```

How to work around the large-transaction limit, via these session variables (a JDBC sketch follows the list):

  • tidb_batch_delete
  • tidb_dml_batch_size
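
A minimal JDBC sketch of this sequence, assuming a MySQL-protocol connection to TiDB; the host, credentials, and batch-size value are placeholders:

```scala
import java.sql.DriverManager

// Hypothetical endpoint and credentials, for illustration only.
val conn = DriverManager.getConnection(
  "jdbc:mysql://tidb-host:4000/ads", "writer", sys.env("TIDB_PASSWORD"))
try {
  val stmt = conn.createStatement()
  // Ask TiDB to split the DML into smaller chunks instead of one huge transaction.
  // tidb_batch_delete only takes effect for auto-committed statements, which is the connection default.
  stmt.execute("set @@session.tidb_batch_delete=1")
  stmt.execute("set @@session.tidb_dml_batch_size=20000")
  stmt.executeUpdate("delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401")
} finally {
  conn.close()
}
```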

4. batch delete limit + insert



delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401 limit 10000

Loop this statement a number of times until the deletion completes; see the sketch below.
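
A sketch of that loop over JDBC, assuming the same placeholder connection details as above; it repeats the limited delete until no rows remain:

```scala
import java.sql.DriverManager

// Hypothetical endpoint and credentials, for illustration only.
val conn = DriverManager.getConnection(
  "jdbc:mysql://tidb-host:4000/ads", "writer", sys.env("TIDB_PASSWORD"))
try {
  val stmt = conn.createStatement()
  var deleted = 0
  do {
    // Each round deletes at most 10000 rows in its own small transaction.
    deleted = stmt.executeUpdate(
      "delete from ADS_OV_JMT_RAW_DATA_MONTHLY where month = 20220401 limit 10000")
  } while (deleted > 0)
} finally {
  conn.close()
}
```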