响应式 MySQL 客户端

响应式 MySQL 客户端具有简单易懂的 API,专注于可扩展性和低开销。

特性

  • 事件驱动
  • 轻量级
  • 内置连接池
  • 预处理查询缓存
  • 支持游标
  • 流式行处理
  • RxJava API
  • 支持内存直接映射到对象,避免了不必要的复制
  • 完整的数据类型支持
  • 支持存储过程
  • 支持 TLS/SSL
  • MySQL 实用程序命令支持
  • 支持 MySQL 和 MariaDB
  • 丰富的字符排序(collation)和字符集支持
  • Unix 域套接字

使用方法

使用响应式 MySQL 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:

  • Maven:
  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-mysql-client</artifactId>
  4. <version>4.1.5</version>
  5. </dependency>
  • Gradle:
  1. dependencies {
  2. compile 'io.vertx:vertx-mysql-client:4.1.5'
  3. }

开始

以下是最简单的连接,查询和断开连接方法

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  2. .setPort(3306)
  3. .setHost("the-host")
  4. .setDatabase("the-db")
  5. .setUser("user")
  6. .setPassword("secret");
  7. // 连接池选项
  8. PoolOptions poolOptions = new PoolOptions()
  9. .setMaxSize(5);
  10. // 创建客户端池
  11. MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
  12. // 一个简单的查询
  13. client
  14. .query("SELECT * FROM users WHERE id='julien'")
  15. .execute(ar -> {
  16. if (ar.succeeded()) {
  17. RowSet<Row> result = ar.result();
  18. System.out.println("Got " + result.size() + " rows ");
  19. } else {
  20. System.out.println("Failure: " + ar.cause().getMessage());
  21. }
  22. // 现在关闭客户端池
  23. client.close();
  24. });

连接到 MySQL

大多数时间,您将使用连接池连接到 MySQL:

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  2. .setPort(3306)
  3. .setHost("the-host")
  4. .setDatabase("the-db")
  5. .setUser("user")
  6. .setPassword("secret");
  7. // 连接池选项
  8. PoolOptions poolOptions = new PoolOptions()
  9. .setMaxSize(5);
  10. // 创建带连接池的客户端
  11. MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

当您不再需要连接池时,您需要释放它:

  1. client.close();

当您需要在同一连接上执行多个操作时,您需要使用 connection 客户端 。

您可以轻松地从连接池中获取一个:

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  2. .setPort(3306)
  3. .setHost("the-host")
  4. .setDatabase("the-db")
  5. .setUser("user")
  6. .setPassword("secret");
  7. // 连接池选项
  8. PoolOptions poolOptions = new PoolOptions()
  9. .setMaxSize(5);
  10. // 创建带连接池的客户端
  11. MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
  12. // 从连接池获得连接
  13. client.getConnection().compose(conn -> {
  14. System.out.println("Got a connection from the pool");
  15. // 所有操作都在同一连接上执行
  16. return conn
  17. .query("SELECT * FROM users WHERE id='julien'")
  18. .execute()
  19. .compose(res -> conn
  20. .query("SELECT * FROM users WHERE id='emad'")
  21. .execute())
  22. .onComplete(ar -> {
  23. // 释放连接池的连接
  24. conn.close();
  25. });
  26. }).onComplete(ar -> {
  27. if (ar.succeeded()) {
  28. System.out.println("Done");
  29. } else {
  30. System.out.println("Something went wrong " + ar.cause().getMessage());
  31. }
  32. });

配置

有几个选项供您配置客户端。

数据对象

配置客户端的简单方法就是指定 MySQLConnectOptions 数据对象。

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  2. .setPort(3306)
  3. .setHost("the-host")
  4. .setDatabase("the-db")
  5. .setUser("user")
  6. .setPassword("secret");
  7. // 连接池选项
  8. PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
  9. // 从数据对象创建连接池
  10. MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
  11. pool.getConnection(ar -> {
  12. // 处理您的连接
  13. });

