1.内存初始化

初始化代码在LocalMemoryManager中,启动时将内存分为2个内存池,分别是:

  • RESERVED_POOL:预留内存池,用于执行最耗费内存资源的查询。
  • GENERAL_POOL:普通内存池,用于执行除最耗费内存查询以外的查询。

![LocalMemoryManager 内存池划分示意图](image.png)
LocalMemoryManager的configureMemoryPools方法实现内存的分配
1.首先扣除 heapHeadroom(堆内预留空间,默认约占堆空间大小的30%),剩下的内存留给 GENERAL_POOL(普通内存池)和 RESERVED_POOL(预留内存池)
2.判断是否启用预留内存池(默认开启),若启用则为其分配约占堆空间大小30%的内存(即单个 query 的最大总内存限制),专门用于执行最耗费内存的那一个查询
3.剩下的内存分配给普通内存池

  1. private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
  2. {
  3. validateHeapHeadroom(config, availableMemory);
  4. // 扣除系统 jvm(xmX)的0.3 表示堆大小
  5. maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
  6. checkArgument(
  7. config.getMaxQueryMemoryPerNode().toBytes() <= config.getMaxQueryTotalMemoryPerNode().toBytes(),
  8. "Max query memory per node (%s) cannot be greater than the max query total memory per node (%s).",
  9. QUERY_MAX_MEMORY_PER_NODE_CONFIG,
  10. QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG);
  11. // 缓存不同内存类型 与 内存池
  12. ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
  13. // 普通内存池
  14. long generalPoolSize = maxMemory.toBytes();
  15. // 是否开启 预留内存池,用于执行最耗费内存资源的查询
  16. if (config.isReservedPoolEnabled()) {
  17. // AVAILABLE_HEAP_MEMORY * 0.3 每个query最大内存限制条件
  18. builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryTotalMemoryPerNode()));
  19. // 重新计普通内存大小
  20. generalPoolSize -= config.getMaxQueryTotalMemoryPerNode().toBytes();
  21. }
  22. verify(generalPoolSize > 0, "general memory pool size is 0");
  23. // 剩余的普通内存池的大小
  24. builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, new DataSize(generalPoolSize, BYTE)));
  25. this.pools = builder.build();
  26. }

2.定时调整内存池分配策略

SqlQueryManager中的start方法在该实例构造完成之后(通过@PostConstruct)被调用,其中enforceMemoryLimits()方法用于内存限制的强制执行

  1. @PostConstruct // 实现类执行构造函数之后调用该方法
  2. public void start()
  3. {
  4. queryTracker.start();
  5. // 定时调度
  6. queryManagementExecutor.scheduleWithFixedDelay(() -> {
  7. try {
  8. killBlockingQuery();
  9. // 强制内存调整
  10. enforceMemoryLimits();
  11. }
  12. catch (Throwable e) {
  13. log.error(e, "Error enforcing memory limits");
  14. }
  15. try {
  16. enforceCpuLimits();
  17. }
  18. catch (Throwable e) {
  19. log.error(e, "Error enforcing query CPU time limits");
  20. }
  21. }, 1, 1, TimeUnit.SECONDS); // 每一秒触发一次执行
  22. }

进一步细分enforceMemoryLimits方法实现细节

  1. /**
  2. * Enforce memory limits at the query level 在查询级别强制执行内存限制
  3. */
  4. private void enforceMemoryLimits()
  5. {
  6. List<QueryExecution> runningQueries;
  7. Supplier<List<BasicQueryInfo>> allQueryInfoSupplier;
  8. Map<String, SharedQueryState> queryStates = StateCacheStore.get().getCachedStates(StateStoreConstants.QUERY_STATE_COLLECTION_NAME);
  9. if (isMultiCoordinatorEnabled() && queryStates != null) {
  10. // Get all queries from state store
  11. runningQueries = queryStates.values().stream()
  12. .filter(state -> state.getBasicQueryInfo().getState() == RUNNING)
  13. .map(state -> new SharedQueryExecution(state, sessionPropertyManager)).collect(Collectors.toList());
  14. allQueryInfoSupplier = () -> queryStates.values().stream().map(state -> state.getBasicQueryInfo()).collect(Collectors.toList());
  15. }
  16. else {
  17. runningQueries = queryTracker.getAllQueries().stream()
  18. .filter(query -> query.getState() == RUNNING)
  19. .collect(toImmutableList());
  20. allQueryInfoSupplier = this::getQueries;
  21. }
  22. // 核心处理源码
  23. memoryManager.process(runningQueries, allQueryInfoSupplier);
  24. // Put the chosen query to kill into state store
  25. // 将所选查询杀死以进入状态存储
  26. if (isMultiCoordinatorEnabled() && queryStates != null) {
  27. updateKillingQuery(runningQueries, queryStates);
  28. }
  29. }

