与4.1.1不同,本次debug的项目没有使用mybatis,为了去除第三方框架带来的源码干扰,本次采用的原生JDBC操作原生ShardingSphereAPI的读写分离+分片
以下的Debug会深刻的了解为啥ShardingJDBC是嵌入式代码,其究竟如何复写的JDBC代码。

设置断点

上一篇文章阐述了配置文件的读取过程,本次开始阅读运行过程,定位ShardingReplicaQueryAPITest的selectAllTest方法,从第一行开始设置断点进行调试。
跟随断点来到Connection connection = DATASOURCE.getConnection()方法,这里是ShardingSphere的入口方法,由此开始进行debug。

获取connection

其源码如下:

  1. @Override
  2. public ShardingSphereConnection getConnection() {
  3. return new ShardingSphereConnection(getDataSourceMap(), metaDataContexts, transactionContexts, TransactionTypeHolder.get());
  4. }

getDataSourceMap就是获取数据源的K-V值,这里不再赘述,transactionContexts、TransactionTypeHolder.get()为事务部分,这里不是重点故跳过,
这里的metaDataContexts信息如下:
image.png
显然这里存储的是一些逻辑信息,即逻辑数据库、逻辑数据表。
ShardingSphereConnection实现自Connection接口,因此其拥有Connection的所有功能,这里只需为该类初始化即可:

  1. public ShardingSphereConnection(final Map<String, DataSource> dataSourceMap,
  2. final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts, final TransactionType transactionType) {
  3. this.dataSourceMap = dataSourceMap;
  4. this.metaDataContexts = metaDataContexts;
  5. this.transactionType = transactionType;
  6. shardingTransactionManager = transactionContexts.getDefaultTransactionManagerEngine().getTransactionManager(transactionType);
  7. }

获取PreparedStatement

调试来到了:reparedStatement preparedStatement = connection.prepareStatement(sql);
其调用了connection的prepareStatement方法,其源码如下:

  1. @Override
  2. public PreparedStatement prepareStatement(final String sql) throws SQLException {
  3. return new ShardingSpherePreparedStatement(this, sql);
  4. }

这里返回的是ShardingSpherePreparedStatement类的对象,显然ShardingSpherePreparedStatement也实现了PreparedStatement的接口且拥有其全部功能。其构造方法如下:

  1. private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
  2. final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
  3. if (Strings.isNullOrEmpty(sql)) {
  4. throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
  5. }
  6. this.connection = connection;
  7. metaDataContexts = connection.getMetaDataContexts();
  8. this.sql = sql;
  9. statements = new ArrayList<>();
  10. parameterSets = new ArrayList<>();
  11. ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(
  12. DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType()));
  13. sqlStatement = sqlParserEngine.parse(sql, true);
  14. parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
  15. statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
  16. JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
  17. rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
  18. driverJDBCExecutor = new DriverJDBCExecutor(connection.getDataSourceMap(), metaDataContexts, jdbcExecutor);
  19. batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
  20. kernelProcessor = new KernelProcessor();
  21. }

前面的赋值操作暂且略过,重点放在ShardingSphereSQLParserEngine对象的生成上。

ShardingSphereSQLParserEngine

该类的定义为:ShardingSphere SQL parser engine. 即SQL解析引擎。
这段代码如下:
ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine( DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType()));
其中DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType()) 这一大串获得的是数据库类型:
image.png
构造方法:

  1. public ShardingSphereSQLParserEngine(final String databaseTypeName) {
  2. sqlStatementParserEngine = SQLStatementParserEngineFactory.getSQLStatementParserEngine(databaseTypeName);
  3. distSQLStatementParserEngine = new DistSQLStatementParserEngine();
  4. parsingHookRegistry = ParsingHookRegistry.getInstance();
  5. }

这里根据数据库类型获得相应的Mysql sql解析引擎、分布式数据库引擎和解析钩子的注册。

解析SQL

语句执行到了sqlStatement = sqlParserEngine.parse(sql, true);
parse的源码如下:

  1. public SQLStatement parse(final String sql, final boolean useCache) {
  2. parsingHookRegistry.start(sql);
  3. try {
  4. SQLStatement result = parse0(sql, useCache);
  5. parsingHookRegistry.finishSuccess(result);
  6. return result;
  7. // CHECKSTYLE:OFF
  8. // TODO check whether throw SQLParsingException only
  9. } catch (final Exception ex) {
  10. // CHECKSTYLE:ON
  11. parsingHookRegistry.finishFailure(ex);
  12. throw ex;
  13. }
  14. }