字符序(collations)和字符集(character sets)

响应式 MySQL 客户端支持配置字符序或字符集,并将它们映射到一个相关的 java.nio.charset.Charset 。 您可以为数据库连接指定字符集,例如

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions();
  2. // 将连接的字符集设置为utf8而不是默认的字符集utf8mb4
  3. connectOptions.setCharset("utf8");

响应式 MySQL 客户端的默认字符集是 utf8mb4 。字符串值,如密码和错误消息等,总是使用 UTF-8 字符集解码。

characterEncoding 选项用于设置字符串(例如查询字符串和参数值)使用的 Java 字符集,默认使用 UTF-8 字符集;如果设置为 null ,则客户端将使用 Java 的默认字符集。

您还可以为连接指定字符序,例如

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions();
  2. // 将连接的字符序设置为 utf8_general_ci 来代替默认字符序 utf8mb4_general_ci
  3. // 设置字符序将覆盖charset选项
  4. connectOptions.setCharset("gbk");
  5. connectOptions.setCollation("utf8_general_ci");

请注意,在数据对象上设置字符序将覆盖 charsetcharacterEncoding 选项。

您可以执行 SQL SHOW COLLATION;SHOW CHARACTER SET; 获取服务器支持的字符序和字符集。

连接属性

还可以使用 setPropertiesaddProperty 方法配置连接属性。注意 setProperties 将覆盖客户端的默认属性。

  1. MySQLConnectOptions connectOptions = new MySQLConnectOptions();
  2. // 添加连接属性
  3. connectOptions.addProperty("_java_version", "1.8.0_212");
  4. // 覆盖属性
  5. Map<String, String> attributes = new HashMap<>();
  6. attributes.put("_client_name", "myapp");
  7. attributes.put("_client_version", "1.0.0");
  8. connectOptions.setProperties(attributes);

配置 useAffectedRows

您可以 useAffectedRows 选项以决定是否在连接到服务器时设置标志 CLIENT_FOUND_ROWS。如果指定了 CLIENT_FOUND_ROWS 标志,则受影响的行计数(返回的)是查找到的行数,而不是受影响的行数。

连接 URI

除了使用 MySQLConnectOptions 数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:

  1. String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";
  2. // 从连接URI创建连接池
  3. MySQLPool pool = MySQLPool.pool(connectionUri);
  4. // 从连接URI创建连接
  5. MySQLConnection.connect(vertx, connectionUri, res -> {
  6. // 处理您的连接
  7. });

目前,客户端支持以下的连接 uri 参数关键字(不区分大小写):

  • host
  • port
  • user
  • password
  • schema
  • socket
  • useAffectedRows

连接重试

您可以将客户端配置为在连接无法建立时重试。

  1. options
  2. .setReconnectAttempts(2)
  3. .setReconnectInterval(1000);

运行查询

当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询。连接池将使用其中一个连接来运行查询并将结果返回给您。

这是运行简单查询的方法:

  1. client
  2. .query("SELECT * FROM users WHERE id='julien'")
  3. .execute(ar -> {
  4. if (ar.succeeded()) {
  5. RowSet<Row> result = ar.result();
  6. System.out.println("Got " + result.size() + " rows ");
  7. } else {
  8. System.out.println("Failure: " + ar.cause().getMessage());
  9. }
  10. });

预处理查询

您可以对预处理查询执行相同的操作。

SQL字符串可以使用数据库语法 ? 按位置引用参数

  1. client
  2. .preparedQuery("SELECT * FROM users WHERE id=?")
  3. .execute(Tuple.of("julien"), ar -> {
  4. if (ar.succeeded()) {
  5. RowSet<Row> rows = ar.result();
  6. System.out.println("Got " + rows.size() + " rows ");
  7. } else {
  8. System.out.println("Failure: " + ar.cause().getMessage());
  9. }
  10. });

