书接上文
mybatis的execute执行完成后,将要进行结果的归并,故进入到handleResultSets方法进行探寻源码。
经过Debug,定位到ShardingPreparedStatement的getResultSet,其源码如下:
public ResultSet getResultSet() throws SQLException {if (null != this.currentResultSet) {return this.currentResultSet;} else {if (this.executionContext.getSqlStatementContext() instanceof SelectStatementContext || this.executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {List<ResultSet> resultSets = this.getResultSets();MergedResult mergedResult = this.mergeQuery(this.getQueryResults(resultSets));this.currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, this.executionContext);}return this.currentResultSet;}}
源码分析
经过寻找在getResultSet中找到mergeQuery的归并关键字,以下将围绕mergeQuery进行分析,其源码如下:
private MergedResult mergeQuery(List<QueryResult> queryResults) throws SQLException {ShardingRuntimeContext runtimeContext = this.connection.getRuntimeContext();MergeEngine mergeEngine = new MergeEngine(((ShardingRule)runtimeContext.getRule()).toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());return mergeEngine.merge(queryResults, this.executionContext.getSqlStatementContext());}
追寻merge:
process:
追寻merge:
private Optional<MergedResult> merge(List<QueryResult> queryResults, SQLStatementContext sqlStatementContext) throws SQLException {Iterator var3 = this.engines.entrySet().iterator();Entry entry;do {if (!var3.hasNext()) {return Optional.empty();}entry = (Entry)var3.next();} while(!(entry.getValue() instanceof ResultMergerEngine));ResultMerger resultMerger = ((ResultMergerEngine)entry.getValue()).newInstance(this.databaseType, (BaseRule)entry.getKey(), this.properties, sqlStatementContext);return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, this.schemaMetaData));}
到这继续追寻merge方法,其有三个实现:
这就涉及到了DAL和DQL的问题,这里默认是DQL,继续探寻:
public MergedResult merge(List<QueryResult> queryResults, SQLStatementContext sqlStatementContext, SchemaMetaData schemaMetaData) throws SQLException {if (1 == queryResults.size()) {return new IteratorStreamMergedResult(queryResults);} else {Map<String, Integer> columnLabelIndexMap = this.getColumnLabelIndexMap((QueryResult)queryResults.get(0));SelectStatementContext selectStatementContext = (SelectStatementContext)sqlStatementContext;selectStatementContext.setIndexes(columnLabelIndexMap);MergedResult mergedResult = this.build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);return this.decorate(queryResults, selectStatementContext, mergedResult);}}
终于找到了它的核心方法build:
private MergedResult build(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, Map<String, Integer> columnLabelIndexMap, SchemaMetaData schemaMetaData) throws SQLException {if (this.isNeedProcessGroupBy(selectStatementContext)) {return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);} else if (this.isNeedProcessDistinctRow(selectStatementContext)) {this.setGroupByForDistinctRow(selectStatementContext);return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);} else {return (MergedResult)(this.isNeedProcessOrderBy(selectStatementContext) ? new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData) : new IteratorStreamMergedResult(queryResults));}}
流式归并
其源码如下:
public OrderByStreamMergedResult(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, SchemaMetaData schemaMetaData) throws SQLException {this.orderByItems = selectStatementContext.getOrderByContext().getItems();this.orderByValuesQueue = new PriorityQueue(queryResults.size());this.orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);this.isFirstNext = true;}
orderResultSetsToQueue方法中将结果存储到一个队列中:
private void orderResultSetsToQueue(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, SchemaMetaData schemaMetaData) throws SQLException {Iterator var4 = queryResults.iterator();while(var4.hasNext()) {QueryResult each = (QueryResult)var4.next();OrderByValue orderByValue = new OrderByValue(each, this.orderByItems, selectStatementContext, schemaMetaData);if (orderByValue.next()) {this.orderByValuesQueue.offer(orderByValue);}}this.setCurrentQueryResult(this.orderByValuesQueue.isEmpty() ? (QueryResult)queryResults.get(0) : ((OrderByValue)this.orderByValuesQueue.peek()).getQueryResult());}
内存归并
protected MemoryMergedResult(T rule, SchemaMetaData schemaMetaData, SQLStatementContext sqlStatementContext, List<QueryResult> queryResults) throws SQLException {List<MemoryQueryResultRow> memoryQueryResultRowList = this.init(rule, schemaMetaData, sqlStatementContext, queryResults);this.memoryResultSetRows = memoryQueryResultRowList.iterator();if (!memoryQueryResultRowList.isEmpty()) {this.currentResultSetRow = (MemoryQueryResultRow)memoryQueryResultRowList.get(0);}}
显然,是直接存在private final Object[] data中的。
装饰归并
装饰归并详见:
private MergedResult decorate(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, MergedResult mergedResult) throws SQLException {PaginationContext paginationContext = selectStatementContext.getPaginationContext();if (paginationContext.isHasPagination() && 1 != queryResults.size()) {String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(this.databaseType.getName()).getName();if (!"MySQL".equals(trunkDatabaseName) && !"PostgreSQL".equals(trunkDatabaseName)) {if ("Oracle".equals(trunkDatabaseName)) {return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);} else {return (MergedResult)("SQLServer".equals(trunkDatabaseName) ? new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext) : mergedResult);}} else {return new LimitDecoratorMergedResult(mergedResult, paginationContext);}} else {return mergedResult;}}

