注册驱动

这一步看似不起眼,只是注册了MySQL的Driver实现,实则内部另有玄机。在继承关系中,com.mysql.cj.jdbc.Driver的父类是NonRegisteringDriver,其静态代码块会通过Class.forName加载AbandonedConnectionCleanupThread,从而启动一个循环运行、负责回收被丢弃连接的清理任务,详情见com.mysql.cj.jdbc.AbandonedConnectionCleanupThread#run。

  // Static initializer: loads AbandonedConnectionCleanupThread by name when this class is initialized.
  static {
      try {
          Class.forName(AbandonedConnectionCleanupThread.class.getName());
      } catch (ClassNotFoundException ignored) {
          // Deliberately ignored: failing to load the cleanup thread class must not break driver class initialization.
      }
  }
  1. public void run() {
  2. for (;;) {
  3. try {
  4. checkThreadContextClassLoader();
  5. Reference<? extends MysqlConnection> reference = referenceQueue.remove(5000);
  6. if (reference != null) {
  7. finalizeResource((ConnectionFinalizerPhantomReference) reference);
  8. }
  9. } catch (InterruptedException e) {
  10. threadRefLock.lock();
  11. try {
  12. threadRef = null;
  13. // Finalize remaining references.
  14. Reference<? extends MysqlConnection> reference;
  15. while ((reference = referenceQueue.poll()) != null) {
  16. finalizeResource((ConnectionFinalizerPhantomReference) reference);
  17. }
  18. connectionFinalizerPhantomRefs.clear();
  19. } finally {
  20. threadRefLock.unlock();
  21. }
  22. return;
  23. } catch (Exception ex) {
  24. // Nowhere to really log this.
  25. }
  26. }
  27. }

打开连接