查询方法提供异步 RowSet 实例,它适用于 SELECT 查询。

  1. client
  2. .preparedQuery("SELECT first_name, last_name FROM users")
  3. .execute(ar -> {
  4. if (ar.succeeded()) {
  5. RowSet<Row> rows = ar.result();
  6. for (Row row : rows) {
  7. System.out.println("User " + row.getString(0) + " " + row.getString(1));
  8. }
  9. } else {
  10. System.out.println("Failure: " + ar.cause().getMessage());
  11. }
  12. });

UPDATE/INSERT 查询:

  1. client
  2. .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
  3. .execute(Tuple.of("Julien", "Viet"), ar -> {
  4. if (ar.succeeded()) {
  5. RowSet<Row> rows = ar.result();
  6. System.out.println(rows.rowCount());
  7. } else {
  8. System.out.println("Failure: " + ar.cause().getMessage());
  9. }
  10. });

Row 使您可以按索引访问数据

  1. System.out.println("User " + row.getString(0) + " " + row.getString(1));

或按名字

  1. System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在这里不会使用任何魔术,并且无论您的SQL文本如何,列名都将用表中的名称标识。

您可以访问多样的类型

  1. String firstName = row.getString("first_name");
  2. Boolean male = row.getBoolean("male");
  3. Integer age = row.getInteger("age");

您可以使用缓存的预处理语句执行一次性的预处理查询:

  1. connectOptions.setCachePreparedStatements(true);
  2. client
  3. .preparedQuery("SELECT * FROM users WHERE id = ?")
  4. .execute(Tuple.of("julien"), ar -> {
  5. if (ar.succeeded()) {
  6. RowSet<Row> rows = ar.result();
  7. System.out.println("Got " + rows.size() + " rows ");
  8. } else {
  9. System.out.println("Failure: " + ar.cause().getMessage());
  10. }
  11. });

您可以创建一个 PreparedStatement 并自己管理生命周期。

  1. sqlConnection
  2. .prepare("SELECT * FROM users WHERE id = ?", ar -> {
  3. if (ar.succeeded()) {
  4. PreparedStatement preparedStatement = ar.result();
  5. preparedStatement.query()
  6. .execute(Tuple.of("julien"), ar2 -> {
  7. if (ar2.succeeded()) {
  8. RowSet<Row> rows = ar2.result();
  9. System.out.println("Got " + rows.size() + " rows ");
  10. preparedStatement.close();
  11. } else {
  12. System.out.println("Failure: " + ar2.cause().getMessage());
  13. }
  14. });
  15. } else {
  16. System.out.println("Failure: " + ar.cause().getMessage());
  17. }
  18. });

批处理

您可以执行预处理的批处理

  1. List<Tuple> batch = new ArrayList<>();
  2. batch.add(Tuple.of("julien", "Julien Viet"));
  3. batch.add(Tuple.of("emad", "Emad Alblueshi"));
  4. // 执行预处理的批处理
  5. client
  6. .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
  7. .executeBatch(batch, res -> {
  8. if (res.succeeded()) {
  9. // 处理行
  10. RowSet<Row> rows = res.result();
  11. } else {
  12. System.out.println("Batch failed " + res.cause());
  13. }
  14. });

MySQL LAST_INSERT_ID

往表中插入一条记录后,可以获得自增值。

  1. client
  2. .query("INSERT INTO test(val) VALUES ('v1')")
  3. .execute(ar -> {
  4. if (ar.succeeded()) {
  5. RowSet<Row> rows = ar.result();
  6. long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
  7. System.out.println("Last inserted id is: " + lastInsertId);
  8. } else {
  9. System.out.println("Failure: " + ar.cause().getMessage());
  10. }
  11. });

使用连接

获取连接