接下来查看核心处理方法process的实现

  /**
   * Periodic cluster-level memory enforcement pass.
   *
   * Under a (distributed or local) OOM-handling lock it: detects general-pool memory
   * leaks, kills queries that exceed global user/total memory limits (or overcommitting
   * queries when the cluster is out of memory), updates cluster-wide reservation
   * counters, and — if the cluster has been out of memory longer than the configured
   * delay — invokes the low-memory killer. Finally (outside the lock) it rebalances the
   * memory pools and pushes the new assignments to all nodes.
   *
   * @param runningQueries        queries currently in the RUNNING state
   * @param allQueryInfoSupplier  lazy view of all known queries, used by leak detection
   */
  public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
  {
      // Nothing to do when the cluster memory manager is disabled.
      if (!enabled) {
          return;
      }
      // Make sure the state store is loaded when multiple coordinators are enabled;
      // otherwise skip the OOM-handling section (pool/assignment updates below still run).
      if (!hetuConfig.isMultipleCoordinatorEnabled()
              || (hetuConfig.isMultipleCoordinatorEnabled()
              && stateStoreProvider.getStateStore() != null)) {
          Lock lock = null;
          boolean locked = false;
          try {
              if (hetuConfig.isMultipleCoordinatorEnabled()) {
                  // Distributed lock so only one coordinator handles OOM at a time.
                  lock = stateStoreProvider.getStateStore().getLock(StateStoreConstants.HANDLE_OOM_QUERY_LOCK_NAME);
              }
              else {
                  // Single coordinator: a fresh local lock, so tryLock always succeeds.
                  lock = new ReentrantLock();
              }
              // Bounded wait — if the lock cannot be acquired, this pass simply skips OOM handling.
              locked = lock.tryLock(StateStoreConstants.DEFAULT_ACQUIRED_LOCK_TIME_MS, TimeUnit.MILLISECONDS);
              if (locked) {
                  // TODO revocable memory reservations can also leak and may need to be detected in the future
                  // We are only concerned about the leaks in general pool.
                  memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pools.get(GENERAL_POOL).getQueryMemoryReservations());
                  // Whether the cluster is considered out of memory — per the article, this is
                  // driven by blocked nodes in the general pool and reserved-pool occupancy
                  // (isClusterOutOfMemory body not shown here; confirm against its source).
                  boolean outOfMemory = isClusterOutOfMemory();
                  if (!outOfMemory) {
                      // Timestamp of the most recent healthy observation; used below to
                      // delay the OOM killer until the condition has persisted.
                      lastTimeNotOutOfMemory = System.nanoTime();
                  }
                  boolean queryKilled = false;
                  // Running totals across all queries, published to the cluster counters below.
                  long totalUserMemoryBytes = 0L;
                  long totalMemoryBytes = 0L;
                  for (QueryExecution query : runningQueries) {
                      boolean resourceOvercommit = resourceOvercommit(query.getSession());
                      // Current user-memory reservation of this query, in bytes.
                      long userMemoryReservation = query.getUserMemoryReservation().toBytes();
                      // Current total (user + system) reservation of this query, in bytes.
                      long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
                      if (resourceOvercommit && outOfMemory) {
                          // If a query has requested resource overcommit, only kill it if the cluster has run out of memory
                          DataSize memory = succinctBytes(getQueryMemoryReservation(query));
                          query.fail(new PrestoException(CLUSTER_OUT_OF_MEMORY,
                                  format("The cluster is out of memory and %s=true, so this query was killed. It was using %s of memory", RESOURCE_OVERCOMMIT, memory)));
                          queryKilled = true;
                      }
                      // Non-overcommitting queries are held to the global per-query limits.
                      if (!resourceOvercommit) {
                          // Effective user-memory limit: the stricter of the cluster-wide and session limits.
                          long userMemoryLimit = min(maxQueryMemory.toBytes(), getQueryMaxMemory(query.getSession()).toBytes());
                          if (userMemoryReservation > userMemoryLimit) {
                              query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
                              queryKilled = true;
                          }
                          // Effective total-memory limit, computed the same way.
                          long totalMemoryLimit = min(maxQueryTotalMemory.toBytes(), getQueryMaxTotalMemory(query.getSession()).toBytes());
                          if (totalMemoryReservation > totalMemoryLimit) {
                              query.fail(exceededGlobalTotalLimit(succinctBytes(totalMemoryLimit)));
                              queryKilled = true;
                          }
                      }
                      // Accumulate cluster-wide reservation totals (including just-failed queries).
                      totalUserMemoryBytes += userMemoryReservation;
                      totalMemoryBytes += totalMemoryReservation;
                  }
                  // Publish the aggregated reservations.
                  clusterUserMemoryReservation.set(totalUserMemoryBytes);
                  clusterTotalMemoryReservation.set(totalMemoryBytes);
                  // Invoke the low-memory killer only when: a killer strategy is configured,
                  // the cluster is out of memory, nothing was killed in this pass already,
                  // and the OOM condition has persisted longer than killOnOutOfMemoryDelay.
                  if (!(lowMemoryKiller instanceof NoneLowMemoryKiller) &&
                          outOfMemory &&
                          !queryKilled &&
                          nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
                      // Wait until the previously killed query has actually released its memory.
                      if (isLastKilledQueryGone()) {
                          callOomKiller(runningQueries);
                      }
                      else {
                          log.debug("Last killed query is still not gone: %s", lastKilledQuery);
                      }
                  }
              }
          }
          catch (Exception e) {
              // NOTE(review): only the message is logged — the stack trace (and cause) is lost.
              log.error("Error handleOOMQuery: " + e.getMessage());
          }
          finally {
              // Only unlock if the lock object was obtained AND acquired.
              if (lock != null && locked) {
                  lock.unlock();
              }
          }
      }
      // Count running queries per memory pool to drive pool rebalancing.
      Map<MemoryPoolId, Integer> countByPool = new HashMap<>();
      for (QueryExecution query : runningQueries) {
          MemoryPoolId id = query.getMemoryPool().getId();
          countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
      }
      // Rebalance the memory pools based on the per-pool query counts.
      updatePools(countByPool);
      MemoryPoolAssignmentsRequest assignmentsRequest;
      if (pools.containsKey(RESERVED_POOL)) {
          // Reserved pool enabled: recompute which query (if any) it should host.
          assignmentsRequest = updateAssignments(runningQueries);
      }
      else {
          // If reserved pool is not enabled, we don't create a MemoryPoolAssignmentsRequest that puts all the queries
          // in the general pool (as they already are). In this case we create an effectively NOOP MemoryPoolAssignmentsRequest.
          // Once the reserved pool is removed we should get rid of the logic of putting queries into reserved pool including
          // this piece of code.
          assignmentsRequest = new MemoryPoolAssignmentsRequest(coordinatorId, Long.MIN_VALUE, ImmutableList.of());
      }
      // Push the (possibly NOOP) assignments to every node and refresh their memory info.
      updateNodes(assignmentsRequest);
  }