parse0:

  1. private SQLStatement parse0(final String sql, final boolean useCache) {
  2. try {
  3. return sqlStatementParserEngine.parse(sql, useCache);
  4. } catch (final SQLParsingException | ParseCancellationException originalEx) {
  5. try {
  6. return distSQLStatementParserEngine.parse(sql);
  7. } catch (final SQLParsingException ignored) {
  8. throw originalEx;
  9. }
  10. }
  11. }

这里调用了引擎的parse方法,这里以mysql为例:

  1. public SQLStatement parse(final String sql, final boolean useCache) {
  2. return useCache ? sqlStatementCache.getUnchecked(sql) : sqlStatementParserExecutor.parse(sql);
  3. }

这里用到了缓存,并从缓存中取出之前创建的空对象:
image.png

执行语句

经过上述的一些列初始化操作后代码来到了:
ResultSet resultSet = preparedStatement.executeQuery();
其源码如下:

  1. @Override
  2. public ResultSet executeQuery() throws SQLException {
  3. ResultSet result;
  4. try {
  5. clearPrevious();
  6. executionContext = createExecutionContext();
  7. List<QueryResult> queryResults = executeQuery0();
  8. MergedResult mergedResult = mergeQuery(queryResults);
  9. result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
  10. } finally {
  11. clearBatch();
  12. }
  13. currentResultSet = result;
  14. return result;
  15. }

这块是ShardingSphere的核心代码,稍后会在这里进行详尽分析,但目前的任务是找到路由信息如何获取。
清理完缓存后代码来到了:executionContext = createExecutionContext();

创建执行上下文

执行SQL必备的上下文信息就是在该方法中获取的,其源码如下:

  1. private ExecutionContext createExecutionContext() throws SQLException {
  2. LogicSQL logicSQL = createLogicSQL();
  3. ExecutionContext result = kernelProcessor.generateExecutionContext(logicSQL, metaDataContexts.getDefaultMetaData(), metaDataContexts.getProps());
  4. findGeneratedKey(result).ifPresent(generatedKey -> generatedValues.addAll(generatedKey.getGeneratedValues()));
  5. return result;
  6. }

创建逻辑SQL

这里的逻辑SQL包装类是LogicSQL封装的,其通过createLogicSQL方法获取:

  1. private LogicSQL createLogicSQL() {
  2. List<Object> parameters = new ArrayList<>(getParameters());
  3. ShardingSphereSchema schema = metaDataContexts.getDefaultMetaData().getSchema();
  4. SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(schema, parameters, sqlStatement);
  5. return new LogicSQL(sqlStatementContext, sql, parameters);
  6. }

先获取所有传入的参数存入到parameters中,然后获取schema,即逻辑表信息:
image.png
然后讲这些信息传入SQLStatementContext生成SQL语句的上下文信息。
最后连通上下文信息一同包装到LogicSQL中并返回。

解析路由信息

其主要通过kernelProcessor的generateExecutionContext方法来创建ExecutionContext对象,其源码如下:

  1. public ExecutionContext generateExecutionContext(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ConfigurationProperties props) throws SQLException {
  2. audit(logicSQL, metaData);
  3. RouteContext routeContext = route(logicSQL, metaData, props);
  4. SQLRewriteResult rewriteResult = rewrite(logicSQL, metaData, props, routeContext);
  5. ExecutionContext result = createExecutionContext(logicSQL, metaData, routeContext, rewriteResult);
  6. logSQL(logicSQL, props, result);
  7. return result;
  8. }

这里把上述创建的逻辑SQL、逻辑表信息、props信息一同传入。
经过审计(audit)之后第一件要做的事情就是获取路由信息,这里就是本章重点相关语句如下:
RouteContext routeContext = route(logicSQL, metaData, props);
追踪:

  1. private RouteContext route(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ConfigurationProperties props) {
  2. return new SQLRouteEngine(metaData.getRuleMetaData().getRules(), props).route(logicSQL, metaData);
  3. }