当需要执行顺序查询(无事务)时,可以创建一个新连接或从连接池中借用一个。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。

  1. pool
  2. .getConnection()
  3. .compose(connection ->
  4. connection
  5. .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
  6. .executeBatch(Arrays.asList(
  7. Tuple.of("Julien", "Viet"),
  8. Tuple.of("Emad", "Alblueshi")
  9. ))
  10. .compose(res -> connection
  11. // 对行执行一些操作
  12. .query("SELECT COUNT(*) FROM Users")
  13. .execute()
  14. .map(rows -> rows.iterator().next().getInteger(0)))
  15. // 将连接返回到连接池中
  16. .eventually(v -> connection.close())
  17. ).onSuccess(count -> {
  18. System.out.println("Insert users, now the number of users is " + count);
  19. });

可以创建预处理查询语句:

  1. connection
  2. .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  3. .compose(pq ->
  4. pq.query()
  5. .execute(Tuple.of("Julien"))
  6. .eventually(v -> pq.close())
  7. ).onSuccess(rows -> {
  8. // 所有的行
  9. });

简单连接 API

当您创建了一个连接池, 您可以调用 withConnection 并传入一个使用连接进行处理的函数。

它从连接池中借用一个连接,并使用该连接调用函数。

该函数必须返回一个任意结果的 Future。

Future 完成后, 连接将归还至连接池,并提供全部的结果。

  1. pool.withConnection(connection ->
  2. connection
  3. .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
  4. .executeBatch(Arrays.asList(
  5. Tuple.of("Julien", "Viet"),
  6. Tuple.of("Emad", "Alblueshi")
  7. ))
  8. .compose(res -> connection
  9. // Do something with rows
  10. .query("SELECT COUNT(*) FROM Users")
  11. .execute()
  12. .map(rows -> rows.iterator().next().getInteger(0)))
  13. ).onSuccess(count -> {
  14. System.out.println("Insert users, now the number of users is " + count);
  15. });

使用事务

带事务的连接

您可以使用SQL BEGIN/COMMIT/ROLLBACK 执行事务,如果您必须这么做,就必须使用 SqlConnection 自己管理事务。

或者您使用 SqlConnection 的事务API:

  1. pool.getConnection()
  2. // 事务必须使用一个连接
  3. .onSuccess(conn -> {
  4. // 开始事务
  5. conn.begin()
  6. .compose(tx -> conn
  7. // 各种语句
  8. .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  9. .execute()
  10. .compose(res2 -> conn
  11. .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
  12. .execute())
  13. // 提交事务
  14. .compose(res3 -> tx.commit()))
  15. // 将连接返回到连接池
  16. .eventually(v -> conn.close())
  17. .onSuccess(v -> System.out.println("Transaction succeeded"))
  18. .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  19. });

当数据库服务器报告当前事务失败时(例如,臭名昭著的 current transaction is aborted, commands ignored until end of transaction block), 事务被回滚,此时 completion Future 会失败, 并返回 TransactionRollbackException 异常:

  1. tx.completion()
  2. .onFailure(err -> {
  3. System.out.println("Transaction failed => rolled back");
  4. });

简单事务 API

当您创建了一个连接池, 您可以调用 withTransaction 并传入一个使用连接进行处理的函数。

它从连接池中借用一个连接,开始事务,并且,在此事务范围内所有执行操作的客户端调用该函数。

该函数必须返回一个任意结果的Future。

  • 当Future成功,客户端提交这个事务
  • 当Future失败,客户端回滚这个事务

事务完成后, 连接将返回到连接池中,并提供全部的结果。

  1. pool.withTransaction(client -> client
  2. .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  3. .execute()
  4. .flatMap(res -> client
  5. .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
  6. .execute()
  7. // 映射一个消息结果
  8. .map("Users inserted")))
  9. .onSuccess(v -> System.out.println("Transaction succeeded"))
  10. .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流

