https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

JDBC parameter reference

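url: Appending rewriteBatchedStatements=true to the URL tells the MySQL JDBC driver (Connector/J) to rewrite batched inserts into multi-row statements. It is one of the more important parameters for batched writes and can noticeably improve write performance.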

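  • Problem: with batched writes disabled, writing a few thousand rows took 2 minutes.
  • Cause: batched writes were not enabled at the connection level; a parameter has to be appended to the database connection URL to turn them on:
    url = "jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true"

| Parameter | Description | Example |
| --- | --- | --- |
| url | JDBC connection URL | jdbc:mysql://172.16.0.132:3306/collie_test |
| dbtable | 1. A table name 2. A parenthesized subquery used in place of a full table | 1. table_name 2. (select * from table_name) as table1 |
| driver | JDBC driver class | com.mysql.cj.jdbc.Driver |
| numPartitions | Number of partitions used to read from or write to the database in parallel; determines the number of concurrent JDBC connections | |
| fetchsize | read: JDBC fetch size | |
| batchsize | write: number of rows per insert | default 1000 |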
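The write-side options from the table can be collected into a plain dictionary before they are handed to Spark. The following is a minimal sketch under stated assumptions: the helper names `with_batch_rewrite` and `mysql_write_options` are hypothetical (they are not part of Spark or Connector/J), and credentials are left out.

```python
def with_batch_rewrite(url: str) -> str:
    """Append rewriteBatchedStatements=true to a MySQL JDBC URL if it is absent."""
    if "rewriteBatchedStatements=" in url:
        return url  # already set explicitly; keep the caller's value
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}rewriteBatchedStatements=true"


def mysql_write_options(url: str, table: str, batchsize: int = 1000) -> dict:
    """Build an option dict for a batched JDBC write (hypothetical helper)."""
    return {
        "url": with_batch_rewrite(url),
        "dbtable": table,
        "driver": "com.mysql.cj.jdbc.Driver",
        "batchsize": str(batchsize),  # rows per insert; 1000 matches the default in the table
    }


opts = mysql_write_options("jdbc:mysql://172.16.0.132:3306/collie_test", "table_name")
print(opts["url"])
```

With an existing DataFrame `df` (not created in this sketch), the dictionary could then be passed along as `df.write.format("jdbc").options(**opts)`, followed by the user and password options and `.save()`.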

Loading data

    # Assumes an existing SparkSession named `spark`.
    # Load a table through the generic DataFrameReader options API
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    # Load the same table through the jdbc() shortcut
    jdbcDF2 = spark.read.jdbc(
        url="jdbc:postgresql:dbserver",
        table="schema.tablename",
        properties={"user": "username", "password": "password"})

    # Specifying dataframe column data types on read
    jdbcDF3 = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .option("customSchema", "id DECIMAL(38, 0), name STRING") \
        .load()
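None of the loading examples above set numPartitions, so they do not ask Spark for a parallel read. According to the Spark JDBC documentation linked at the top, a parallel read also needs `partitionColumn`, `lowerBound` and `upperBound` alongside `numPartitions`. The sketch below only assembles those options; the helper name `partitioned_read_options`, the numeric column `id`, the bounds and the fetch size of 1000 are all assumptions made for illustration.

```python
def partitioned_read_options(url, table, column, lower, upper,
                             num_partitions, fetchsize=1000):
    """Build an option dict for a parallel JDBC read (hypothetical helper)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower > upper:
        raise ValueError("lower bound must not exceed upper bound")
    return {
        "url": url,
        "dbtable": table,
        "partitionColumn": column,              # column whose range is split
        "lowerBound": str(lower),               # used to compute the partition stride
        "upperBound": str(upper),
        "numPartitions": str(num_partitions),   # also caps concurrent JDBC connections
        "fetchsize": str(fetchsize),            # rows fetched per round trip on read
    }


read_opts = partitioned_read_options(
    url="jdbc:mysql://172.16.0.132:3306/collie_test",
    table="(select * from table_name) as table1",  # subquery form of dbtable
    column="id",                                   # assumed numeric column
    lower=1,
    upper=100000,
    num_partitions=8,
)
print(read_opts["numPartitions"])
```

With a live session this could be used as `spark.read.format("jdbc").options(**read_opts)`, followed by the user and password options and `.load()`. The bounds only decide how the column range is divided among partitions; they do not filter rows.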

Saving data

    # Assumes the DataFrames jdbcDF and jdbcDF2 from the loading examples.
    # Save through the jdbc() shortcut
    jdbcDF2.write.jdbc(
        url="jdbc:postgresql:dbserver",
        table="schema.tablename",
        properties={"user": "username", "password": "password"})

    # Save through the generic DataFrameWriter options API
    jdbcDF.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .save()

    # Specifying create table column data types on write
    jdbcDF.write \
        .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})