Unlike section 4.1.1, the project debugged here does not use MyBatis. To remove the source-level noise a third-party framework adds, this session uses plain JDBC directly against the native ShardingSphere API, configured with read-write splitting plus sharding.
The debugging below gives a concrete sense of why ShardingJDBC is "embedded" code, and of how exactly it reimplements the JDBC interfaces.


Setting a breakpoint

Stepping through, the breakpoint lands on ExecutionContext result = createExecutionContext(logicSQL, metaData, routeContext, rewriteResult);
This is the preparation work before the statement is executed: obtaining the execution context. The source analysis below takes this call as its entry point.

Creating the execution context

Tracing into the source leads to:

    private ExecutionContext createExecutionContext(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final RouteContext routeContext, final SQLRewriteResult rewriteResult) {
        return new ExecutionContext(logicSQL.getSqlStatementContext(), ExecutionContextBuilder.build(metaData, rewriteResult, logicSQL.getSqlStatementContext()), routeContext);
    }

Following the build method:

    public static Collection<ExecutionUnit> build(final ShardingSphereMetaData metaData, final SQLRewriteResult sqlRewriteResult, final SQLStatementContext<?> sqlStatementContext) {
        return sqlRewriteResult instanceof GenericSQLRewriteResult
                ? build(metaData, (GenericSQLRewriteResult) sqlRewriteResult, sqlStatementContext) : build((RouteSQLRewriteResult) sqlRewriteResult);
    }
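The method simply branches on the concrete rewrite-result type: a GenericSQLRewriteResult (no routing, a single target) versus a RouteSQLRewriteResult (one rewritten SQL per route unit). A minimal, self-contained sketch of that dispatch pattern, using simplified stand-in classes rather than the real ShardingSphere types:

```java
public class RewriteDispatch {
    // Simplified stand-ins for the two rewrite-result variants.
    interface SQLRewriteResult {}
    static class GenericSQLRewriteResult implements SQLRewriteResult {}
    static class RouteSQLRewriteResult implements SQLRewriteResult {}

    // Mirrors the instanceof-based dispatch in ExecutionContextBuilder.build.
    static String build(SQLRewriteResult result) {
        return result instanceof GenericSQLRewriteResult
                ? "single execution unit for the only data source"
                : "one execution unit per route unit";
    }

    public static void main(String[] args) {
        System.out.println(build(new GenericSQLRewriteResult()));
        System.out.println(build(new RouteSQLRewriteResult()));
    }
}
```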

Continuing:

    private static Collection<ExecutionUnit> build(final RouteSQLRewriteResult sqlRewriteResult) {
        Collection<ExecutionUnit> result = new LinkedHashSet<>();
        for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteResult.getSqlRewriteUnits().entrySet()) {
            result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(),
                    new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters(), getRouteTableRouteMappers(entry.getKey().getTableMappers()))));
        }
        return result;
    }

As the debugger shows, all of the routing information needed for execution is packed into the SQLUnit of each ExecutionUnit, and the resulting collection is finally placed into the ExecutionContext.
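To make the loop above concrete, here is a self-contained sketch (using a simplified value class, not the real ShardingSphere types) of how each route unit / rewrite unit pair becomes one execution unit keyed by the actual data source name, with the LinkedHashSet preserving insertion order while deduplicating:

```java
import java.util.*;

public class BuildExecutionUnits {
    // Simplified stand-in: actual data source name plus the rewritten SQL and parameters.
    public record ExecutionUnit(String dataSourceName, String sql, List<Object> parameters) {}

    // Mirrors the shape of ExecutionContextBuilder.build(RouteSQLRewriteResult):
    // one ExecutionUnit per (route unit -> rewrite unit) entry, deduplicated.
    public static Collection<ExecutionUnit> build(Map<String, String> rewrittenSqlByDataSource) {
        Collection<ExecutionUnit> result = new LinkedHashSet<>();
        for (Map.Entry<String, String> entry : rewrittenSqlByDataSource.entrySet()) {
            result.add(new ExecutionUnit(entry.getKey(), entry.getValue(), List.of()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> routes = new LinkedHashMap<>();
        routes.put("ds_0", "INSERT INTO t_order_0 (user_id) VALUES (?)");
        routes.put("ds_1", "INSERT INTO t_order_1 (user_id) VALUES (?)");
        System.out.println(build(routes)); // two units, one per routed data source
    }
}
```

The data source names and table names here are illustrative only; in the real build method the key comes from RouteUnit.getDataSourceMapper().getActualName() and the SQL from the rewrite engine.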

Preparing the execution environment

Back at the executing statement:

    public int executeUpdate() throws SQLException {
        try {
            clearPrevious();
            executionContext = createExecutionContext();
            if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                Collection<ExecuteResult> executeResults = rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback());
                accumulate(executeResults);
            }
            Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
            cacheStatements(executionGroups);
            return driverJDBCExecutor.executeUpdate(
                    executionGroups, executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
        } finally {
            clearBatch();
        }
    }

Here it first checks whether a RawExecutionRule is configured (raw, pass-through execution); if not, the execution units must be grouped for execution:
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();

    private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups() throws SQLException {
        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
                JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
        return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
    }

This first reads the maximum-connections-per-query setting, then obtains the prepare engine and enters its prepare method to set up the environment:

    public final Collection<ExecutionGroup<T>> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits) throws SQLException {
        Collection<ExecutionGroup<T>> result = new LinkedList<>();
        for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {
            String dataSourceName = entry.getKey();
            List<SQLUnit> sqlUnits = entry.getValue();
            List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);
            ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
            result.addAll(group(dataSourceName, sqlUnitGroups, connectionMode));
        }
        return decorate(routeContext, result);
    }
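The connection-mode choice in the loop above is the heart of ShardingSphere's connection management: when one query needs more SQL units on a data source than maxConnectionsSizePerQuery allows, connections must be shared (CONNECTION_STRICTLY, results buffered in memory); otherwise each unit can hold its own connection (MEMORY_STRICTLY, results can be streamed). A minimal sketch of that decision:

```java
public class ConnectionModeDecision {
    public enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    // Mirrors the ternary in prepare():
    // maxConnectionsSizePerQuery < sqlUnits.size() ? CONNECTION_STRICTLY : MEMORY_STRICTLY
    public static ConnectionMode decide(int maxConnectionsSizePerQuery, int sqlUnitCount) {
        return maxConnectionsSizePerQuery < sqlUnitCount
                ? ConnectionMode.CONNECTION_STRICTLY
                : ConnectionMode.MEMORY_STRICTLY;
    }

    public static void main(String[] args) {
        // 1 allowed connection, 3 rewritten SQL units -> the connection must be shared.
        System.out.println(decide(1, 3)); // CONNECTION_STRICTLY
        // 10 allowed connections, 3 SQL units -> one connection per unit is fine.
        System.out.println(decide(10, 3)); // MEMORY_STRICTLY
    }
}
```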

After gathering this information, the units are grouped:

    protected List<ExecutionGroup<T>> group(final String dataSourceName, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {
        List<ExecutionGroup<T>> result = new LinkedList<>();
        List<C> connections = executorDriverManager.getConnections(dataSourceName, sqlUnitGroups.size(), connectionMode);
        int count = 0;
        for (List<SQLUnit> each : sqlUnitGroups) {
            result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));
        }
        return result;
    }
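Putting prepare and group together: the SQL units for one data source are partitioned, one partition per obtainable connection, and each partition becomes one execution group bound to one connection. A simplified, self-contained sketch of that partitioning (the real engine uses Guava's Lists.partition; the ceiling-division formula here is an assumption about the intended partition size, not a verbatim copy of the source):

```java
import java.util.*;

public class SqlUnitGrouping {
    // Split sqlUnits into at most maxConnectionsSizePerQuery partitions;
    // each partition will later be served by a single connection.
    public static <T> List<List<T>> partition(List<T> sqlUnits, int maxConnectionsSizePerQuery) {
        // Ceiling division: smallest partition size that needs no more
        // than maxConnectionsSizePerQuery partitions.
        int size = Math.max((sqlUnits.size() + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery, 1);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < sqlUnits.size(); i += size) {
            result.add(sqlUnits.subList(i, Math.min(i + size, sqlUnits.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 SQL units, at most 2 connections -> 2 groups, of 3 and 2 units.
        System.out.println(partition(List.of("u1", "u2", "u3", "u4", "u5"), 2));
    }
}
```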

Obtaining connections

    public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
        DataSource dataSource = dataSourceMap.get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection<Connection> connections;
        synchronized (getCachedConnections()) {
            connections = getCachedConnections().get(dataSourceName);
        }
        List<Connection> result;
        if (connections.size() >= connectionSize) {
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            List<Connection> newConnections = createConnections(dataSourceName, dataSource, connectionSize - connections.size(), connectionMode);
            result.addAll(newConnections);
            synchronized (getCachedConnections()) {
                getCachedConnections().putAll(dataSourceName, newConnections);
            }
        } else {
            result = new ArrayList<>(createConnections(dataSourceName, dataSource, connectionSize, connectionMode));
            synchronized (getCachedConnections()) {
                getCachedConnections().putAll(dataSourceName, result);
            }
        }
        return result;
    }

This looks up the matching DataSource by name among the configured data sources and obtains its connections, reusing cached connections where possible and creating only the shortfall.
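The logic above boils down to three cases: enough cached connections (reuse a sublist), some cached (top up with new ones), or none (create them all). A simplified sketch with generic resources standing in for java.sql.Connection (the per-data-source locking and connection-mode handling of the real ExecutorJDBCManager are omitted):

```java
import java.util.*;
import java.util.function.Supplier;

public class ConnectionCache<C> {
    private final Map<String, List<C>> cached = new HashMap<>();
    private final Supplier<C> factory; // stands in for dataSource.getConnection()

    public ConnectionCache(Supplier<C> factory) {
        this.factory = factory;
    }

    // Mirrors the shape of getConnections: reuse cached connections first,
    // then create and cache any shortfall.
    public synchronized List<C> getConnections(String dataSourceName, int connectionSize) {
        List<C> connections = cached.computeIfAbsent(dataSourceName, k -> new ArrayList<>());
        if (connections.size() >= connectionSize) {
            return new ArrayList<>(connections.subList(0, connectionSize));
        }
        List<C> result = new ArrayList<>(connections);
        while (result.size() < connectionSize) {
            C created = factory.get(); // shortfall: open a new connection
            result.add(created);
            connections.add(created);  // remember it for later queries
        }
        return result;
    }

    public static void main(String[] args) {
        ConnectionCache<Object> cache = new ConnectionCache<>(Object::new);
        List<Object> first = cache.getConnections("ds_0", 2);  // creates 2
        List<Object> second = cache.getConnections("ds_0", 1); // reuses 1
        System.out.println(first.get(0) == second.get(0)); // true: cached connection reused
    }
}
```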

Creating the execution groups

Execution then arrives at createExecutionGroup:

    private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {
        List<T> result = new LinkedList<>();
        for (SQLUnit each : sqlUnits) {
            result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), executorDriverManager, connection, connectionMode, option));
        }
        return new ExecutionGroup<>(result);
    }

And its build implementation:

    @Override
    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCManager executorManager,
                                   final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
        PreparedStatement preparedStatement = createPreparedStatement(
                executionUnit.getSqlUnit().getSql(), executionUnit.getSqlUnit().getParameters(), executorManager, connection, connectionMode, option);
        return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);
    }

What is obtained here is the real PreparedStatement, which is wrapped into a JDBCExecutionUnit. Once the whole environment has been prepared, the result is returned.

Caching statements

The cacheStatements method stores the statement and parameter information; this is the same cache that is cleared at the start of each execution.

    private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) {
        for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
            statements.addAll(each.getInputs().stream().map(jdbcExecutionUnit -> (PreparedStatement) jdbcExecutionUnit.getStorageResource()).collect(Collectors.toList()));
            parameterSets.addAll(each.getInputs().stream().map(input -> input.getExecutionUnit().getSqlUnit().getParameters()).collect(Collectors.toList()));
        }
        replay();
    }