默认情况下,执行预处理查询将获取所有行,您可以使用 Cursor 控制想读取的行数:

  1. connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  2. if (ar1.succeeded()) {
  3. PreparedStatement pq = ar1.result();
  4. // 创建游标
  5. Cursor cursor = pq.cursor(Tuple.of(18));
  6. // 读取50行
  7. cursor.read(50, ar2 -> {
  8. if (ar2.succeeded()) {
  9. RowSet<Row> rows = ar2.result();
  10. // 检查更多 ?
  11. if (cursor.hasMore()) {
  12. // 重复这个过程...
  13. } else {
  14. // 没有更多行-关闭游标
  15. cursor.close();
  16. }
  17. }
  18. });
  19. }
  20. });

游标提前释放时应将其关闭:

  1. cursor.read(50, ar2 -> {
  2. if (ar2.succeeded()) {
  3. // 关闭游标
  4. cursor.close();
  5. }
  6. });

游标还可以使用流式API,这可以更加方便,尤其是在Rx化的版本中。

  1. connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  2. if (ar1.succeeded()) {
  3. PreparedStatement pq = ar1.result();
  4. // 一次获取50行
  5. RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
  6. // 使用流
  7. stream.exceptionHandler(err -> {
  8. System.out.println("Error: " + err.getMessage());
  9. });
  10. stream.endHandler(v -> {
  11. System.out.println("End of stream");
  12. });
  13. stream.handler(row -> {
  14. System.out.println("User: " + row.getString("last_name"));
  15. });
  16. }
  17. });

当这些行已传递给处理程序时,该流将批量读取 50 行并将其流化。 然后读取新一批的 50 行数据,依此类推。

流可以恢复或暂停,已加载的行将保留在内存中,直到被送达,游标将停止迭代。

MySQL 类型映射

当前客户端支持以下 MySQL 类型

  • BOOL,BOOLEAN (java.lang.Byte)
  • TINYINT (java.lang.Byte)
  • TINYINT UNSIGNED(java.lang.Short)
  • SMALLINT (java.lang.Short)
  • SMALLINT UNSIGNED(java.lang.Integer)
  • MEDIUMINT (java.lang.Integer)
  • MEDIUMINT UNSIGNED(java.lang.Integer)
  • INT,INTEGER (java.lang.Integer)
  • INTEGER UNSIGNED(java.lang.Long)
  • BIGINT (java.lang.Long)
  • BIGINT UNSIGNED(io.vertx.sqlclient.data.Numeric)
  • FLOAT (java.lang.Float)
  • FLOAT UNSIGNED(java.lang.Float)
  • DOUBLE (java.lang.Double)
  • DOUBLE UNSIGNED(java.lang.Double)
  • BIT (java.lang.Long)
  • NUMERIC (io.vertx.sqlclient.data.Numeric)
  • NUMERIC UNSIGNED(io.vertx.sqlclient.data.Numeric)
  • DATE (java.time.LocalDate)
  • DATETIME (java.time.LocalDateTime)
  • TIME (java.time.Duration)
  • TIMESTAMP (java.time.LocalDateTime)
  • YEAR (java.lang.Short)
  • CHAR (java.lang.String)
  • VARCHAR (java.lang.String)
  • BINARY (io.vertx.core.buffer.Buffer)
  • VARBINARY (io.vertx.core.buffer.Buffer)
  • TINYBLOB (io.vertx.core.buffer.Buffer)
  • TINYTEXT (java.lang.String)
  • BLOB (io.vertx.core.buffer.Buffer)
  • TEXT (java.lang.String)
  • MEDIUMBLOB (io.vertx.core.buffer.Buffer)
  • MEDIUMTEXT (java.lang.String)
  • LONGBLOB (io.vertx.core.buffer.Buffer)
  • LONGTEXT (java.lang.String)
  • ENUM (java.lang.String)
  • SET (java.lang.String)
  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)
  • GEOMETRY(io.vertx.mysqlclient.data.spatial.*)

元组解码在存储值时使用上述类型