updateNodes方法更新所有节点

  1. private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments)
  2. {
  3. ImmutableSet.Builder<InternalNode> builder = ImmutableSet.builder();
  4. // 统计所有的内部节点
  5. Set<InternalNode> aliveNodes = builder
  6. .addAll(nodeManager.getNodes(ACTIVE))
  7. .addAll(nodeManager.getNodes(ISOLATING))
  8. .addAll(nodeManager.getNodes(ISOLATED))
  9. .addAll(nodeManager.getNodes(SHUTTING_DOWN))
  10. .build();
  11. ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
  12. .map(InternalNode::getNodeIdentifier)
  13. .collect(toImmutableSet());
  14. // Remove nodes that don't exist anymore
  15. // Make a copy to materialize the set difference
  16. Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), aliveNodeIds));
  17. // 移除已经dead的node
  18. nodes.keySet().removeAll(deadNodes);
  19. // Add new nodes
  20. for (InternalNode node : aliveNodes) {
  21. if (!nodes.containsKey(node.getNodeIdentifier()) && shouldIncludeNode(node)) {
  22. // nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
  23. nodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
  24. allNodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
  25. }
  26. }
  27. // Schedule refresh 刷新集群信息
  28. for (RemoteNodeMemory node : nodes.values()) {
  29. node.asyncRefresh(assignments);
  30. }
  31. // Schedule All refresh
  32. //
  33. for (RemoteNodeMemory node : allNodes.values()) {
  34. node.asyncRefresh(assignments);
  35. }
  36. }

