author:彭程


介绍

dble进行SQL解析之后,会进行判断对复杂SQL进一步处理,本文主要介绍复杂SQL的处理过程。
8b8d156d-c2f6-3ff1-bcad-8582399506d6.png
dble对复杂SQL的处理主要包括以下几步:

  1. 构造访问器(Visitor);
  2. 对查询计划进行优化
  3. 执行多结果集

代码入口

在MySQLShardingSQLHandler#routeEndExecuteSQL方法中调用druid能得到RouteResultset(rrs),之后会调用NonBlockingSession#execute进一步进行处理

  1. public void routeEndExecuteSQL(String sql, int type, SchemaConfig schemaConfig) {
  2. TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "route&execute");
  3. try {
  4. if (service.getSession2().isKilled()) { //检查会话状态
  5. service.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");
  6. return;
  7. }
  8. RouteResultset rrs;
  9. try {
  10. rrs = RouteService.getInstance().route(schemaConfig, type, sql, service); //这里调用了druid
  11. if (rrs == null) {
  12. return;
  13. }
  14. if (rrs.getSqlType() == ServerParse.DDL && rrs.getSchema() != null) {
  15. if (ProxyMeta.getInstance().getTmManager().getCatalogs().get(rrs.getSchema()).getView(rrs.getTable()) != null) {
  16. ProxyMeta.getInstance().getTmManager().removeMetaLock(rrs.getSchema(), rrs.getTable());
  17. String msg = "Table '" + rrs.getTable() + "' already exists as a view";
  18. LOGGER.info(msg);
  19. throw new SQLNonTransientException(msg);
  20. }
  21. }
  22. } catch (Exception e) {
  23. service.executeException(e, sql);
  24. return;
  25. }
  26. service.getSession2().endRoute(rrs);
  27. service.getSession2().execute(rrs); //对rrs进一步处理
  28. } finally {
  29. TraceManager.finishSpan(traceObject);
  30. }
  31. }

NonBlockingSession#execute方法会根据rrs#nodes的状态对SQL类型进行判断,如果nodes为null则是进行复杂SQL处理,调用NonBlockingSession#executeMultiSelect方法

  1. public void execute(RouteResultset rrs) {
  2. TraceManager.TraceObject traceObject = TraceManager.serviceTrace(shardingService, "execute-sql-for-sharding");
  3. TraceManager.log(ImmutableMap.of("route-result-set", rrs), traceObject);
  4. try {
  5. if (killed) {
  6. shardingService.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");
  7. return;
  8. }
  9. if (LOGGER.isDebugEnabled()) {
  10. StringBuilder s = new StringBuilder();
  11. LOGGER.debug(s.append(shardingService).append(rrs).toString() + " rrs ");
  12. }
  13. if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&
  14. !PauseShardingNodeManager.getInstance().checkTarget(target) &&
  15. PauseShardingNodeManager.getInstance().checkRRS(rrs)) {
  16. if (PauseShardingNodeManager.getInstance().waitForResume(rrs, shardingService, CONTINUE_TYPE_SINGLE)) {
  17. return;
  18. }
  19. }
  20. // complex query 复杂查询
  21. RouteResultsetNode[] nodes = rrs.getNodes();
  22. if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) {
  23. if (rrs.isNeedOptimizer()) {
  24. try {
  25. this.complexRrs = rrs;
  26. executeMultiSelect(rrs); //复杂SQL处理
  27. } catch (MySQLOutPutException e) {
  28. LOGGER.warn("execute complex sql cause error", e);
  29. shardingService.writeErrMessage(e.getSqlState(), e.getMessage(), e.getErrorCode());
  30. }
  31. } else {
  32. shardingService.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
  33. "No shardingNode found ,please check tables defined in schema:" + shardingService.getSchema());
  34. }
  35. return;
  36. }
  37. setRouteResultToTrace(nodes);
  38. if (rrs.getDdlHandler() != null) {
  39. executeDDL(rrs); //执行DDL语句
  40. } else {
  41. // dml or simple select 简单查询
  42. executeOther(rrs);
  43. }
  44. } finally {
  45. TraceManager.finishSpan(shardingService, traceObject);
  46. }
  47. }

NonBlockingSession#executeMultiSelect方法实现了复杂SQL处理的主要逻辑

  1. public void executeMultiSelect(RouteResultset rrs) {
  2. TraceManager.TraceObject traceObject = TraceManager.serviceTrace(shardingService, "try-complex-query");
  3. try {
  4. SQLSelectStatement ast = (SQLSelectStatement) rrs.getSqlStatement();
  5. MySQLPlanNodeVisitor visitor = new MySQLPlanNodeVisitor(shardingService.getSchema(), shardingService.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, shardingService.getUsrVariables());
  6. visitor.visit(ast); //构造访问器
  7. PlanNode node = visitor.getTableNode();
  8. if (node.isCorrelatedSubQuery()) {
  9. throw new MySQLOutPutException(ErrorCode.ER_UNKNOWN_ERROR, "", "Correlated Sub Queries is not supported ");
  10. }
  11. node.setSql(rrs.getStatement());
  12. node.setUpFields();
  13. PlanUtil.checkTablesPrivilege(shardingService, node, ast);
  14. node = MyOptimizer.optimize(node); //对查询计划进行优化
  15. if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&
  16. !PauseShardingNodeManager.getInstance().checkTarget(target) &&
  17. PauseShardingNodeManager.getInstance().checkReferredTableNodes(node.getReferedTableNodes())) {
  18. if (PauseShardingNodeManager.getInstance().waitForResume(rrs, this.shardingService, CONTINUE_TYPE_MULTIPLE)) {
  19. return;
  20. }
  21. }
  22. setPreExecuteEnd(TraceResult.SqlTraceType.COMPLEX_QUERY);
  23. if (PlanUtil.containsSubQuery(node)) {
  24. setSubQuery();
  25. final PlanNode finalNode = node;
  26. //sub Query build will be blocked, so use ComplexQueryExecutor
  27. DbleServer.getInstance().getComplexQueryExecutor().execute(() -> {
  28. executeMultiResultSet(rrs, finalNode);
  29. });
  30. } else {
  31. if (!visitor.isContainSchema()) {
  32. node.setAst(ast);
  33. }
  34. executeMultiResultSet(rrs, node); //执行多结果集
  35. }
  36. } finally {
  37. TraceManager.finishSpan(shardingService, traceObject);
  38. }
  39. }