java.sql.DriverManager#getConnection(java.lang.String, java.lang.String, java.lang.String)通过委托模式,最终会调用到MySQL驱动包的connect方法,主要步骤如下:
1、判断url是否支持
2、拼接参数获取cacheKey
3、从缓存中获取,如果缓存没有则生成url解析器,采用了读写锁做并发控制
4、调用com.mysql.cj.jdbc.ConnectionImpl#getInstance
5、调用com.mysql.cj.jdbc.ConnectionImpl#ConnectionImpl(com.mysql.cj.conf.HostInfo)
6、通过入参获取连接配置信息,ip,port,username,password
7、com.mysql.cj.jdbc.ConnectionImpl#createNewIO则是进行socket的建立(线程安全)
8、通过前面创建的session对象进行socketFactory的创建,以及连接socket
9、socket通信前握手

  1. public ConnectionImpl(HostInfo hostInfo) throws SQLException {
  2. try {
  3. // Stash away for later, used to clone this connection for Statement.cancel and Statement.setQueryTimeout().
  4. this.origHostInfo = hostInfo;
  5. this.origHostToConnectTo = hostInfo.getHost();
  6. this.origPortToConnectTo = hostInfo.getPort();
  7. this.database = hostInfo.getDatabase();
  8. this.user = StringUtils.isNullOrEmpty(hostInfo.getUser()) ? "" : hostInfo.getUser();
  9. this.password = StringUtils.isNullOrEmpty(hostInfo.getPassword()) ? "" : hostInfo.getPassword();
  10. this.props = hostInfo.exposeAsProperties();
  11. this.propertySet = new JdbcPropertySetImpl();
  12. this.propertySet.initializeProperties(this.props);
  13. // We need Session ASAP to get access to central driver functionality
  14. this.nullStatementResultSetFactory = new ResultSetFactory(this, null);
  15. this.session = new NativeSession(hostInfo, this.propertySet);
  16. this.session.addListener(this); // listen for session status changes
  17. // we can't cache fixed values here because properties are still not initialized with user provided values
  18. this.autoReconnectForPools = this.propertySet.getBooleanProperty(PropertyKey.autoReconnectForPools);
  19. this.cachePrepStmts = this.propertySet.getBooleanProperty(PropertyKey.cachePrepStmts);
  20. this.autoReconnect = this.propertySet.getBooleanProperty(PropertyKey.autoReconnect);
  21. this.useUsageAdvisor = this.propertySet.getBooleanProperty(PropertyKey.useUsageAdvisor);
  22. this.reconnectAtTxEnd = this.propertySet.getBooleanProperty(PropertyKey.reconnectAtTxEnd);
  23. this.emulateUnsupportedPstmts = this.propertySet.getBooleanProperty(PropertyKey.emulateUnsupportedPstmts);
  24. this.ignoreNonTxTables = this.propertySet.getBooleanProperty(PropertyKey.ignoreNonTxTables);
  25. this.pedantic = this.propertySet.getBooleanProperty(PropertyKey.pedantic);
  26. this.prepStmtCacheSqlLimit = this.propertySet.getIntegerProperty(PropertyKey.prepStmtCacheSqlLimit);
  27. this.useLocalSessionState = this.propertySet.getBooleanProperty(PropertyKey.useLocalSessionState);
  28. this.useServerPrepStmts = this.propertySet.getBooleanProperty(PropertyKey.useServerPrepStmts);
  29. this.processEscapeCodesForPrepStmts = this.propertySet.getBooleanProperty(PropertyKey.processEscapeCodesForPrepStmts);
  30. this.useLocalTransactionState = this.propertySet.getBooleanProperty(PropertyKey.useLocalTransactionState);
  31. this.disconnectOnExpiredPasswords = this.propertySet.getBooleanProperty(PropertyKey.disconnectOnExpiredPasswords);
  32. this.readOnlyPropagatesToServer = this.propertySet.getBooleanProperty(PropertyKey.readOnlyPropagatesToServer);
  33. String exceptionInterceptorClasses = this.propertySet.getStringProperty(PropertyKey.exceptionInterceptors).getStringValue();
  34. if (exceptionInterceptorClasses != null && !"".equals(exceptionInterceptorClasses)) {
  35. this.exceptionInterceptor = new ExceptionInterceptorChain(exceptionInterceptorClasses, this.props, this.session.getLog());
  36. }
  37. if (this.cachePrepStmts.getValue()) {
  38. createPreparedStatementCaches();
  39. }
  40. if (this.propertySet.getBooleanProperty(PropertyKey.cacheCallableStmts).getValue()) {
  41. this.parsedCallableStatementCache = new LRUCache<>(this.propertySet.getIntegerProperty(PropertyKey.callableStmtCacheSize).getValue());
  42. }
  43. if (this.propertySet.getBooleanProperty(PropertyKey.allowMultiQueries).getValue()) {
  44. this.propertySet.getProperty(PropertyKey.cacheResultSetMetadata).setValue(false); // we don't handle this yet
  45. }
  46. if (this.propertySet.getBooleanProperty(PropertyKey.cacheResultSetMetadata).getValue()) {
  47. this.resultSetMetadataCache = new LRUCache<>(this.propertySet.getIntegerProperty(PropertyKey.metadataCacheSize).getValue());
  48. }
  49. if (this.propertySet.getStringProperty(PropertyKey.socksProxyHost).getStringValue() != null) {
  50. this.propertySet.getProperty(PropertyKey.socketFactory).setValue(SocksProxySocketFactory.class.getName());
  51. }
  52. this.pointOfOrigin = this.useUsageAdvisor.getValue() ? LogUtils.findCallingClassAndMethod(new Throwable()) : "";
  53. this.dbmd = getMetaData(false, false);
  54. initializeSafeQueryInterceptors();
  55. } catch (CJException e1) {
  56. throw SQLExceptionsMapping.translateException(e1, getExceptionInterceptor());
  57. }
  58. try {
  59. createNewIO(false);
  60. unSafeQueryInterceptors();
  61. AbandonedConnectionCleanupThread.trackConnection(this, this.getSession().getNetworkResources());
  62. } catch (SQLException ex) {
  63. cleanup(ex);
  64. // don't clobber SQL exceptions
  65. throw ex;
  66. } catch (Exception ex) {
  67. cleanup(ex);
  68. throw SQLError
  69. .createSQLException(
  70. this.propertySet.getBooleanProperty(PropertyKey.paranoid).getValue() ? Messages.getString("Connection.0")
  71. : Messages.getString("Connection.1",
  72. new Object[] { this.session.getHostInfo().getHost(), this.session.getHostInfo().getPort() }),
  73. MysqlErrorNumbers.SQL_STATE_COMMUNICATION_LINK_FAILURE, ex, getExceptionInterceptor());
  74. }
  75. }
  1. private void connectOneTryOnly(boolean isForReconnect) throws SQLException {
  2. Exception connectionNotEstablishedBecause = null;
  3. try {
  4. JdbcConnection c = getProxy();
  5. this.session.connect(this.origHostInfo, this.user, this.password, this.database, DriverManager.getLoginTimeout() * 1000, c);
  6. // save state from old connection
  7. boolean oldAutoCommit = getAutoCommit();
  8. int oldIsolationLevel = this.isolationLevel;
  9. boolean oldReadOnly = isReadOnly(false);
  10. String oldCatalog = getCatalog();
  11. this.session.setQueryInterceptors(this.queryInterceptors);
  12. // Server properties might be different from previous connection, so initialize again...
  13. initializePropsFromServer();
  14. if (isForReconnect) {
  15. // Restore state from old connection
  16. setAutoCommit(oldAutoCommit);
  17. setTransactionIsolation(oldIsolationLevel);
  18. setCatalog(oldCatalog);
  19. setReadOnly(oldReadOnly);
  20. }
  21. return;
  22. } catch (UnableToConnectException rejEx) {
  23. close();
  24. this.session.getProtocol().getSocketConnection().forceClose();
  25. throw rejEx;
  26. } catch (Exception EEE) {
  27. if ((EEE instanceof PasswordExpiredException
  28. || EEE instanceof SQLException && ((SQLException) EEE).getErrorCode() == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD)
  29. && !this.disconnectOnExpiredPasswords.getValue()) {
  30. return;
  31. }
  32. if (this.session != null) {
  33. this.session.forceClose();
  34. }
  35. connectionNotEstablishedBecause = EEE;
  36. if (EEE instanceof SQLException) {
  37. throw (SQLException) EEE;
  38. }
  39. if (EEE.getCause() != null && EEE.getCause() instanceof SQLException) {
  40. throw (SQLException) EEE.getCause();
  41. }
  42. if (EEE instanceof CJException) {
  43. throw (CJException) EEE;
  44. }
  45. SQLException chainedEx = SQLError.createSQLException(Messages.getString("Connection.UnableToConnect"),
  46. MysqlErrorNumbers.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, getExceptionInterceptor());
  47. chainedEx.initCause(connectionNotEstablishedBecause);
  48. throw chainedEx;
  49. }
  50. }

