https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
JDBC parameter notes
url: appending the parameter rewriteBatchedStatements=true to the URL enables batched writes on the MySQL side. This is one of the most important parameters for batch writing and can improve performance significantly.
- Problem: without batched writes enabled, inserting a few thousand rows took 2 minutes
- Cause: batching was not enabled at the data layer; it has to be turned on by appending a parameter to the database connection URL
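The fix amounts to appending one query parameter to the connection URL. A minimal sketch of that string manipulation (the helper name `enable_batched_writes` is made up for illustration, not a Spark or Connector/J API):

```python
# Hypothetical helper (not part of Spark or Connector/J): append the
# MySQL rewriteBatchedStatements flag to an existing JDBC URL.
def enable_batched_writes(jdbc_url: str) -> str:
    # Use '&' if the URL already carries query parameters, '?' otherwise.
    sep = "&" if "?" in jdbc_url else "?"
    return jdbc_url + sep + "rewriteBatchedStatements=true"

print(enable_batched_writes("jdbc:mysql://172.16.0.132:3306/collie_test"))
# → jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true
```

The resulting URL is what gets passed as the `url` option when reading or writing.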
| Parameter | Description | Example |
| --- | --- | --- |
| url | JDBC connection URL | jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true |
| dbtable | Table name, or a parenthesized subquery in place of a full table, e.g. `table_name` or `(select * from table_name) as table1` | |
| driver | JDBC driver class | com.mysql.cj.jdbc.Driver |
| numPartitions | Number of partitions for parallel reads/writes; determines the number of concurrent JDBC connections | |
| fetchsize | read: JDBC fetch size (rows per round trip) | |
| batchsize | write: number of rows per insert batch | default 1000 |
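The parameters above can be collected into a single option map and handed to the reader in one call. A sketch, assuming a plain Python dict; the concrete values for `numPartitions` and `fetchsize` are illustrative assumptions, not tuned recommendations:

```python
# Illustrative option map mirroring the parameter table (values assumed):
jdbc_read_options = {
    "url": "jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true",
    "dbtable": "(select * from table_name) as table1",  # subquery form
    "driver": "com.mysql.cj.jdbc.Driver",
    "numPartitions": "8",    # concurrent JDBC connections (assumed value)
    "fetchsize": "1000",     # rows fetched per round trip (assumed value)
}

# Would be applied as:
#   spark.read.format("jdbc").options(**jdbc_read_options).load()
print(sorted(jdbc_read_options))
```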
Loading data
```python
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read.jdbc(
    url="jdbc:postgresql:dbserver",
    table="schema.tablename",
    properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()
```
Saving data
```python
jdbcDF2.write.jdbc(
    url="jdbc:postgresql:dbserver",
    table="schema.tablename",
    properties={"user": "username", "password": "password"})

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
```
