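Grouped execution dispatches the execution units produced in the preparation phase, group by group, to the underlying concurrent execution engine, and emits an event for each key step of the execution process.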
[Figure 1: SQL statement execution]

Picking up from the previous section

Once initPreparedStatementExecutor has finished, the execute method runs next. This is where the SQL statement is actually executed, and this section focuses on that method.

Source code analysis

The source of execute is as follows:

    public boolean execute() throws SQLException {
        // Read the flag on the calling thread; execute0 later applies it on whichever thread runs the SQL.
        boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<Boolean> executeCallback = SQLExecuteCallbackFactory.getPreparedSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
        List<Boolean> result = executeCallback(executeCallback);
        // Fall back to false when there is no usable first result.
        return null != result && !result.isEmpty() && null != result.get(0) ? result.get(0) : false;
    }

The logic here is that the SQLExecuteCallback wraps the actual SQL execution, and executeCallback runs it and collects the results it returns.
executeCallback:
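
The split between "what to run" and "where to run it" can be sketched with a small callback pattern. This is only an illustration: `UnitCallback`, `runAll` and `firstOrFalse` are hypothetical names, not ShardingSphere's API, and the sketch runs everything on one thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration only; not ShardingSphere's API.
class CallbackSketch {

    /** Knows how to execute one unit of work and produce a result. */
    interface UnitCallback<I, O> {
        O execute(I input);
    }

    /** Applies the callback to every input and collects the results in order. */
    static <I, O> List<O> runAll(List<I> inputs, UnitCallback<I, O> callback) {
        List<O> results = new ArrayList<>();
        for (I each : inputs) {
            results.add(callback.execute(each));
        }
        return results;
    }

    /** Same null-safe "first result, else false" check used by execute(). */
    static boolean firstOrFalse(List<Boolean> results) {
        return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
    }
}
```

The caller only decides which callback to pass in; the loop that drives it never needs to know what a unit of work actually does.
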

    protected final <T> List<T> executeCallback(SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.execute(inputGroups, executeCallback);
        // After execution, refresh metadata if the statement calls for it.
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
        return result;
    }

Tracing execute further down leads to:

    public <I, O> List<O> execute(Collection<InputGroup<I>> inputGroups, GroupedCallback<I, O> firstCallback, GroupedCallback<I, O> callback, boolean serial) throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        // The serial flag selects one-by-one execution or concurrent execution.
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }

Execution is split here into parallel and serial execution. The analysis below follows parallelExecute.

parallelExecute

    private <I, O> List<O> parallelExecute(Collection<InputGroup<I>> inputGroups, GroupedCallback<I, O> firstCallback, GroupedCallback<I, O> callback) throws SQLException {
        Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
        // Take the first group off the iterator; it runs on the calling thread.
        InputGroup<I> firstInputs = inputGroupsIterator.next();
        // Submit the remaining groups for asynchronous execution.
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(Lists.newArrayList(inputGroupsIterator), callback);
        return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }

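inputGroupsIterator iterates over the input groups produced by the preparation phase. In this code the first group is executed synchronously and all remaining groups are executed asynchronously.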
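
The "first group synchronously, the rest asynchronously" pattern can be sketched with plain JDK classes. This is a minimal sketch under stated assumptions: `FirstSyncRestAsync` is a hypothetical class, `CompletableFuture` stands in for Guava's `ListenableFuture`, and a `Function` stands in for the grouped callback.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical sketch; CompletableFuture stands in for Guava's ListenableFuture.
class FirstSyncRestAsync {

    static <I, O> List<O> execute(List<I> groups, Function<I, O> callback, ExecutorService pool) {
        List<O> results = new ArrayList<>();
        if (groups.isEmpty()) {
            return results;
        }
        Iterator<I> iterator = groups.iterator();
        I first = iterator.next();
        // Submit every group except the first to the pool.
        List<CompletableFuture<O>> rest = new ArrayList<>();
        while (iterator.hasNext()) {
            I each = iterator.next();
            rest.add(CompletableFuture.supplyAsync(() -> callback.apply(each), pool));
        }
        // Run the first group on the calling thread while the others are in flight.
        results.add(callback.apply(first));
        // Wait for the asynchronous groups and append their results in submission order.
        for (CompletableFuture<O> each : rest) {
            results.add(each.join());
        }
        return results;
    }
}
```

One plausible benefit of this shape is that the calling thread does useful work instead of idling while it waits for the pool.
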

asyncExecute

The source for asynchronous execution is as follows:

    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(List<InputGroup<I>> inputGroups, GroupedCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
        // One future per input group.
        for (InputGroup<I> each : inputGroups) {
            result.add(asyncExecute(each, callback));
        }
        return result;
    }

Next, the overload that submits a single group:

    private <I, O> ListenableFuture<Collection<O>> asyncExecute(InputGroup<I> inputGroup, GroupedCallback<I, O> callback) {
        // Read the data map on the calling thread so it can be handed to the worker.
        Map<String, Object> dataMap = ExecutorDataMap.getValue();
        // false marks this as not being the trunk (calling) thread.
        return executorService.getExecutorService().submit(() -> callback.execute(inputGroup.getInputs(), false, dataMap));
    }
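
Reading the data map before calling submit matters if the map is bound to the calling thread. The sketch below assumes a ThreadLocal-backed context, which is an assumption about how such a map might be stored rather than a statement about ExecutorDataMap's internals; `ContextHandoff` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch: a thread-bound context map handed over to a worker thread.
class ContextHandoff {

    private static final ThreadLocal<Map<String, Object>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    /** Stores a value in the calling thread's context. */
    static void put(String key, Object value) {
        CONTEXT.get().put(key, value);
    }

    /** Reads a key on a worker thread, with or without capturing the caller's map first. */
    static Object readOnWorker(ExecutorService pool, String key, boolean capture) {
        // Evaluated on the calling thread, before the task is submitted.
        Map<String, Object> captured = CONTEXT.get();
        Callable<Object> task;
        if (capture) {
            task = () -> captured.get(key);        // worker sees the caller's map
        } else {
            task = () -> CONTEXT.get().get(key);   // worker sees its own, empty map
        }
        try {
            return pool.submit(task).get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
    }
}
```

With capture the worker returns the caller's value; without it the lookup returns null because the worker thread has its own empty map.
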

Next, the callback's execute method, which runs on the worker thread:

    public final Collection<T> execute(Collection<StatementExecuteUnit> statementExecuteUnits, boolean isTrunkThread, Map<String, Object> dataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        // Units within one group are executed one after another.
        for (StatementExecuteUnit each : statementExecuteUnits) {
            result.add(execute0(each, isTrunkThread, dataMap));
        }
        return result;
    }

Next, execute0, which executes a single unit:

    private T execute0(StatementExecuteUnit statementExecuteUnit, boolean isTrunkThread, Map<String, Object> dataMap) throws SQLException {
        // Apply the exception-handling flag on the thread that runs this unit.
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData());
        SPISQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            ExecutionUnit executionUnit = statementExecuteUnit.getExecutionUnit();
            // Notify hooks before execution starts.
            sqlExecutionHook.start(executionUnit.getDataSourceName(), executionUnit.getSqlUnit().getSql(), executionUnit.getSqlUnit().getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
            T result = executeSQL(executionUnit.getSqlUnit().getSql(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (SQLException ex) {
            // Notify hooks of the failure, then let the handler deal with the exception.
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }
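
The start / finishSuccess / finishFailure sequence is how each key step of execution gets reported. A minimal sketch of that hook pattern follows; `ExecutionHook`, `RecordingHook` and the `rethrow` flag are hypothetical stand-ins, and the `rethrow` branch is only an assumption about what an exception handler might do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of hook notifications around a single execution.
class HookedExecution {

    interface ExecutionHook {
        void start(String dataSourceName, String sql);
        void finishSuccess();
        void finishFailure(Exception cause);
    }

    /** Records events in memory so the order of notifications can be inspected. */
    static class RecordingHook implements ExecutionHook {
        final List<String> events = new ArrayList<>();
        public void start(String dataSourceName, String sql) { events.add("start:" + dataSourceName + ":" + sql); }
        public void finishSuccess() { events.add("success"); }
        public void finishFailure(Exception cause) { events.add("failure:" + cause.getMessage()); }
    }

    static <T> T execute(String dataSourceName, String sql, Supplier<T> action, ExecutionHook hook, boolean rethrow) {
        try {
            hook.start(dataSourceName, sql);
            T result = action.get();
            hook.finishSuccess();
            return result;
        } catch (RuntimeException ex) {
            hook.finishFailure(ex);
            if (rethrow) {
                throw ex;
            }
            // Assumption: when not rethrowing, the failure is swallowed and null is returned.
            return null;
        }
    }
}
```

Because the hook is notified on both paths, tracing or metrics code can observe every execution without the executor knowing who is listening.
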

getGroupResults

Once the asynchronous tasks have been submitted, getGroupResults collects the results. Its source is as follows:

    private <O> List<O> getGroupResults(Collection<O> firstResults, Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
        // Start with the results of the synchronously executed first group.
        List<O> result = new LinkedList<>(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            try {
                // Block until this group has finished, then append its results.
                result.addAll(each.get());
            } catch (ExecutionException | InterruptedException ex) {
                return throwException(ex);
            }
        }
        return result;
    }
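
Collecting results this way relies on `Future.get()` blocking until each task completes and wrapping any task failure in an `ExecutionException`. The sketch below shows one way to gather results and surface the original cause, using plain JDK futures; `GroupResults.collect` is a hypothetical helper, and wrapping in unchecked exceptions is a simplification of whatever throwException does.

```java
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: merge synchronous results with results from futures.
class GroupResults {

    static <O> List<O> collect(Collection<O> firstResults, Collection<Future<Collection<O>>> restFutures) {
        List<O> result = new LinkedList<>(firstResults);
        for (Future<Collection<O>> each : restFutures) {
            try {
                // Blocks until the task behind this future has completed.
                result.addAll(each.get());
            } catch (InterruptedException ex) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            } catch (ExecutionException ex) {
                // Surface the task's own exception rather than the wrapper.
                Throwable cause = ex.getCause();
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new IllegalStateException(cause);
            }
        }
        return result;
    }
}
```

Results come back in the order the futures were submitted, regardless of which task finished first, because the loop waits on each future in turn.
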

Back in MyBatis's query method:

    public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
        PreparedStatement ps = (PreparedStatement) statement;
        // This is the execute call analyzed above.
        ps.execute();
        return resultSetHandler.handleResultSets(ps);
    }

The next step is to fetch the results, which is the merge process.