显然,其是通过SQL的路由引擎来进行路由的。逻辑如下:

  1. public RouteContext route(final LogicSQL logicSQL, final ShardingSphereMetaData metaData) {
  2. routingHook.start(logicSQL.getSql());
  3. try {
  4. SQLRouteExecutor executor = isNeedAllSchemas(logicSQL.getSqlStatementContext().getSqlStatement()) ? new AllSQLRouteExecutor() : new PartialSQLRouteExecutor(rules, props);
  5. RouteContext result = executor.route(logicSQL, metaData);
  6. routingHook.finishSuccess(result, metaData.getSchema());
  7. return result;
  8. // CHECKSTYLE:OFF
  9. } catch (final Exception ex) {
  10. // CHECKSTYLE:ON
  11. routingHook.finishFailure(ex);
  12. throw ex;
  13. }
  14. }

首先根据路由信息获取相应的执行器,然后继续追踪路由方法:

  1. public RouteContext route(final LogicSQL logicSQL, final ShardingSphereMetaData metaData) {
  2. RouteContext result = new RouteContext();
  3. for (Entry<ShardingSphereRule, SQLRouter> entry : routers.entrySet()) {
  4. if (result.getRouteUnits().isEmpty()) {
  5. result = entry.getValue().createRouteContext(logicSQL, metaData, entry.getKey(), props);
  6. } else {
  7. entry.getValue().decorateRouteContext(result, logicSQL, metaData, entry.getKey(), props);
  8. }
  9. }
  10. return result;
  11. }

封装路由条件

这里来遍历所有的路由信息然后创建路由上下文对象:

  1. public RouteContext createRouteContext(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ShardingRule rule, final ConfigurationProperties props) {
  2. RouteContext result = new RouteContext();
  3. SQLStatement sqlStatement = logicSQL.getSqlStatementContext().getSqlStatement();
  4. Optional<ShardingStatementValidator> validator = ShardingStatementValidatorFactory.newInstance(sqlStatement);
  5. validator.ifPresent(optional -> optional.preValidate(rule, logicSQL.getSqlStatementContext(), logicSQL.getParameters(), metaData.getSchema()));
  6. ShardingConditions shardingConditions = createShardingConditions(logicSQL, metaData, rule);
  7. boolean needMergeShardingValues = isNeedMergeShardingValues(logicSQL.getSqlStatementContext(), rule);
  8. if (sqlStatement instanceof DMLStatement && needMergeShardingValues) {
  9. mergeShardingConditions(shardingConditions);
  10. }
  11. ShardingRouteEngineFactory.newInstance(rule, metaData, logicSQL.getSqlStatementContext(), shardingConditions, props).route(result, rule);
  12. validator.ifPresent(v -> v.postValidate(sqlStatement, result));
  13. return result;
  14. }

具体路由调条件的逻辑位于createShardingConditions:

  1. private ShardingConditions createShardingConditions(final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ShardingRule rule) {
  2. List<ShardingCondition> shardingConditions;
  3. if (logicSQL.getSqlStatementContext().getSqlStatement() instanceof DMLStatement) {
  4. ShardingConditionEngine shardingConditionEngine = ShardingConditionEngineFactory.createShardingConditionEngine(logicSQL, metaData, rule);
  5. shardingConditions = shardingConditionEngine.createShardingConditions(logicSQL.getSqlStatementContext(), logicSQL.getParameters());
  6. } else {
  7. shardingConditions = Collections.emptyList();
  8. }
  9. return new ShardingConditions(shardingConditions);
  10. }

其先获取对应的分片引擎,其内部存放着分片有关的信息:
image.png
然后通过createShardingConditions方法来获取其余属性:

  1. public List<ShardingCondition> createShardingConditions(final SQLStatementContext<?> sqlStatementContext, final List<Object> parameters) {
  2. if (!(sqlStatementContext instanceof WhereAvailable)) {
  3. return Collections.emptyList();
  4. }
  5. List<ShardingCondition> result = new ArrayList<>();
  6. ((WhereAvailable) sqlStatementContext).getWhere().ifPresent(segment -> result.addAll(createShardingConditions(sqlStatementContext, segment.getExpr(), parameters)));
  7. Collection<WhereSegment> subqueryWhereSegments = sqlStatementContext.getSqlStatement() instanceof SelectStatement
  8. ? WhereSegmentExtractUtils.getSubqueryWhereSegments((SelectStatement) sqlStatementContext.getSqlStatement()) : Collections.emptyList();
  9. for (WhereSegment each : subqueryWhereSegments) {
  10. Collection<ShardingCondition> subqueryShardingConditions = createShardingConditions(sqlStatementContext, each.getExpr(), parameters);
  11. if (!result.containsAll(subqueryShardingConditions)) {
  12. result.addAll(subqueryShardingConditions);
  13. }
  14. }
  15. return result;
  16. }

