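According to the official documentation, ShardingSphere connects to databases in one of two modes: memory-strictly mode (MEMORY_STRICTLY), which does not limit the number of connections, and connection-strictly mode (CONNECTION_STRICTLY), which does. The user does not have to choose between them; ShardingSphere selects the mode automatically.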

Continuing from the previous part

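After routing and rewriting, execution reaches initPreparedStatementExecutor, whose source is shown below:
(Figure: source of initPreparedStatementExecutor.)
This method clearly does a good deal of initialization, and establishing connections is presumably part of it, so the analysis starts here.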

init

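The init method is the first step of initialization. Its source is shown below:
(Figure: source of init.)
It first sets the SqlStatement context and then calls obtainExecuteGroups, whose source is: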

    private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(Collection<ExecutionUnit> executionUnits) throws SQLException {
        return this.getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
            // Callback 1: obtain connectionSize connections for the given data source.
            public List<Connection> getConnections(ConnectionMode connectionMode, String dataSourceName, int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            // Callback 2: create a PreparedStatement on the connection and wrap it in an execute unit.
            public StatementExecuteUnit createStatementExecuteUnit(Connection connection, ExecutionUnit executionUnit, ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(executionUnit, PreparedStatementExecutor.this.createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

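Two callbacks are created here, getConnections and createStatementExecuteUnit. Set them aside for now; both are analyzed later.
The getExecuteUnitGroups method groups the execution units by data source. Its source is: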

    public Collection<InputGroup<StatementExecuteUnit>> getExecuteUnitGroups(Collection<ExecutionUnit> executionUnits, SQLExecutePrepareCallback callback) throws SQLException {
        return this.getSynchronizedExecuteUnitGroups(executionUnits, callback);
    }

getSynchronizedExecuteUnitGroups:

    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(Collection<ExecutionUnit> executionUnits, SQLExecutePrepareCallback callback) throws SQLException {
        // Group the SQL units by data source name.
        Map<String, List<SQLUnit>> sqlUnitGroups = this.getSQLUnitGroups(executionUnits);
        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        // Build the execute groups of each data source in turn.
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(this.getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }
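The body of getSQLUnitGroups is not shown in this article. As a rough sketch of what grouping by data source means, the following uses plain Java collections only. The Unit class is a hypothetical stand-in for ExecutionUnit, and the class and method names are invented for illustration; this is not the ShardingSphere implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByDataSourceSketch {

    /** Hypothetical stand-in for ExecutionUnit: a data source name plus the SQL routed to it. */
    static final class Unit {
        final String dataSourceName;
        final String sql;

        Unit(String dataSourceName, String sql) {
            this.dataSourceName = dataSourceName;
            this.sql = sql;
        }
    }

    /** Groups SQL strings by data source name, keeping the order in which data sources first appear. */
    static Map<String, List<String>> groupByDataSource(List<Unit> units) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Unit each : units) {
            result.computeIfAbsent(each.dataSourceName, key -> new ArrayList<>()).add(each.sql);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Unit> units = Arrays.asList(
                new Unit("ds_0", "SELECT * FROM t_order_0"),
                new Unit("ds_1", "SELECT * FROM t_order_0"),
                new Unit("ds_0", "SELECT * FROM t_order_1"));
        // ds_0 ends up with two SQL strings, ds_1 with one.
        System.out.println(groupByDataSource(units));
    }
}
```

Each map entry then corresponds to one call of getSQLExecuteGroups in the loop above.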

getSQLExecuteGroups:

    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(String dataSourceName, List<SQLUnit> sqlUnits, SQLExecutePrepareCallback callback) throws SQLException {
        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        // SQL units per connection: sqlUnits.size() / maxConnectionsSizePerQuery rounded up, and at least 1.
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % this.maxConnectionsSizePerQuery ? sqlUnits.size() / this.maxConnectionsSizePerQuery : sqlUnits.size() / this.maxConnectionsSizePerQuery + 1, 1);
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        // More SQL units than allowed connections: CONNECTION_STRICTLY; otherwise MEMORY_STRICTLY.
        ConnectionMode connectionMode = this.maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        // One connection is requested per partition.
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(this.getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }
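The arithmetic in getSQLExecuteGroups is easier to follow with concrete numbers. The sketch below copies the two expressions from the method into standalone helpers and replaces Guava's Lists.partition with a plain-Java equivalent. The class name, the helper names, and the use of strings for the mode are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {

    /** Same expression as in getSQLExecuteGroups: count / max rounded up, never less than 1. */
    static int desiredPartitionSize(int sqlUnitCount, int maxConnectionsSizePerQuery) {
        return Math.max(0 == sqlUnitCount % maxConnectionsSizePerQuery
                ? sqlUnitCount / maxConnectionsSizePerQuery
                : sqlUnitCount / maxConnectionsSizePerQuery + 1, 1);
    }

    /** Same ternary as in getSQLExecuteGroups, returning the mode name as a plain string. */
    static String connectionMode(int sqlUnitCount, int maxConnectionsSizePerQuery) {
        return maxConnectionsSizePerQuery < sqlUnitCount ? "CONNECTION_STRICTLY" : "MEMORY_STRICTLY";
    }

    /** Plain-Java equivalent of Lists.partition: consecutive sublists of at most size elements. */
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            result.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sqlUnits = Arrays.asList("sql_0", "sql_1", "sql_2", "sql_3", "sql_4");
        int maxConnectionsSizePerQuery = 2;
        int size = desiredPartitionSize(sqlUnits.size(), maxConnectionsSizePerQuery);
        System.out.println(size);
        System.out.println(partition(sqlUnits, size));
        System.out.println(connectionMode(sqlUnits.size(), maxConnectionsSizePerQuery));
    }
}
```

With 5 SQL units and maxConnectionsSizePerQuery set to 2, the partition size is 3, which yields two partitions (3 and 2 units), so two connections are requested and the mode is CONNECTION_STRICTLY. With 2 SQL units and the same limit, each unit gets its own connection and the mode is MEMORY_STRICTLY.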

Creating connections

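sqlUnitPartitions splits the SQL units to be executed into partitions of desiredPartitionSize units each; the last partition may be smaller.
(Figure 3, "Obtaining connections and other resources": the calculation rule.)
After partitioning, the getConnections callback obtains one connection per partition and stores them in connections. The source of getConnections is: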

    public final List<Connection> getConnections(ConnectionMode connectionMode, String dataSourceName, int connectionSize) throws SQLException {
        DataSource dataSource = this.getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection<Connection> connections;
        synchronized (this.cachedConnections) {
            connections = this.cachedConnections.get(dataSourceName);
        }
        List<Connection> result;
        if (connections.size() >= connectionSize) {
            // Enough cached connections: reuse the first connectionSize of them.
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            // Some cached connections: reuse them and create only the shortfall.
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            List<Connection> newConnections = this.createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
            result.addAll(newConnections);
            synchronized (this.cachedConnections) {
                this.cachedConnections.putAll(dataSourceName, newConnections);
            }
        } else {
            // Nothing cached: create every connection and cache them all.
            result = new ArrayList<>(this.createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
            synchronized (this.cachedConnections) {
                this.cachedConnections.putAll(dataSourceName, result);
            }
        }
        return result;
    }
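The caching behaviour of getConnections can be tried out without a database. The sketch below is a simplified model under several assumptions: connections are represented by string labels, a HashMap of lists stands in for cachedConnections, the two "create" branches are merged into one, and synchronization is omitted, so it is not thread-safe. All names are invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectionCacheSketch {

    /** Stand-in for cachedConnections: data source name to the connections already opened on it. */
    private final Map<String, List<String>> cachedConnections = new HashMap<>();

    /** Counts how many connections have been created so far. */
    private int created = 0;

    /** Hypothetical factory: "opens" count new connections, each represented by a label. */
    private List<String> createConnections(String dataSourceName, int count) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(dataSourceName + "#" + created++);
        }
        return result;
    }

    /** Reuses cached connections first and creates only the shortfall. */
    List<String> getConnections(String dataSourceName, int connectionSize) {
        List<String> cached = cachedConnections.computeIfAbsent(dataSourceName, key -> new ArrayList<>());
        if (cached.size() >= connectionSize) {
            return new ArrayList<>(cached.subList(0, connectionSize));
        }
        List<String> newConnections = createConnections(dataSourceName, connectionSize - cached.size());
        List<String> result = new ArrayList<>(cached);
        result.addAll(newConnections);
        cached.addAll(newConnections);
        return result;
    }

    int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        ConnectionCacheSketch sketch = new ConnectionCacheSketch();
        System.out.println(sketch.getConnections("ds_0", 2)); // creates two connections
        System.out.println(sketch.getConnections("ds_0", 3)); // reuses two, creates one
        System.out.println(sketch.getConnections("ds_0", 1)); // reuses one, creates none
        System.out.println(sketch.createdCount());
    }
}
```

Across the three calls only three connections are ever created, which is the point of checking the cache before calling createConnections.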

createConnections:

    private List<Connection> createConnections(String dataSourceName, ConnectionMode connectionMode, DataSource dataSource, int connectionSize) throws SQLException {
        if (1 == connectionSize) {
            // A single connection: create it directly and replay the recorded method invocations on it.
            Connection connection = this.createConnection(dataSourceName, dataSource);
            this.replayMethodsInvocation(connection);
            return Collections.singletonList(connection);
        } else if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
            // Connection-strictly mode: create the connections without locking.
            return this.createConnections(dataSourceName, dataSource, connectionSize);
        } else {
            // Memory-strictly mode: hold the data source lock while creating all connections.
            synchronized (dataSource) {
                return this.createConnections(dataSourceName, dataSource, connectionSize);
            }
        }
    }
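The three branches of createConnections differ mainly in whether the data source is locked while connections are created. The sketch below restates just that decision. The enum is a local copy made for this example and locksDataSource is a hypothetical helper, not a ShardingSphere method. One plausible reason for the lock, which the source shown here does not state, is to let a query acquire all of its connections in one step so that concurrent queries do not each hold part of the pool while waiting for the rest.

```java
public class CreateConnectionsBranchSketch {

    /** Local copy of the two mode names used in this article. */
    enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    /** Returns true only for the branch that wraps creation in synchronized (dataSource). */
    static boolean locksDataSource(ConnectionMode connectionMode, int connectionSize) {
        if (1 == connectionSize) {
            // A single connection is created directly, without locking.
            return false;
        }
        return ConnectionMode.MEMORY_STRICTLY == connectionMode;
    }

    public static void main(String[] args) {
        System.out.println(locksDataSource(ConnectionMode.MEMORY_STRICTLY, 1));
        System.out.println(locksDataSource(ConnectionMode.CONNECTION_STRICTLY, 2));
        System.out.println(locksDataSource(ConnectionMode.MEMORY_STRICTLY, 2));
    }
}
```

Only the last call, several connections in memory-strictly mode, takes the locking branch.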

Storing connections

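Once the connections are created, getSQLExecuteGroup binds each connection to its partition of SQL units and returns the result as a group: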

    private InputGroup<StatementExecuteUnit> getSQLExecuteGroup(ConnectionMode connectionMode, Connection connection, String dataSourceName, List<SQLUnit> sqlUnitGroup, SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> result = new LinkedList<>();
        // Every SQL unit in this partition shares the same connection.
        for (SQLUnit each : sqlUnitGroup) {
            result.add(callback.createStatementExecuteUnit(connection, new ExecutionUnit(dataSourceName, each), connectionMode));
        }
        return new InputGroup<>(result);
    }

Here createStatementExecuteUnit is the callback defined in obtainExecuteGroups; it is what actually creates each execute unit.

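Summary

After the connections are created, the remaining methods of initPreparedStatementExecutor perform the other necessary initialization in preparation for executing the SQL statements.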