author:彭程
介绍
dble进行SQL解析之后,会判断该SQL是否为复杂SQL,并对复杂SQL做进一步处理。本文主要介绍复杂SQL的处理过程。
dble对复杂SQL的处理主要包括以下几步:
- 构造访问器(Visitor);
- 对查询计划进行优化;
- 执行多结果集。
代码入口
在MySQLShardingSQLHandler#routeEndExecuteSQL方法中,通过RouteService#route(其内部调用druid进行解析)得到RouteResultset(rrs),之后会调用NonBlockingSession#execute对rrs做进一步处理:
public void routeEndExecuteSQL(String sql, int type, SchemaConfig schemaConfig) {TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "route&execute");try {if (service.getSession2().isKilled()) { //检查会话状态service.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");return;}RouteResultset rrs;try {rrs = RouteService.getInstance().route(schemaConfig, type, sql, service); //这里调用了druidif (rrs == null) {return;}if (rrs.getSqlType() == ServerParse.DDL && rrs.getSchema() != null) {if (ProxyMeta.getInstance().getTmManager().getCatalogs().get(rrs.getSchema()).getView(rrs.getTable()) != null) {ProxyMeta.getInstance().getTmManager().removeMetaLock(rrs.getSchema(), rrs.getTable());String msg = "Table '" + rrs.getTable() + "' already exists as a view";LOGGER.info(msg);throw new SQLNonTransientException(msg);}}} catch (Exception e) {service.executeException(e, sql);return;}service.getSession2().endRoute(rrs);service.getSession2().execute(rrs); //对rrs进一步处理} finally {TraceManager.finishSpan(traceObject);}}
NonBlockingSession#execute方法会根据rrs#nodes的状态对SQL类型进行判断:如果nodes为null、长度为0,或者第一个节点的名称为null或空字符串,并且rrs#isNeedOptimizer为true,则按复杂SQL处理,调用NonBlockingSession#executeMultiSelect方法;若isNeedOptimizer为false,则返回ER_NO_DB_ERROR错误
public void execute(RouteResultset rrs) {TraceManager.TraceObject traceObject = TraceManager.serviceTrace(shardingService, "execute-sql-for-sharding");TraceManager.log(ImmutableMap.of("route-result-set", rrs), traceObject);try {if (killed) {shardingService.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");return;}if (LOGGER.isDebugEnabled()) {StringBuilder s = new StringBuilder();LOGGER.debug(s.append(shardingService).append(rrs).toString() + " rrs ");}if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&!PauseShardingNodeManager.getInstance().checkTarget(target) &&PauseShardingNodeManager.getInstance().checkRRS(rrs)) {if (PauseShardingNodeManager.getInstance().waitForResume(rrs, shardingService, CONTINUE_TYPE_SINGLE)) {return;}}// complex query 复杂查询RouteResultsetNode[] nodes = rrs.getNodes();if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) {if (rrs.isNeedOptimizer()) {try {this.complexRrs = rrs;executeMultiSelect(rrs); //复杂SQL处理} catch (MySQLOutPutException e) {LOGGER.warn("execute complex sql cause error", e);shardingService.writeErrMessage(e.getSqlState(), e.getMessage(), e.getErrorCode());}} else {shardingService.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,"No shardingNode found ,please check tables defined in schema:" + shardingService.getSchema());}return;}setRouteResultToTrace(nodes);if (rrs.getDdlHandler() != null) {executeDDL(rrs); //执行DDL语句} else {// dml or simple select 简单查询executeOther(rrs);}} finally {TraceManager.finishSpan(shardingService, traceObject);}}
NonBlockingSession#executeMultiSelect方法实现了复杂SQL处理的主要逻辑
public void executeMultiSelect(RouteResultset rrs) {TraceManager.TraceObject traceObject = TraceManager.serviceTrace(shardingService, "try-complex-query");try {SQLSelectStatement ast = (SQLSelectStatement) rrs.getSqlStatement();MySQLPlanNodeVisitor visitor = new MySQLPlanNodeVisitor(shardingService.getSchema(), shardingService.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, shardingService.getUsrVariables());visitor.visit(ast); //构造访问器PlanNode node = visitor.getTableNode();if (node.isCorrelatedSubQuery()) {throw new MySQLOutPutException(ErrorCode.ER_UNKNOWN_ERROR, "", "Correlated Sub Queries is not supported ");}node.setSql(rrs.getStatement());node.setUpFields();PlanUtil.checkTablesPrivilege(shardingService, node, ast);node = MyOptimizer.optimize(node); //对查询计划进行优化if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&!PauseShardingNodeManager.getInstance().checkTarget(target) &&PauseShardingNodeManager.getInstance().checkReferredTableNodes(node.getReferedTableNodes())) {if (PauseShardingNodeManager.getInstance().waitForResume(rrs, this.shardingService, CONTINUE_TYPE_MULTIPLE)) {return;}}setPreExecuteEnd(TraceResult.SqlTraceType.COMPLEX_QUERY);if (PlanUtil.containsSubQuery(node)) {setSubQuery();final PlanNode finalNode = node;//sub Query build will be blocked, so use ComplexQueryExecutorDbleServer.getInstance().getComplexQueryExecutor().execute(() -> {executeMultiResultSet(rrs, finalNode);});} else {if (!visitor.isContainSchema()) {node.setAst(ast);}executeMultiResultSet(rrs, node); //执行多结果集}} finally {TraceManager.finishSpan(shardingService, traceObject);}}