封装后返回,

获取全部路径

获取完各种信息后代码来到:ShardingRouteEngineFactory.newInstance(rule, metaData, logicSQL.getSqlStatementContext(), shardingConditions, props).route(result, rule);
路由源码如下:

  1. public void route(final RouteContext routeContext, final ShardingRule shardingRule) {
  2. Collection<DataNode> dataNodes = getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName));
  3. routeContext.getOriginalDataNodes().addAll(originalDataNodes);
  4. for (DataNode each : dataNodes) {
  5. routeContext.getRouteUnits().add(
  6. new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
  7. }
  8. }

获取分表的规则方法如下:

  1. public TableRule getTableRule(final String logicTableName) {
  2. Optional<TableRule> tableRule = findTableRule(logicTableName);
  3. if (tableRule.isPresent()) {
  4. return tableRule.get();
  5. }
  6. if (isBroadcastTable(logicTableName)) {
  7. return new TableRule(dataSourceNames, logicTableName);
  8. }
  9. throw new ShardingSphereConfigurationException("Cannot find table rule with logic table: '%s'", logicTableName);
  10. }

然后根据分库分表规则获取数据节点:

  1. private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
  2. ShardingStrategy databaseShardingStrategy = createShardingStrategy(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule), shardingRule.getShardingAlgorithms());
  3. ShardingStrategy tableShardingStrategy = createShardingStrategy(shardingRule.getTableShardingStrategyConfiguration(tableRule), shardingRule.getShardingAlgorithms());
  4. if (isRoutingByHint(shardingRule, tableRule)) {
  5. return routeByHint(tableRule, databaseShardingStrategy, tableShardingStrategy);
  6. }
  7. if (isRoutingByShardingConditions(shardingRule, tableRule)) {
  8. return routeByShardingConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
  9. }
  10. return routeByMixedConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
  11. }

首先先获取分库分表策略:
image.png
然后进入到routeByShardingConditions方法获取全部路径:

  1. private Collection<DataNode> routeByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule,
  2. final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
  3. return shardingConditions.getConditions().isEmpty()
  4. ? route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, Collections.emptyList())
  5. : routeByShardingConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
  6. }

route0:

  1. private Collection<DataNode> route0(final TableRule tableRule,
  2. final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues,
  3. final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
  4. Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingStrategy, databaseShardingValues);
  5. Collection<DataNode> result = new LinkedList<>();
  6. for (String each : routedDataSources) {
  7. result.addAll(routeTables(tableRule, each, tableShardingStrategy, tableShardingValues));
  8. }
  9. return result;
  10. }

封装后的结果如下:
image.png
然后将路径存储到路由上下文中。
最后把这些消息放入到RouteUnit对象中:
image.png

调整路径信息

获取上下文后返回到result然后调用decorateRouteContext进行装饰:

  1. public void decorateRouteContext(final RouteContext routeContext,
  2. final LogicSQL logicSQL, final ShardingSphereMetaData metaData, final ReplicaQueryRule rule, final ConfigurationProperties props) {
  3. Collection<RouteUnit> toBeRemoved = new LinkedList<>();
  4. Collection<RouteUnit> toBeAdded = new LinkedList<>();
  5. for (RouteUnit each : routeContext.getRouteUnits()) {
  6. String dataSourceName = each.getDataSourceMapper().getLogicName();
  7. Optional<ReplicaQueryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);
  8. if (dataSourceRule.isPresent() && dataSourceRule.get().getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
  9. toBeRemoved.add(each);
  10. String actualDataSourceName = new ReplicaQueryDataSourceRouter(dataSourceRule.get()).route(logicSQL.getSqlStatementContext().getSqlStatement());
  11. toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
  12. }
  13. }
  14. routeContext.getRouteUnits().removeAll(toBeRemoved);
  15. routeContext.getRouteUnits().addAll(toBeAdded);
  16. }

这里分为两个集合,一个是需要被添加的,一个是需要被移除的。这里其实做的是容器里面的数据的筛选与调整。
到这里,路由信息就全部解析完成。下一步就是语句的改写,核心方法位于rewrite中,将在下节进行分析。