Read

ResultSet

By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.
To enable this functionality, create a Statement instance in the following manner:

  1. stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
  2. stmt.setFetchSize(Integer.MIN_VALUE);

The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.
There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown.
The earliest the locks these statements hold can be released (whether they be MyISAM table-level locks or row-level locks in some other storage engine such as InnoDB) is when the statement completes.
If the statement is within scope of a transaction, then locks are released when the transaction completes (which implies that the statement needs to complete first). As with most other databases, statements are not complete until all the results pending on the statement are read or the active result set for the statement is closed.
Therefore, if using streaming results, process them as quickly as possible if you want to maintain concurrent access to the tables referenced by the statement producing the result set.
Another alternative is to use cursor-based streaming to retrieve a set number of rows each time. This can be done by setting the connection property useCursorFetch to true, and then calling setFetchSize(int) with int being the desired number of rows to be fetched each time:

  1. conn = DriverManager.getConnection("jdbc:mysql://localhost/?useCursorFetch=true", "user", "s3cr3t");
  2. stmt = conn.createStatement();
  3. stmt.setFetchSize(100);
  4. rs = stmt.executeQuery("SELECT * FROM your_table_here");

Mysql - 图1VS
Mysql - 图2
When streaming, export is finished in about 9 seconds vs about 137 seconds when using paging

Code Analysis

  • com.mysql.jdbc.StatementImpl#setFetchSize

    1. public void setFetchSize(int rows) throws SQLException {
    2. synchronized (checkClosed().getConnectionMutex()) {
    3. if (((rows < 0) && (rows != Integer.MIN_VALUE))
    4. || ((this.maxRows > 0) && (rows > this
    5. .getMaxRows()))) {
    6. throw SQLError.createSQLException(
    7. Messages.getString("Statement.7"), //$NON-NLS-1$
    8. SQLError.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor()); //$NON-NLS-1$ //$NON-NLS-2$
    9. }
    10. this.fetchSize = rows;
    11. }
    12. }
  • com.mysql.jdbc.StatementImpl#createStreamingResultSet

    1. protected boolean createStreamingResultSet() {
    2. try {
    3. synchronized (checkClosed().getConnectionMutex()) {
    4. return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
    5. && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE));
    6. }
    7. } catch (SQLException e) {
    8. // we can't break the interface, having this be no-op in case of error is ok
    9. return false;
    10. }
    11. }
  • com.mysql.jdbc.StatementImpl#executeQuery

Use Case

Good

  • com.alibaba.datax.plugin.rdbms.util.DBUtil

    1. public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)
    2. throws SQLException {
    3. // make sure autocommit is off
    4. conn.setAutoCommit(false);
    5. Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
    6. ResultSet.CONCUR_READ_ONLY);
    7. // Integer.MIN_VALUE
    8. stmt.setFetchSize(fetchSize);
    9. stmt.setQueryTimeout(queryTimeout);
    10. return query(stmt, sql);
    11. }
  • org.apache.spark.rdd.JdbcRDD#compute

    1. override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
    2. {
    3. context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() }
    4. val part = thePart.asInstanceOf[JdbcPartition]
    5. val conn = getConnection()
    6. val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    7. val url = conn.getMetaData.getURL
    8. if (url.startsWith("jdbc:mysql:")) {
    9. // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
    10. // streaming results, rather than pulling entire resultset into memory.
    11. // See the below URL
    12. // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
    13. stmt.setFetchSize(Integer.MIN_VALUE)
    14. } else {
    15. stmt.setFetchSize(100)
    16. }
    17. logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
    18. stmt.setLong(1, part.lower)
    19. stmt.setLong(2, part.upper)
    20. val rs = stmt.executeQuery()
    21. ...

    Bad

  • org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions#fetchSize

    1. val fetchSize = {
    2. val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
    3. require(size >= 0,
    4. s"Invalid value `${size.toString}` for parameter " +
    5. s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value is 0, " +
    6. "the JDBC driver ignores the value and does the estimates.")
    7. size
    8. }
  • 备注: spark3.0 preview implement like this:

    • org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions#fetchSize
      1. val fetchSize = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt

思考

Reference

https://knes1.github.io/blog/2015/2015-10-19-streaming-mysql-results-using-java8-streams-and-spring-data.html