构建statement

构建Statement时,会将先前创建的连接、会话等对象保存为其成员变量。

执行SQL

1、检查语句是否为空等基础校验,另外可能还会进行转义操作
2、sendQueryString发送SQL语句并接收返回结果集
3、NativeProtocol进行结果的读取

  1. public final <T extends Resultset> T sendQueryString(Query callingQuery, String query, String characterEncoding, int maxRows, boolean streamResults,
  2. String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
  3. ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
  4. String statementComment = this.queryComment;
  5. if (this.propertySet.getBooleanProperty(PropertyKey.includeThreadNamesAsStatementComment).getValue()) {
  6. statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();
  7. }
  8. // We don't know exactly how many bytes we're going to get from the query. Since we're dealing with UTF-8, the max is 4, so pad it
  9. // (4 * query) + space for headers
  10. int packLength = 1 + (query.length() * 4) + 2;
  11. byte[] commentAsBytes = null;
  12. if (statementComment != null) {
  13. commentAsBytes = StringUtils.getBytes(statementComment, characterEncoding);
  14. packLength += commentAsBytes.length;
  15. packLength += 6; // for /*[space] [space]*/
  16. }
  17. // TODO decide how to safely use the shared this.sendPacket
  18. //if (this.sendPacket == null) {
  19. NativePacketPayload sendPacket = new NativePacketPayload(packLength);
  20. //}
  21. sendPacket.setPosition(0);
  22. sendPacket.writeInteger(IntegerDataType.INT1, NativeConstants.COM_QUERY);
  23. if (commentAsBytes != null) {
  24. sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, Constants.SLASH_STAR_SPACE_AS_BYTES);
  25. sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, commentAsBytes);
  26. sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
  27. }
  28. if (!this.platformDbCharsetMatches && StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) {
  29. sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, StringUtils.getBytes(query));
  30. } else {
  31. sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, StringUtils.getBytes(query, characterEncoding));
  32. }
  33. return sendQueryPacket(callingQuery, sendPacket, maxRows, streamResults, catalog, cachedMetadata, getProfilerEventHandlerInstanceFunction,
  34. resultSetFactory);
  35. }
