与4.1.1不同,本次debug的项目没有使用mybatis,为了去除第三方框架带来的源码干扰,本次采用的原生JDBC操作原生ShardingSphereAPI的读写分离+分片。
以下的Debug会深刻的了解为啥ShardingJDBC是嵌入式代码,其究竟如何复写的JDBC代码。
设置断点
随着调试,断点来到了ExecutionContext result = createExecutionContext(logicSQL, metaData, routeContext, rewriteResult);
这里是为执行语句做的一些准备工作,即执行语句的上下文获取。接下来将以这里为入口进行源码分析。
创建执行上下文
追踪源码来到:
private ExecutionContext createExecutionContext(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final RouteContext routeContext, final SQLRewriteResult rewriteResult) {return new ExecutionContext(logicSQL.getSqlStatementContext(), ExecutionContextBuilder.build(metaData, rewriteResult, logicSQL.getSqlStatementContext()), routeContext);}
追随build方法:
public static Collection<ExecutionUnit> build(final ShardingSphereMetaData metaData, final SQLRewriteResult sqlRewriteResult, final SQLStatementContext<?> sqlStatementContext) {return sqlRewriteResult instanceof GenericSQLRewriteResult? build(metaData, (GenericSQLRewriteResult) sqlRewriteResult, sqlStatementContext) : build((RouteSQLRewriteResult) sqlRewriteResult);}
继续:
private static Collection<ExecutionUnit> build(final RouteSQLRewriteResult sqlRewriteResult) {Collection<ExecutionUnit> result = new LinkedHashSet<>();for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteResult.getSqlRewriteUnits().entrySet()) {result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(),new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters(), getRouteTableRouteMappers(entry.getKey().getTableMappers()))));}return result;}
这里可以看出所有的执行路径信息都被放入到ExecutionUnit的SQLUnit中。
最后被放入到了ExecutionContext中。
准备执行环境
回到执行语句:
public int executeUpdate() throws SQLException {try {clearPrevious();executionContext = createExecutionContext();if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());accumulate(executeResults);}Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();cacheStatements(executionGroups);return driverJDBCExecutor.executeUpdate(executionGroups, executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());} finally {clearBatch();}}
到这里先判断是否是原生的语句,如果不是的话就要进行分组执行:Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups() throws SQLException {int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());}
这里先获取最大的连接数,在获得执行引擎进入prepare方法准备环境:
public final Collection<ExecutionGroup<T>> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits) throws SQLException {Collection<ExecutionGroup<T>> result = new LinkedList<>();for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {String dataSourceName = entry.getKey();List<SQLUnit> sqlUnits = entry.getValue();List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;result.addAll(group(dataSourceName, sqlUnitGroups, connectionMode));}return decorate(routeContext, result);}
这里先获取各种信息后进行分组:
protected List<ExecutionGroup<T>> group(final String dataSourceName, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {List<ExecutionGroup<T>> result = new LinkedList<>();List<C> connections = executorDriverManager.getConnections(dataSourceName, sqlUnitGroups.size(), connectionMode);int count = 0;for (List<SQLUnit> each : sqlUnitGroups) {result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));}return result;}
获取连接
public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {DataSource dataSource = dataSourceMap.get(dataSourceName);Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);Collection<Connection> connections;synchronized (getCachedConnections()) {connections = getCachedConnections().get(dataSourceName);}List<Connection> result;if (connections.size() >= connectionSize) {result = new ArrayList<>(connections).subList(0, connectionSize);} else if (!connections.isEmpty()) {result = new ArrayList<>(connectionSize);result.addAll(connections);List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);result.addAll(newConnections);synchronized (getCachedConnections()) {getCachedConnections().putAll(dataSourceName, newConnections);}} else {result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));synchronized (getCachedConnections()) {getCachedConnections().putAll(dataSourceName, result);}}return result;}
创建分组
程序来到了createExecutionGroup:
private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {List<T> result = new LinkedList<>();for (SQLUnit each : sqlUnits) {result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), executorDriverManager, connection, connectionMode, option));}return new ExecutionGroup<>(result);}
build:
@Overridepublic JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCManager executorManager,final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {PreparedStatement preparedStatement = createPreparedStatement(executionUnit.getSqlUnit().getSql(), executionUnit.getSqlUnit().getParameters(), executorManager, connection, connectionMode, option);return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);}
这里其实获取的是真实的PrepareStatement放入到JDBCExecutionUnit中。
这里将所有环境处理好后返回结果如下:
创建缓存
通过cacheStatements方法将属性信息、参数信息存储起来,这里也对应着每次使用时的清理缓存。
private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) {for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {statements.addAll(each.getInputs().stream().map(jdbcExecutionUnit -> (PreparedStatement) jdbcExecutionUnit.getStorageResource()).collect(Collectors.toList()));parameterSets.addAll(each.getInputs().stream().map(input -> input.getExecutionUnit().getSqlUnit().getParameters()).collect(Collectors.toList()));}replay();}