请注意:在Java中,没有无符号数字值的具体表示形式,因此客户端会将无符号值转换为相关的Java类型。

隐式类型转换

当执行预处理语句时,响应式 MySQL 客户端支持隐式类型转换。 假设您的表中有一个 TIME 列,下面的两个示例都是有效的。

  1. client
  2. .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  3. .execute(Tuple.of(LocalTime.of(19, 10, 25)), ar -> {
  4. // 处理结果
  5. });
  6. // 这个也适用于隐式类型转换
  7. client
  8. .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  9. .execute(Tuple.of("19:10:25"), ar -> {
  10. // 处理结果
  11. });

MySQL 数据类型编码是根据参数值推断的。下面是具体的类型映射:

参数值 MySQL 类型编码
null MYSQL_TYPE_NULL
java.lang.Byte MYSQL_TYPE_TINY
java.lang.Boolean MYSQL_TYPE_TINY
java.lang.Short MYSQL_TYPE_SHORT
java.lang.Integer MYSQL_TYPE_LONG
java.lang.Long MYSQL_TYPE_LONGLONG
java.lang.Double MYSQL_TYPE_DOUBLE
java.lang.Float MYSQL_TYPE_FLOAT
java.time.LocalDate MYSQL_TYPE_DATE
java.time.Duration MYSQL_TYPE_TIME
java.time.LocalTime MYSQL_TYPE_TIME
io.vertx.core.buffer.Buffer MYSQL_TYPE_BLOB
java.time.LocalDateTime MYSQL_TYPE_DATETIME
io.vertx.mysqlclient.data.spatial.* MYSQL_TYPE_BLOB
default MYSQL_TYPE_STRING

集合类查询

您可以将查询API与Java集合类结合使用:

  1. Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  2. row -> row.getLong("id"),
  3. row -> row.getString("last_name"));
  4. // 运行查询使用集合类
  5. client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
  6. if (ar.succeeded()) {
  7. SqlResult<Map<Long, String>> result = ar.result();
  8. // 获取用集合类创建的map
  9. Map<Long, String> map = result.value();
  10. System.out.println("Got " + map);
  11. } else {
  12. System.out.println("Failure: " + ar.cause().getMessage());
  13. }
  14. });

集合类处理不能保留 Row 的引用,因为只有一个 Row 对象用于处理整个集合。

Java Collectors 提供了许多有趣的预定义集合类,例如, 您可以直接用 Row 中的集合轻松拼接成一个字符串:

  1. Collector<Row, ?, String> collector = Collectors.mapping(
  2. row -> row.getString("last_name"),
  3. Collectors.joining(",", "(", ")")
  4. );
  5. // 运行查询使用集合类
  6. client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
  7. if (ar.succeeded()) {
  8. SqlResult<String> result = ar.result();
  9. // 获取用集合类创建的String
  10. String list = result.value();
  11. System.out.println("Got " + list);
  12. } else {
  13. System.out.println("Failure: " + ar.cause().getMessage());
  14. }
  15. });

MySQL 存储过程

您可以在查询中运行存储过程。结果将按照 MySQL 协议 从服务器获取,无需任何魔法。

  1. client.query("CREATE PROCEDURE multi() BEGIN\n" +
  2. " SELECT 1;\n" +
  3. " SELECT 1;\n" +
  4. " INSERT INTO ins VALUES (1);\n" +
  5. " INSERT INTO ins VALUES (2);\n" +
  6. "END;").execute(ar1 -> {
  7. if (ar1.succeeded()) {
  8. // 创建存储过程成功
  9. client
  10. .query("CALL multi();")
  11. .execute(ar2 -> {
  12. if (ar2.succeeded()) {
  13. // 处理结果
  14. RowSet<Row> result1 = ar2.result();
  15. Row row1 = result1.iterator().next();
  16. System.out.println("First result: " + row1.getInteger(0));
  17. RowSet<Row> result2 = result1.next();
  18. Row row2 = result2.iterator().next();
  19. System.out.println("Second result: " + row2.getInteger(0));
  20. RowSet<Row> result3 = result2.next();
  21. System.out.println("Affected rows: " + result3.rowCount());
  22. } else {
  23. System.out.println("Failure: " + ar2.cause().getMessage());
  24. }
  25. });
  26. } else {
  27. System.out.println("Failure: " + ar1.cause().getMessage());
  28. }
  29. });