/**
 * Sends an already-built query packet to the server with {@code sendCommand} and reads the response with
 * {@code readAllResults}.
 * <p>
 * Around that core the method also:
 * <ul>
 * <li>tracks nesting in {@code statementExecutionDepth} (incremented on entry, decremented in {@code finally});</li>
 * <li>lets configured query interceptors short-circuit the call before sending and replace the result afterwards;</li>
 * <li>dumps the query through {@code TestUtils.dumpTestcaseQuery} when {@code autoGenerateTestcaseScript} is set;</li>
 * <li>times the query and emits profiler events when {@code profileSQL} or {@code logSlowQueries} is set.</li>
 * </ul>
 *
 * @param callingQuery
 *            the query on whose behalf the packet is sent; may be null, in which case 999 is used as the query id in
 *            profiler events and no cancel/timeout check is made on failure
 * @param queryPacket
 *            the packet to send; the bytes from offset 1 up to the packet's current position are treated as the query text
 * @param maxRows
 *            passed through to {@code readAllResults}
 * @param streamResults
 *            passed through to {@code readAllResults}
 * @param catalog
 *            reported in the profiler events
 * @param cachedMetadata
 *            passed through to {@code readAllResults}
 * @param getProfilerEventHandlerInstanceFunction
 *            supplies the {@code ProfilerEventHandler} that receives profiler events
 * @param resultSetFactory
 *            passed through to {@code readAllResults}
 * @param <T>
 *            result set type
 * @return the non-null result of a pre-interceptor if there is one; otherwise the result read from the server, replaced
 *         by a post-interceptor's result when that is non-null
 * @throws IOException
 *             declared by the signature; presumably raised by the I/O helpers called here (not visible in this block)
 * @throws CJException
 *             rethrown after the post-interceptors have run and {@code callingQuery.checkCancelTimeout()} has been called
 */
