归并分类:
结果归并 - 图1
归并流程:
结果归并 - 图2

书接上文

mybatis的execute执行完成后,将要进行结果的归并,故进入到handleResultSets方法进行探寻源码。
image.png
经过Debug,定位到ShardingPreparedStatement的getResultSet,其源码如下:

  1. public ResultSet getResultSet() throws SQLException {
  2. if (null != this.currentResultSet) {
  3. return this.currentResultSet;
  4. } else {
  5. if (this.executionContext.getSqlStatementContext() instanceof SelectStatementContext || this.executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
  6. List<ResultSet> resultSets = this.getResultSets();
  7. MergedResult mergedResult = this.mergeQuery(this.getQueryResults(resultSets));
  8. this.currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, this.executionContext);
  9. }
  10. return this.currentResultSet;
  11. }
  12. }

源码分析

经过寻找在getResultSet中找到mergeQuery的归并关键字,以下将围绕mergeQuery进行分析,其源码如下:

  1. private MergedResult mergeQuery(List<QueryResult> queryResults) throws SQLException {
  2. ShardingRuntimeContext runtimeContext = this.connection.getRuntimeContext();
  3. MergeEngine mergeEngine = new MergeEngine(((ShardingRule)runtimeContext.getRule()).toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
  4. return mergeEngine.merge(queryResults, this.executionContext.getSqlStatementContext());
  5. }

追寻merge:
image.png
process:
image.png
追寻merge:

  1. private Optional<MergedResult> merge(List<QueryResult> queryResults, SQLStatementContext sqlStatementContext) throws SQLException {
  2. Iterator var3 = this.engines.entrySet().iterator();
  3. Entry entry;
  4. do {
  5. if (!var3.hasNext()) {
  6. return Optional.empty();
  7. }
  8. entry = (Entry)var3.next();
  9. } while(!(entry.getValue() instanceof ResultMergerEngine));
  10. ResultMerger resultMerger = ((ResultMergerEngine)entry.getValue()).newInstance(this.databaseType, (BaseRule)entry.getKey(), this.properties, sqlStatementContext);
  11. return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, this.schemaMetaData));
  12. }

到这继续追寻merge方法,其有三个实现:
image.png
这就涉及到了DAL和DQL的问题,这里默认是DQL,继续探寻:

  1. public MergedResult merge(List<QueryResult> queryResults, SQLStatementContext sqlStatementContext, SchemaMetaData schemaMetaData) throws SQLException {
  2. if (1 == queryResults.size()) {
  3. return new IteratorStreamMergedResult(queryResults);
  4. } else {
  5. Map<String, Integer> columnLabelIndexMap = this.getColumnLabelIndexMap((QueryResult)queryResults.get(0));
  6. SelectStatementContext selectStatementContext = (SelectStatementContext)sqlStatementContext;
  7. selectStatementContext.setIndexes(columnLabelIndexMap);
  8. MergedResult mergedResult = this.build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
  9. return this.decorate(queryResults, selectStatementContext, mergedResult);
  10. }
  11. }

终于找到了它的核心方法build:

  1. private MergedResult build(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, Map<String, Integer> columnLabelIndexMap, SchemaMetaData schemaMetaData) throws SQLException {
  2. if (this.isNeedProcessGroupBy(selectStatementContext)) {
  3. return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
  4. } else if (this.isNeedProcessDistinctRow(selectStatementContext)) {
  5. this.setGroupByForDistinctRow(selectStatementContext);
  6. return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
  7. } else {
  8. return (MergedResult)(this.isNeedProcessOrderBy(selectStatementContext) ? new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData) : new IteratorStreamMergedResult(queryResults));
  9. }
  10. }

这里涉及到了流式归并和内存归并。

流式归并

其源码如下:

  1. public OrderByStreamMergedResult(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, SchemaMetaData schemaMetaData) throws SQLException {
  2. this.orderByItems = selectStatementContext.getOrderByContext().getItems();
  3. this.orderByValuesQueue = new PriorityQueue(queryResults.size());
  4. this.orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
  5. this.isFirstNext = true;
  6. }

orderResultSetsToQueue方法中将结果存储到一个队列中:

  1. private void orderResultSetsToQueue(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, SchemaMetaData schemaMetaData) throws SQLException {
  2. Iterator var4 = queryResults.iterator();
  3. while(var4.hasNext()) {
  4. QueryResult each = (QueryResult)var4.next();
  5. OrderByValue orderByValue = new OrderByValue(each, this.orderByItems, selectStatementContext, schemaMetaData);
  6. if (orderByValue.next()) {
  7. this.orderByValuesQueue.offer(orderByValue);
  8. }
  9. }
  10. this.setCurrentQueryResult(this.orderByValuesQueue.isEmpty() ? (QueryResult)queryResults.get(0) : ((OrderByValue)this.orderByValuesQueue.peek()).getQueryResult());
  11. }

内存归并

  1. protected MemoryMergedResult(T rule, SchemaMetaData schemaMetaData, SQLStatementContext sqlStatementContext, List<QueryResult> queryResults) throws SQLException {
  2. List<MemoryQueryResultRow> memoryQueryResultRowList = this.init(rule, schemaMetaData, sqlStatementContext, queryResults);
  3. this.memoryResultSetRows = memoryQueryResultRowList.iterator();
  4. if (!memoryQueryResultRowList.isEmpty()) {
  5. this.currentResultSetRow = (MemoryQueryResultRow)memoryQueryResultRowList.get(0);
  6. }
  7. }

显然,是直接存在private final Object[] data中的。

装饰归并

装饰归并详见:

  1. private MergedResult decorate(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, MergedResult mergedResult) throws SQLException {
  2. PaginationContext paginationContext = selectStatementContext.getPaginationContext();
  3. if (paginationContext.isHasPagination() && 1 != queryResults.size()) {
  4. String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(this.databaseType.getName()).getName();
  5. if (!"MySQL".equals(trunkDatabaseName) && !"PostgreSQL".equals(trunkDatabaseName)) {
  6. if ("Oracle".equals(trunkDatabaseName)) {
  7. return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
  8. } else {
  9. return (MergedResult)("SQLServer".equals(trunkDatabaseName) ? new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext) : mergedResult);
  10. }
  11. } else {
  12. return new LimitDecoratorMergedResult(mergedResult, paginationContext);
  13. }
  14. } else {
  15. return mergedResult;
  16. }
  17. }