Note: 目前尚不支持绑定OUT参数的预处理语句。

SQL 客户端模版

SQL 客户端模版是一个用来方便执行SQL查询的库。

用法

要使用 SQL 客户端模版,需添加如下依赖:

  • Maven:
  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-sql-client-templates</artifactId>
  4. <version>4.1.5</version>
  5. </dependency>
  • Gradle:
  1. dependencies {
  2. implementation 'io.vertx:vertx-sql-client-templates:4.1.5'
  3. }

开始

以下是 SQL 模版最简易的使用方式。

一个 SQL 模版接收 已命名的 参数,因此,默认情况下,它会接收一个map作为参数载体,而非接收元组(tuple)作为参数。

一个SQL 模版默认情况下生成一个类似 PreparedQueryRowSet<Row> 。 实际上这个模版是 PreparedQuery 的轻量级封装。

  1. Map<String, Object> parameters = Collections.singletonMap("id", 1);
  2. SqlTemplate
  3. .forQuery(client, "SELECT * FROM users WHERE id=#{id}")
  4. .execute(parameters)
  5. .onSuccess(users -> {
  6. users.forEach(row -> {
  7. System.out.println(row.getString("first_name") + " " + row.getString("last_name"));
  8. });
  9. });

当您需要执行一个插入或更新操作,而您并不关心执行结果,您可以用 SqlTemplate.forUpdate

  1. Map<String, Object> parameters = new HashMap<>();
  2. parameters.put("id", 1);
  3. parameters.put("firstName", "Dale");
  4. parameters.put("lastName", "Cooper");
  5. SqlTemplate
  6. .forUpdate(client, "INSERT INTO users VALUES (#{id},#{firstName},#{lastName})")
  7. .execute(parameters)
  8. .onSuccess(v -> {
  9. System.out.println("Successful update");
  10. });

模板语法

模板语法使用 #{XXX} 的语法,其中 {XXX} 是一个有效的 java identifier 字符串 (不受关键字约束)

您可以用反斜杠(\)来转义 ` 字符,例如{foo}会被解析成#{foo}字符串,而不是名为foo` 的参数。

行映射

默认情况下模版以 Row 作为结果值类型。

您可以提供一个自定义的 RowMapper 来实现底层的映射操作:

  1. RowMapper<User> ROW_USER_MAPPER = row -> {
  2. User user = new User();
  3. user.id = row.getInteger("id");
  4. user.firstName = row.getString("firstName");
  5. user.lastName = row.getString("lastName");
  6. return user;
  7. };

实现底层映射操作:

  1. SqlTemplate
  2. .forQuery(client, "SELECT * FROM users WHERE id=#{id}")
  3. .mapTo(ROW_USER_MAPPER)
  4. .execute(Collections.singletonMap("id", 1))
  5. .onSuccess(users -> {
  6. users.forEach(user -> {
  7. System.out.println(user.firstName + " " + user.lastName);
  8. });
  9. });

JSON行映射

JSON 行映射是一个简单的模板映射,它用 toJson 将数据行映射成JSON对象。

  1. SqlTemplate
  2. .forQuery(client, "SELECT * FROM users WHERE id=#{id}")
  3. .mapTo(Row::toJson)
  4. .execute(Collections.singletonMap("id", 1))
  5. .onSuccess(users -> {
  6. users.forEach(user -> {
  7. System.out.println(user.encode());
  8. });
  9. });

参数映射

模板默认接收一个 Map<String, Object> 作为输入参数。