public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults,
            String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
            ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
        this.statementExecutionDepth++;

        byte[] queryBuf = null;
        int oldPacketPosition = 0;
        long queryStartTime = 0;
        long queryEndTime = 0;

        // Capture the raw buffer and the write position up front; the position marks the end of the query text.
        queryBuf = queryPacket.getByteBuffer();
        oldPacketPosition = queryPacket.getPosition(); // save the packet position

        queryStartTime = getCurrentTimeNanosOrMillis();

        // View over the query bytes, skipping the first byte of the packet.
        LazyString query = new LazyString(queryBuf, 1, (oldPacketPosition - 1));

        try {

            // Pre-interceptors may answer the query themselves; a non-null result bypasses the server entirely.
            if (this.queryInterceptors != null) {
                T interceptedResults = invokeQueryInterceptorsPre(query, callingQuery, false);

                if (interceptedResults != null) {
                    return interceptedResults;
                }
            }

            if (this.autoGenerateTestcaseScript) {
                StringBuilder debugBuf = new StringBuilder(query.length() + 32);
                generateQueryCommentBlock(debugBuf);
                debugBuf.append(query);
                debugBuf.append(';');
                TestUtils.dumpTestcaseQuery(debugBuf.toString());
            }

            // Send query command and sql query string
            NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);

            long fetchBeginTime = 0;
            long fetchEndTime = 0;

            String profileQueryToLog = null;

            boolean queryWasSlow = false;

            // Timing and query-text extraction only happen when profiling or slow-query logging is enabled.
            if (this.profileSQL || this.logSlowQueries) {
                queryEndTime = getCurrentTimeNanosOrMillis();

                boolean shouldExtractQuery = false;

                if (this.profileSQL) {
                    shouldExtractQuery = true;
                } else if (this.logSlowQueries) {
                    long queryTime = queryEndTime - queryStartTime;

                    boolean logSlow = false;

                    // Either compare against the configured threshold or let the metrics holder decide,
                    // in which case this query's time is also reported to it.
                    if (!this.useAutoSlowLog) {
                        logSlow = queryTime > this.propertySet.getIntegerProperty(PropertyKey.slowQueryThresholdMillis).getValue();
                    } else {
                        logSlow = this.metricsHolder.isAbonormallyLongQuery(queryTime);
                        this.metricsHolder.reportQueryTime(queryTime);
                    }

                    if (logSlow) {
                        shouldExtractQuery = true;
                        queryWasSlow = true;
                    }
                }

                if (shouldExtractQuery) {
                    // Extract the actual query from the network packet
                    boolean truncated = false;

                    int extractPosition = oldPacketPosition;

                    // Cap the logged text at maxQuerySizeToLog bytes and mark it as truncated.
                    if (oldPacketPosition > this.maxQuerySizeToLog.getValue()) {
                        extractPosition = this.maxQuerySizeToLog.getValue() + 1;
                        truncated = true;
                    }

                    profileQueryToLog = StringUtils.toString(queryBuf, 1, (extractPosition - 1));

                    if (truncated) {
                        profileQueryToLog += Messages.getString("Protocol.2");
                    }
                }

                // The fetch phase is measured from the moment the command returned.
                fetchBeginTime = queryEndTime;
            }

            T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);

            long threadId = getServerSession().getCapabilities().getThreadId();
            // 999 stands in for the query id when there is no calling query.
            int queryId = (callingQuery != null) ? callingQuery.getId() : 999;
            int resultSetId = rs.getResultId();
            // Only consumed in the profiling/slow-query branches below; queryEndTime is still 0 when neither is enabled.
            long eventDuration = queryEndTime - queryStartTime;

            // Client-side slow query report (only reachable with logSlowQueries on and profileSQL off, see above).
            if (queryWasSlow && !this.serverSession.queryWasSlow() /* don't log slow queries twice */) {
                StringBuilder mesgBuf = new StringBuilder(48 + profileQueryToLog.length());

                mesgBuf.append(Messages.getString("Protocol.SlowQuery",
                        new Object[] { String.valueOf(this.useAutoSlowLog ? " 95% of all queries " : this.slowQueryThreshold), this.queryTimingUnits,
                                Long.valueOf(queryEndTime - queryStartTime) }));
                mesgBuf.append(profileQueryToLog);

                ProfilerEventHandler eventSink = getProfilerEventHandlerInstanceFunction.apply();

                eventSink.consumeEvent(
                        new ProfilerEventImpl(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, threadId, queryId, resultSetId, System.currentTimeMillis(),
                                eventDuration, this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf.toString()));

                // Optionally explain the slow query, unless the packet is too large to explain.
                if (this.propertySet.getBooleanProperty(PropertyKey.explainSlowQueries).getValue()) {
                    if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
                        queryPacket.setPosition(1); // skip first byte
                        explainSlowQuery(query.toString(), profileQueryToLog);
                    } else {
                        this.log.logWarn(Messages.getString("Protocol.3", new Object[] { MAX_QUERY_SIZE_TO_EXPLAIN }));
                    }
                }
            }

            if (this.profileSQL || this.logSlowQueries) {

                ProfilerEventHandler eventSink = getProfilerEventHandlerInstanceFunction.apply();

                String eventCreationPoint = LogUtils.findCallingClassAndMethod(new Throwable());

                // Server-reported conditions, each emitted as its own slow-query event.
                // NOTE(review): profileQueryToLog is null here when logSlowQueries is on, profileSQL is off and the
                // query was not classified as slow above, so these concatenations append "null" -- confirm this is intended.
                if (this.logSlowQueries) {
                    if (this.serverSession.noGoodIndexUsed()) {
                        eventSink.consumeEvent(
                                new ProfilerEventImpl(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, threadId, queryId, resultSetId, System.currentTimeMillis(),
                                        eventDuration, this.queryTimingUnits, null, eventCreationPoint, Messages.getString("Protocol.4") + profileQueryToLog));
                    }
                    if (this.serverSession.noIndexUsed()) {
                        eventSink.consumeEvent(
                                new ProfilerEventImpl(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, threadId, queryId, resultSetId, System.currentTimeMillis(),
                                        eventDuration, this.queryTimingUnits, null, eventCreationPoint, Messages.getString("Protocol.5") + profileQueryToLog));
                    }
                    if (this.serverSession.queryWasSlow()) {
                        eventSink.consumeEvent(new ProfilerEventImpl(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, threadId, queryId, resultSetId,
                                System.currentTimeMillis(), eventDuration, this.queryTimingUnits, null, eventCreationPoint,
                                Messages.getString("Protocol.ServerSlowQuery") + profileQueryToLog));
                    }
                }

                fetchEndTime = getCurrentTimeNanosOrMillis();

                // One event for the query itself and one for the time spent fetching its results.
                eventSink.consumeEvent(new ProfilerEventImpl(ProfilerEvent.TYPE_QUERY, "", catalog, threadId, queryId, resultSetId, System.currentTimeMillis(),
                        eventDuration, this.queryTimingUnits, null, eventCreationPoint, profileQueryToLog));

                eventSink.consumeEvent(new ProfilerEventImpl(ProfilerEvent.TYPE_FETCH, "", catalog, threadId, queryId, resultSetId, System.currentTimeMillis(),
                        (fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, eventCreationPoint, null));
            }

            if (this.hadWarnings) {
                scanForAndThrowDataTruncation();
            }

            // Post-interceptors may substitute their own result set.
            if (this.queryInterceptors != null) {
                T interceptedResults = invokeQueryInterceptorsPost(query, callingQuery, rs, false);

                if (interceptedResults != null) {
                    rs = interceptedResults;
                }
            }

            return rs;
        } catch (CJException sqlEx) {
            // Post-interceptors still run on failure, with a null result set.
            if (this.queryInterceptors != null) {
                invokeQueryInterceptorsPost(query, callingQuery, null, false); // we don't do anything with the result set in this case
            }

            if (callingQuery != null) {
                callingQuery.checkCancelTimeout();
            }

            throw sqlEx;

        } finally {
            // Always undo the depth increment made on entry, whichever way the method exits.
            this.statementExecutionDepth--;
        }
    }