1. Memory Initialization
The initialization code lives in LocalMemoryManager. At startup, the available memory is split into two memory pools:
- RESERVED_POOL: the reserved memory pool, used to run the single most memory-intensive query.
- GENERAL_POOL: the general memory pool, used to run every query other than that most memory-intensive one.
 

The configureMemoryPools method of LocalMemoryManager performs the allocation:
1. First, heapHeadroom is deducted (30% of the heap by default); the remainder is shared by GENERAL_POOL and RESERVED_POOL.
2. If the reserved pool is enabled (it is by default), it is sized to maxQueryTotalMemoryPerNode (30% of the heap by default) and is kept for the most memory-hungry query.
3. Whatever is left is assigned to the general pool.
```java
private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
{
    validateHeapHeadroom(config, availableMemory);
    // Deduct the heap headroom (0.3 * -Xmx by default); the rest is available to the pools
    maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
    checkArgument(
            config.getMaxQueryMemoryPerNode().toBytes() <= config.getMaxQueryTotalMemoryPerNode().toBytes(),
            "Max query memory per node (%s) cannot be greater than the max query total memory per node (%s).",
            QUERY_MAX_MEMORY_PER_NODE_CONFIG,
            QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG);
    // Map each memory pool id to its memory pool
    ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
    // The general pool starts out with everything left after the headroom
    long generalPoolSize = maxMemory.toBytes();
    // If enabled, the reserved pool runs the single most memory-intensive query
    if (config.isReservedPoolEnabled()) {
        // Sized to maxQueryTotalMemoryPerNode, the per-query total memory limit
        builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryTotalMemoryPerNode()));
        // Recompute the general pool size
        generalPoolSize -= config.getMaxQueryTotalMemoryPerNode().toBytes();
    }
    verify(generalPoolSize > 0, "general memory pool size is 0");
    // The remainder becomes the general pool
    builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, new DataSize(generalPoolSize, BYTE)));
    this.pools = builder.build();
}
```
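To make the arithmetic concrete, here is a minimal sketch with assumed numbers: a hypothetical 32 GB heap and the default 30% ratios described above. The class and the figures are illustrative only, not taken from the source.

```java
// A sketch with assumed values (32 GB heap, default 30% ratios); not from the real code base.
public class PoolSizingSketch
{
    public static void main(String[] args)
    {
        long heapBytes = 32L << 30;                                  // 32 GB of -Xmx
        long heapHeadroom = (long) (heapBytes * 0.3);                // deducted up front, never pooled (~9.6 GB)
        long reservedPool = (long) (heapBytes * 0.3);                // maxQueryTotalMemoryPerNode -> RESERVED_POOL (~9.6 GB)
        long generalPool = heapBytes - heapHeadroom - reservedPool;  // remainder -> GENERAL_POOL (~12.8 GB)

        System.out.printf("headroom=%d, reserved=%d, general=%d%n", heapHeadroom, reservedPool, generalPool);
    }
}
```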
2. Periodically Adjusting the Memory Pool Assignments
The start method of SqlQueryManager is invoked as soon as the instance has been constructed; inside it, the enforceMemoryLimits() method performs the memory enforcement.
```java
@PostConstruct // invoked after the constructor has run
public void start()
{
    queryTracker.start();
    // Periodic scheduling
    queryManagementExecutor.scheduleWithFixedDelay(() -> {
        try {
            killBlockingQuery();
            // Enforce the memory limits
            enforceMemoryLimits();
        }
        catch (Throwable e) {
            log.error(e, "Error enforcing memory limits");
        }
        try {
            enforceCpuLimits();
        }
        catch (Throwable e) {
            log.error(e, "Error enforcing query CPU time limits");
        }
    }, 1, 1, TimeUnit.SECONDS); // runs once per second
}
```
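Note why each step is wrapped in its own try/catch: with scheduleWithFixedDelay, a task that throws suppresses all subsequent executions, so the enforcement loop would silently stop. A minimal standalone sketch of the same pattern (the method and class names here are hypothetical stand-ins, not the real ones):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicEnforcerSketch
{
    public static void main(String[] args)
    {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // If the lambda ever threw, scheduleWithFixedDelay would cancel all future runs,
        // which is why every piece of work catches Throwable itself.
        executor.scheduleWithFixedDelay(() -> {
            try {
                enforceLimits(); // stand-in for enforceMemoryLimits() / enforceCpuLimits()
            }
            catch (Throwable t) {
                System.err.println("Error enforcing limits: " + t);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    private static void enforceLimits()
    {
        // placeholder for the real enforcement logic
    }
}
```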
Drilling further into the implementation of enforceMemoryLimits:
```java
/**
 * Enforce memory limits at the query level
 */
private void enforceMemoryLimits()
{
    List<QueryExecution> runningQueries;
    Supplier<List<BasicQueryInfo>> allQueryInfoSupplier;
    Map<String, SharedQueryState> queryStates =
            StateCacheStore.get().getCachedStates(StateStoreConstants.QUERY_STATE_COLLECTION_NAME);
    if (isMultiCoordinatorEnabled() && queryStates != null) {
        // Get all queries from the state store
        runningQueries = queryStates.values().stream()
                .filter(state -> state.getBasicQueryInfo().getState() == RUNNING)
                .map(state -> new SharedQueryExecution(state, sessionPropertyManager))
                .collect(Collectors.toList());
        allQueryInfoSupplier = () -> queryStates.values().stream()
                .map(state -> state.getBasicQueryInfo())
                .collect(Collectors.toList());
    }
    else {
        runningQueries = queryTracker.getAllQueries().stream()
                .filter(query -> query.getState() == RUNNING)
                .collect(toImmutableList());
        allQueryInfoSupplier = this::getQueries;
    }
    // Core processing
    memoryManager.process(runningQueries, allQueryInfoSupplier);
    // Put the query chosen to be killed into the state store
    if (isMultiCoordinatorEnabled() && queryStates != null) {
        updateKillingQuery(runningQueries, queryStates);
    }
}
```
Now look at the core processing method, process:
```java
public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
{
    // Return early if cluster memory management is not enabled on this node
    if (!enabled) {
        return;
    }
    // Make sure the state store is loaded when multiple coordinators are enabled
    if (!hetuConfig.isMultipleCoordinatorEnabled()
            || (hetuConfig.isMultipleCoordinatorEnabled() && stateStoreProvider.getStateStore() != null)) {
        Lock lock = null;
        boolean locked = false;
        try {
            if (hetuConfig.isMultipleCoordinatorEnabled()) {
                lock = stateStoreProvider.getStateStore().getLock(StateStoreConstants.HANDLE_OOM_QUERY_LOCK_NAME);
            }
            else {
                lock = new ReentrantLock();
            }
            // Acquire the lock
            locked = lock.tryLock(StateStoreConstants.DEFAULT_ACQUIRED_LOCK_TIME_MS, TimeUnit.MILLISECONDS);
            if (locked) {
                // TODO revocable memory reservations can also leak and may need to be detected in the future
                // We are only concerned about the leaks in the general pool.
                // Check for memory leaks and record them
                memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pools.get(GENERAL_POOL).getQueryMemoryReservations());
                /*
                 * The cluster is considered out of memory in either of these cases:
                 * 1. the reserved pool is empty and the general pool has blocked nodes;
                 * 2. the reserved pool is in use and the general pool has blocked nodes.
                 */
                boolean outOfMemory = isClusterOutOfMemory();
                if (!outOfMemory) {
                    lastTimeNotOutOfMemory = System.nanoTime(); // last time the cluster was not out of memory
                }
                boolean queryKilled = false;
                // Total user memory across all running queries
                long totalUserMemoryBytes = 0L;
                // Total memory (user + system) across all running queries
                long totalMemoryBytes = 0L;
                // Iterate over the running queries
                for (QueryExecution query : runningQueries) {
                    boolean resourceOvercommit = resourceOvercommit(query.getSession());
                    // User memory reserved by this query
                    long userMemoryReservation = query.getUserMemoryReservation().toBytes();
                    // Total memory reserved by this query
                    long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
                    if (resourceOvercommit && outOfMemory) {
                        // If a query has requested resource overcommit, only kill it if the cluster has run out of memory
                        DataSize memory = succinctBytes(getQueryMemoryReservation(query));
                        query.fail(new PrestoException(CLUSTER_OUT_OF_MEMORY,
                                format("The cluster is out of memory and %s=true, so this query was killed. It was using %s of memory", RESOURCE_OVERCOMMIT, memory)));
                        queryKilled = true;
                    }
                    // If the query did not request overcommit, apply the per-query limits
                    if (!resourceOvercommit) {
                        // Effective user memory limit for this query
                        long userMemoryLimit = min(maxQueryMemory.toBytes(), getQueryMaxMemory(query.getSession()).toBytes());
                        // If the user memory reservation exceeds the limit, fail the query and mark it as killed
                        if (userMemoryReservation > userMemoryLimit) {
                            query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
                            queryKilled = true;
                        }
                        // Effective total memory limit for this query
                        long totalMemoryLimit = min(maxQueryTotalMemory.toBytes(), getQueryMaxTotalMemory(query.getSession()).toBytes());
                        // If the total memory reservation exceeds the limit, fail the query and mark it as killed
                        if (totalMemoryReservation > totalMemoryLimit) {
                            query.fail(exceededGlobalTotalLimit(succinctBytes(totalMemoryLimit)));
                            queryKilled = true;
                        }
                    }
                    // Accumulate the cluster-wide user and total memory reservations
                    totalUserMemoryBytes += userMemoryReservation;
                    totalMemoryBytes += totalMemoryReservation;
                }
                // Publish the cluster-wide reservations
                clusterUserMemoryReservation.set(totalUserMemoryBytes);
                clusterTotalMemoryReservation.set(totalMemoryBytes);
                // Kill a query when a real low-memory killer is configured (e.g. TotalReservationLowMemoryKiller or
                // TotalReservationOnBlockedNodesLowMemoryKiller), the cluster is out of memory, nothing was killed in
                // this pass, and the out-of-memory condition has lasted longer than killOnOutOfMemoryDelay
                if (!(lowMemoryKiller instanceof NoneLowMemoryKiller) &&
                        outOfMemory &&
                        !queryKilled &&
                        nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
                    // Only pick a new victim once the previously killed query is gone
                    if (isLastKilledQueryGone()) {
                        // Invoke the OOM killer
                        callOomKiller(runningQueries);
                    }
                    else {
                        log.debug("Last killed query is still not gone: %s", lastKilledQuery);
                    }
                }
            }
        }
        catch (Exception e) {
            log.error("Error handleOOMQuery: " + e.getMessage());
        }
        finally {
            if (lock != null && locked) {
                lock.unlock();
            }
        }
    }
    Map<MemoryPoolId, Integer> countByPool = new HashMap<>();
    // Count how many queries are assigned to each memory pool
    for (QueryExecution query : runningQueries) {
        MemoryPoolId id = query.getMemoryPool().getId();
        countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
    }
    // Update the memory pools
    updatePools(countByPool);
    MemoryPoolAssignmentsRequest assignmentsRequest;
    if (pools.containsKey(RESERVED_POOL)) {
        // If the reserved pool exists, recompute the pool assignments
        assignmentsRequest = updateAssignments(runningQueries);
    }
    else {
        // If the reserved pool is not enabled, we don't create a MemoryPoolAssignmentsRequest that puts all the queries
        // in the general pool (as they already are). In this case we create an effectively NOOP MemoryPoolAssignmentsRequest.
        // Once the reserved pool is removed we should get rid of the logic of putting queries into the reserved pool,
        // including this piece of code.
        assignmentsRequest = new MemoryPoolAssignmentsRequest(coordinatorId, Long.MIN_VALUE, ImmutableList.of());
    }
    // Push the assignments to all nodes
    updateNodes(assignmentsRequest);
}
```
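The kill decision at the end of the OOM handling block condenses to a single predicate. The sketch below is an illustration of that condition with simplified stand-in names, not the actual source:

```java
// Condensed illustration of the kill decision in process(); the names are simplified stand-ins.
final class OomKillDecisionSketch
{
    static boolean shouldInvokeOomKiller(
            boolean killerConfigured,          // false when lowMemoryKiller is a NoneLowMemoryKiller
            boolean outOfMemory,               // result of isClusterOutOfMemory()
            boolean queryKilledThisPass,       // a query was already failed during this pass
            long nanosSinceNotOutOfMemory,     // how long the cluster has been out of memory
            long killOnOutOfMemoryDelayNanos)  // configured grace period before killing
    {
        return killerConfigured
                && outOfMemory
                && !queryKilledThisPass
                && nanosSinceNotOutOfMemory > killOnOutOfMemoryDelayNanos;
    }
}
```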
The updateNodes method updates all nodes:
```java
private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments)
{
    ImmutableSet.Builder<InternalNode> builder = ImmutableSet.builder();
    // Collect every internal node that is still alive
    Set<InternalNode> aliveNodes = builder
            .addAll(nodeManager.getNodes(ACTIVE))
            .addAll(nodeManager.getNodes(ISOLATING))
            .addAll(nodeManager.getNodes(ISOLATED))
            .addAll(nodeManager.getNodes(SHUTTING_DOWN))
            .build();
    ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
            .map(InternalNode::getNodeIdentifier)
            .collect(toImmutableSet());

    // Remove nodes that don't exist anymore
    // Make a copy to materialize the set difference
    Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), aliveNodeIds));
    nodes.keySet().removeAll(deadNodes);

    // Add new nodes
    for (InternalNode node : aliveNodes) {
        if (!nodes.containsKey(node.getNodeIdentifier()) && shouldIncludeNode(node)) {
            nodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
            allNodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
        }
    }

    // Schedule a refresh on the tracked nodes
    for (RemoteNodeMemory node : nodes.values()) {
        node.asyncRefresh(assignments);
    }

    // Schedule a refresh on all nodes
    for (RemoteNodeMemory node : allNodes.values()) {
        node.asyncRefresh(assignments);
    }
}
```
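The dead-node cleanup relies on Guava's Sets.difference, which returns a live view over its arguments; copying it first materializes the result before the underlying key set is mutated. A minimal standalone sketch of the same idiom (the node ids and map values here are made up):

```java
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DeadNodeCleanupSketch
{
    public static void main(String[] args)
    {
        // Hypothetical tracked nodes and the ids currently reported as alive
        Map<String, String> trackedNodes = new HashMap<>();
        trackedNodes.put("node-1", "memory-info-1");
        trackedNodes.put("node-2", "memory-info-2");
        Set<String> aliveNodeIds = ImmutableSet.of("node-1");

        // Sets.difference returns a view backed by its arguments; copy it before mutating
        // trackedNodes, otherwise we would be removing from the set we are reading.
        Set<String> deadNodes = ImmutableSet.copyOf(Sets.difference(trackedNodes.keySet(), aliveNodeIds));
        trackedNodes.keySet().removeAll(deadNodes);

        System.out.println(trackedNodes.keySet()); // prints [node-1]
    }
}
```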
The asyncRefresh method sends the current cluster-wide pool assignments to each worker and fetches that worker's local memory info in return:
```java
// Asynchronously refresh this node's memory info
public void asyncRefresh(MemoryPoolAssignmentsRequest assignments)
{
    Duration sinceUpdate = nanosSince(lastUpdateNanos.get());
    // Warn (at most once per second) if an outstanding request has not returned for more than 10 seconds
    if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 &&
            sinceUpdate.toMillis() > 10_000 &&
            future.get() != null) {
        log.warn("Memory info update request to %s has not returned in %s", memoryInfoUri, sinceUpdate.toString(SECONDS));
        lastWarningLogged.set(System.nanoTime());
    }
    // Issue a new request at most once per second, and only if no request is outstanding
    if (sinceUpdate.toMillis() > 1_000 && future.get() == null) {
        Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
                .setUri(memoryInfoUri)
                .setBodyGenerator(createBodyGenerator(assignments))
                .build();
        ResponseHandler responseHandler;
        if (isBinaryEncoding) {
            responseHandler = createFullSmileResponseHandler((SmileCodec<MemoryInfo>) memoryInfoCodec);
        }
        else {
            responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(memoryInfoCodec));
        }
        HttpResponseFuture<BaseResponse<MemoryInfo>> responseFuture = httpClient.executeAsync(request, responseHandler);
        future.compareAndSet(null, responseFuture);

        Futures.addCallback(responseFuture, new FutureCallback<BaseResponse<MemoryInfo>>()
        {
            @Override
            public void onSuccess(@Nullable BaseResponse<MemoryInfo> result)
            {
                lastUpdateNanos.set(System.nanoTime());
                future.compareAndSet(responseFuture, null);
                long version = currentAssignmentVersion.get();
                if (result != null) {
                    if (result.hasValue()) {
                        memoryInfo.set(Optional.ofNullable(result.getValue()));
                    }
                    if (result.getStatusCode() != OK.code()) {
                        log.warn("Error fetching memory info from %s returned status %d: %s", memoryInfoUri, result.getStatusCode(), result.getStatusCode());
                        return;
                    }
                }
                currentAssignmentVersion.compareAndSet(version, assignments.getVersion());
            }

            @Override
            public void onFailure(Throwable t)
            {
                log.warn("Error fetching memory info from %s: %s", memoryInfoUri, t.getMessage());
                lastUpdateNanos.set(System.nanoTime());
                future.compareAndSet(responseFuture, null);
            }
        }, directExecutor());
    }
}
```
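The single-outstanding-request guard is worth highlighting: future.compareAndSet(null, responseFuture) only installs a new in-flight request when none is pending, and both callbacks clear the slot again. A minimal standalone sketch of that idiom (the class, method, and request type are hypothetical stand-ins):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class SingleOutstandingRequestSketch
{
    // Holds the in-flight request, or null when none is pending
    private final AtomicReference<CompletableFuture<String>> inFlight = new AtomicReference<>();

    public void refresh()
    {
        if (inFlight.get() != null) {
            return; // a request is already outstanding; do not pile up another one
        }
        CompletableFuture<String> request = fetchRemoteInfo(); // hypothetical async call
        // Install it only if the slot is still empty (another thread may have won the race)
        if (inFlight.compareAndSet(null, request)) {
            // Clear the slot when the request completes, successfully or not
            request.whenComplete((result, failure) -> inFlight.compareAndSet(request, null));
        }
    }

    private CompletableFuture<String> fetchRemoteInfo()
    {
        return CompletableFuture.completedFuture("memory info"); // placeholder
    }
}
```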
MemoryResource hosts the worker-side endpoint: this POST method receives the assignments sent by the coordinator and returns the worker's local memory info to it:
```java
@POST
@Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public MemoryInfo getMemoryInfo(MemoryPoolAssignmentsRequest request)
{
    taskManager.updateMemoryPoolAssignments(request);
    return memoryManager.getInfo();
}
```