asyncRefresh方法将当前的内存池分配信息(MemoryPoolAssignmentsRequest)异步发送到每个worker上,并拉取对应worker的本地内存信息

  /**
   * Asynchronously pushes the coordinator's current pool assignments to this node and
   * pulls back the node's local memory info.
   *
   * Rate-limited: issues at most one HTTP request per second, and never while a
   * previous request is still in flight (tracked via the {@code future} CAS slot).
   *
   * @param assignments the memory pool assignments to deliver to this node
   */
  public void asyncRefresh(MemoryPoolAssignmentsRequest assignments)
  {
      Duration sinceUpdate = nanosSince(lastUpdateNanos.get());
      // Warn (at most once per second) when an outstanding request has not returned for >10s.
      if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 &&
              sinceUpdate.toMillis() > 10_000 &&
              future.get() != null) {
          log.warn("Memory info update request to %s has not returned in %s", memoryInfoUri, sinceUpdate.toString(SECONDS));
          lastWarningLogged.set(System.nanoTime());
      }
      // Issue a new request only if the last update is >1s old and none is in flight.
      if (sinceUpdate.toMillis() > 1_000 && future.get() == null) {
          Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
                  .setUri(memoryInfoUri)
                  .setBodyGenerator(createBodyGenerator(assignments))
                  .build();
          // Pick the response decoder matching the wire encoding (Smile binary vs JSON).
          ResponseHandler responseHandler;
          if (isBinaryEncoding) {
              responseHandler = createFullSmileResponseHandler((SmileCodec<MemoryInfo>) memoryInfoCodec);
          }
          else {
              responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(memoryInfoCodec));
          }
          HttpResponseFuture<BaseResponse<MemoryInfo>> responseFuture = httpClient.executeAsync(request, responseHandler);
          // Publish the in-flight request; CAS so a concurrent refresh cannot clobber it.
          future.compareAndSet(null, responseFuture);
          Futures.addCallback(responseFuture, new FutureCallback<BaseResponse<MemoryInfo>>()
          {
              @Override
              public void onSuccess(@Nullable BaseResponse<MemoryInfo> result)
              {
                  lastUpdateNanos.set(System.nanoTime());
                  // Clear the in-flight slot so the next refresh can proceed.
                  future.compareAndSet(responseFuture, null);
                  long version = currentAssignmentVersion.get();
                  if (result != null) {
                      if (result.hasValue()) {
                          // Cache the node's reported memory info.
                          memoryInfo.set(Optional.ofNullable(result.getValue()));
                      }
                      if (result.getStatusCode() != OK.code()) {
                          // NOTE(review): getStatusCode() is passed for both %d and %s — the second
                          // argument was probably meant to be the status reason/message; confirm upstream.
                          log.warn("Error fetching memory info from %s returned status %d: %s", memoryInfoUri, result.getStatusCode(), result.getStatusCode());
                          return;
                      }
                  }
                  // Advance the acknowledged assignment version only if it was not changed concurrently.
                  currentAssignmentVersion.compareAndSet(version, assignments.getVersion());
              }
              @Override
              public void onFailure(Throwable t)
              {
                  log.warn("Error fetching memory info from %s: %s", memoryInfoUri, t.getMessage());
                  lastUpdateNanos.set(System.nanoTime());
                  // Clear the in-flight slot even on failure so refreshing can resume.
                  future.compareAndSet(responseFuture, null);
              }
          }, directExecutor());
      }
  }

MemoryResource类用于接收协调器发送的资源信息:该POST方法先应用协调器下发的内存池分配,再将本地的内存信息返回给协调器

  1. @POST
  2. @Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
  3. @Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
  4. public MemoryInfo getMemoryInfo(MemoryPoolAssignmentsRequest request)
  5. {
  6. taskManager.updateMemoryPoolAssignments(request);
  7. return memoryManager.getInfo();
  8. }