https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
JDBC Parameter Notes
url: appending rewriteBatchedStatements=true to the URL makes the MySQL JDBC driver rewrite batched statements into multi-row INSERTs. This is one of the most important settings for batch writing and can noticeably improve performance.
- Problem: with batched writes disabled, writing a few thousand rows took about 2 minutes.
- Cause: batching was never enabled at the driver level; it has to be switched on with a parameter appended to the connection URL:
url = "jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true"

| Parameter | Description | Example |
| --- | --- | --- |
| url | JDBC connection URL | jdbc:mysql://172.16.0.132:3306/collie_test |
| dbtable | a table name, or a parenthesized subquery used in place of a full table | table_name, or (select * from table_name) as table1 |
| driver | JDBC driver class | com.mysql.cj.jdbc.Driver |
| numPartitions | number of partitions for parallel reads and writes; determines the maximum number of concurrent JDBC connections | |
| fetchsize | read: JDBC fetch size, i.e. rows fetched per round trip | |
| batchsize | write: rows per INSERT batch | default 1000 |
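Putting the MySQL-related options together, a minimal write sketch might look like the following (the DataFrame df, the target table name, the credentials, and the batchsize/numPartitions values are illustrative placeholders, not values from the original setup):

# Sketch: batched write to MySQL; rewriteBatchedStatements=true lets the
# driver rewrite each batch into multi-row INSERT statements
df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://172.16.0.132:3306/collie_test?rewriteBatchedStatements=true") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("batchsize", "10000") \
    .option("numPartitions", "8") \
    .mode("append") \
    .save()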
Loading data
# Load a table from a JDBC source via the generic load API
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
# Alternatively, use the read.jdbc shorthand with a properties dict
jdbcDF2 = spark.read.jdbc(
    url="jdbc:postgresql:dbserver",
    table="schema.tablename",
    properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()
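customSchema only overrides the types of the columns it names; any columns not listed keep the default type mapping derived from the source table.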
Saving data
# Save data via the write.jdbc shorthand
jdbcDF2.write.jdbc(
    url="jdbc:postgresql:dbserver",
    table="schema.tablename",
    properties={"user": "username", "password": "password"})
# Equivalent save via the generic format/save API
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
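By default a JDBC save fails if the target table already exists; a save mode such as append can be passed to change that:

# Sketch: append to an existing table instead of failing
jdbcDF.write.jdbc(
    url="jdbc:postgresql:dbserver",
    table="schema.tablename",
    mode="append",
    properties={"user": "username", "password": "password"})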
# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})