您可以提供一个自定义的映射(Mapper):

  1. TupleMapper<User> PARAMETERS_USER_MAPPER = TupleMapper.mapper(user -> {
  2. Map<String, Object> parameters = new HashMap<>();
  3. parameters.put("id", user.id);
  4. parameters.put("firstName", user.firstName);
  5. parameters.put("lastName", user.lastName);
  6. return parameters;
  7. });

实现参数映射:

  1. User user = new User();
  2. user.id = 1;
  3. user.firstName = "Dale";
  4. user.firstName = "Cooper";
  5. SqlTemplate
  6. .forUpdate(client, "INSERT INTO users VALUES (#{id},#{firstName},#{lastName})")
  7. .mapFrom(PARAMETERS_USER_MAPPER)
  8. .execute(user)
  9. .onSuccess(res -> {
  10. System.out.println("User inserted");
  11. });

JSON 参数映射

(译者注:原文为 anemic json parameters mapping,即anemic mapping,指单纯的属性映射,无行为)

JSON 参数映射是一个在模板参数和JSON对象之间的简单映射:

  1. JsonObject user = new JsonObject();
  2. user.put("id", 1);
  3. user.put("firstName", "Dale");
  4. user.put("lastName", "Cooper");
  5. SqlTemplate
  6. .forUpdate(client, "INSERT INTO users VALUES (#{id},#{firstName},#{lastName})")
  7. .mapFrom(TupleMapper.jsonObject())
  8. .execute(user)
  9. .onSuccess(res -> {
  10. System.out.println("User inserted");
  11. });

用Jackson的数据绑定功能做映射

您可以用Jackson的数据绑定功能来实现映射。

您需要添加 jackson-databind 依赖:

  • Maven:
  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. <version>${jackson.version}</version>
  5. </dependency>
  • Gradle:
  1. dependencies {
  2. compile 'com.fasterxml.jackson.core:jackson-databind:${jackson.version}'
  3. }

行映射是通过用键值对(key/value pair)来创建 JsonObject 实现的,然后 调用 mapTo 来将它映射为任何Java类。

  1. SqlTemplate
  2. .forQuery(client, "SELECT * FROM users WHERE id=#{id}")
  3. .mapTo(User.class)
  4. .execute(Collections.singletonMap("id", 1))
  5. .onSuccess(users -> {
  6. users.forEach(user -> {
  7. System.out.println(user.firstName + " " + user.lastName);
  8. });
  9. });

相似的,参数映射是用 JsonObject.mapFrom 将对象映射为 JsonObject 而实现的, 而后用 key/value pairs 来生成模板参数。

  1. User u = new User();
  2. u.id = 1;
  3. SqlTemplate
  4. .forUpdate(client, "INSERT INTO users VALUES (#{id},#{firstName},#{lastName})")
  5. .mapFrom(User.class)
  6. .execute(u)
  7. .onSuccess(res -> {
  8. System.out.println("User inserted");
  9. });

Java Date/Time API 映射

您可以用 jackson-modules-java8 的Jackson扩展包来实现对 java.time 的映射。

您需要加入 Jackson JSR 310 datatype 依赖:

  • Maven:
  1. <dependency>
  2. <groupId>com.fasterxml.jackson.datatype</groupId>
  3. <artifactId>jackson-datatype-jsr310</artifactId>
  4. <version>${jackson.version}</version>
  5. </dependency>
  • Gradle :
  1. dependencies {
  2. compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jackson.version}'
  3. }

然后您需要将时间模块注册到 Jackson的 ObjectMapper

  1. ObjectMapper mapper = io.vertx.core.json.jackson.DatabindCodec.mapper();
  2. mapper.registerModule(new JavaTimeModule());

您可以用 java.time 包中的类型,例如 LocalDateTime

  1. public class LocalDateTimePojo {
  2. public LocalDateTime localDateTime;
  3. }