1. Memory Initialization
The initialization code lives in LocalMemoryManager. At startup the available memory is divided into two memory pools:
- RESERVED_POOL: the reserved pool, used to run the most memory-intensive query.
- GENERAL_POOL: the general pool, used to run every query other than the most memory-intensive one.
LocalMemoryManager's configureMemoryPools method performs the allocation (a worked sizing example follows the code below):
1. First subtract the heapHeadroom, which by default is 30% of the heap; the remainder is left for GENERAL_POOL and RESERVED_POOL.
2. Check whether the reserved pool is enabled (it is by default); if so, it is sized to query.max-total-memory-per-node (by default 30% of the heap) and reserved for the most memory-intensive query.
3. The remaining memory goes to the general pool.
private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
{
validateHeapHeadroom(config, availableMemory);
// Subtract the heap headroom (by default 0.3 of the JVM -Xmx heap); the result is the memory available to the pools
maxMemory = new DataSize(availableMemory - config.getHeapHeadroom().toBytes(), BYTE);
checkArgument(
config.getMaxQueryMemoryPerNode().toBytes() <= config.getMaxQueryTotalMemoryPerNode().toBytes(),
"Max query memory per node (%s) cannot be greater than the max query total memory per node (%s).",
QUERY_MAX_MEMORY_PER_NODE_CONFIG,
QUERY_MAX_TOTAL_MEMORY_PER_NODE_CONFIG);
// Maps each memory pool id to its memory pool
ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
// The general pool initially gets all remaining memory
long generalPoolSize = maxMemory.toBytes();
// If the reserved pool is enabled, create it; it is used to run the most memory-intensive query
if (config.isReservedPoolEnabled()) {
// Sized to query.max-total-memory-per-node (by default AVAILABLE_HEAP_MEMORY * 0.3), the per-query total memory limit
builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryTotalMemoryPerNode()));
// Recompute the general pool size after carving out the reserved pool
generalPoolSize -= config.getMaxQueryTotalMemoryPerNode().toBytes();
}
verify(generalPoolSize > 0, "general memory pool size is 0");
// The remainder becomes the general pool
builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, new DataSize(generalPoolSize, BYTE)));
this.pools = builder.build();
}
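As promised above, here is a worked example of the sizing arithmetic, using hypothetical values (a 10 GB -Xmx heap, 3 GB of heap headroom, and query.max-total-memory-per-node of 3 GB; the real values come from configuration):
public class MemoryPoolSizingExample
{
    public static void main(String[] args)
    {
        long heapBytes = 10L << 30;                    // hypothetical -Xmx: 10 GB
        long heapHeadroomBytes = 3L << 30;             // hypothetical memory.heap-headroom-per-node: 3 GB
        long maxQueryTotalMemoryPerNode = 3L << 30;    // hypothetical query.max-total-memory-per-node: 3 GB
        boolean reservedPoolEnabled = true;            // the reserved pool is enabled by default

        long maxMemory = heapBytes - heapHeadroomBytes;        // memory left for the pools: 7 GB
        long generalPoolSize = maxMemory;
        long reservedPoolSize = 0;
        if (reservedPoolEnabled) {
            reservedPoolSize = maxQueryTotalMemoryPerNode;     // RESERVED_POOL gets 3 GB
            generalPoolSize -= reservedPoolSize;               // GENERAL_POOL keeps the remaining 4 GB
        }
        System.out.printf("reserved=%d GB, general=%d GB%n", reservedPoolSize >> 30, generalPoolSize >> 30);
    }
}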
2. Periodically Adjusting the Memory Pool Assignments
SqlQueryManager's start method is invoked as soon as the instance has been constructed (via @PostConstruct); within it, enforceMemoryLimits() performs the memory adjustment.
@PostConstruct // invoked once the instance has been constructed
public void start()
{
queryTracker.start();
// Periodic scheduling
queryManagementExecutor.scheduleWithFixedDelay(() -> {
try {
killBlockingQuery();
// Enforce the memory limits
enforceMemoryLimits();
}
catch (Throwable e) {
log.error(e, "Error enforcing memory limits");
}
try {
enforceCpuLimits();
}
catch (Throwable e) {
log.error(e, "Error enforcing query CPU time limits");
}
}, 1, 1, TimeUnit.SECONDS); // runs once per second
}
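A side note on the try/catch (Throwable) blocks above: ScheduledExecutorService suppresses all further executions of a periodic task once one run throws, so each enforcement step is guarded separately to keep the loop alive. A minimal standalone demo of that behavior (not openLooKeng code):
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledTaskFailureDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            System.out.println("tick");
            // The uncaught exception cancels the periodic task: "tick" is printed only once.
            throw new RuntimeException("boom");
        }, 0, 1, TimeUnit.SECONDS);
        Thread.sleep(3_000);
        executor.shutdownNow();
    }
}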
Drilling further into the implementation details of enforceMemoryLimits:
/**
* Enforce memory limits at the query level
*/
private void enforceMemoryLimits()
{
List<QueryExecution> runningQueries;
Supplier<List<BasicQueryInfo>> allQueryInfoSupplier;
Map<String, SharedQueryState> queryStates = StateCacheStore.get().getCachedStates(StateStoreConstants.QUERY_STATE_COLLECTION_NAME);
if (isMultiCoordinatorEnabled() && queryStates != null) {
// Get all queries from state store
runningQueries = queryStates.values().stream()
.filter(state -> state.getBasicQueryInfo().getState() == RUNNING)
.map(state -> new SharedQueryExecution(state, sessionPropertyManager)).collect(Collectors.toList());
allQueryInfoSupplier = () -> queryStates.values().stream().map(state -> state.getBasicQueryInfo()).collect(Collectors.toList());
}
else {
runningQueries = queryTracker.getAllQueries().stream()
.filter(query -> query.getState() == RUNNING)
.collect(toImmutableList());
allQueryInfoSupplier = this::getQueries;
}
// Core processing: delegate to the cluster memory manager
memoryManager.process(runningQueries, allQueryInfoSupplier);
// Put the chosen query to kill into state store
if (isMultiCoordinatorEnabled() && queryStates != null) {
updateKillingQuery(runningQueries, queryStates);
}
}
Now let's look at the core processing code, process:
public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
{
// Do nothing if cluster memory management is not enabled (it is only enabled on the coordinator)
if (!enabled) {
return;
}
//make sure state store is loaded when multiple coordinator is enabled
if (!hetuConfig.isMultipleCoordinatorEnabled()
|| (hetuConfig.isMultipleCoordinatorEnabled()
&& stateStoreProvider.getStateStore() != null)) {
Lock lock = null;
boolean locked = false;
try {
if (hetuConfig.isMultipleCoordinatorEnabled()) {
lock = stateStoreProvider.getStateStore().getLock(StateStoreConstants.HANDLE_OOM_QUERY_LOCK_NAME);
}
else {
lock = new ReentrantLock();
}
// Try to acquire the lock, waiting up to DEFAULT_ACQUIRED_LOCK_TIME_MS
locked = lock.tryLock(StateStoreConstants.DEFAULT_ACQUIRED_LOCK_TIME_MS, TimeUnit.MILLISECONDS);
if (locked) {
// TODO revocable memory reservations can also leak and may need to be detected in the future
// We are only concerned about the leaks in general pool.
// Detect and record queries whose general-pool memory reservations appear to have leaked
memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pools.get(GENERAL_POOL).getQueryMemoryReservations());
// Determine whether the cluster is out of memory. This is true in either of two cases
// (see the isClusterOutOfMemory sketch after this method):
// 1. No reserved pool exists and the general pool has blocked nodes.
// 2. A reserved pool exists, it is already assigned to a query, and the general pool has blocked nodes.
boolean outOfMemory = isClusterOutOfMemory();
if (!outOfMemory) {
lastTimeNotOutOfMemory = System.nanoTime(); // the last time the cluster was observed not to be out of memory
}
boolean queryKilled = false;
// Total user memory reserved across all running queries
long totalUserMemoryBytes = 0L;
// Total (user + system) memory reserved across all running queries
long totalMemoryBytes = 0L;
// Iterate over the running queries
for (QueryExecution query : runningQueries) {
boolean resourceOvercommit = resourceOvercommit(query.getSession());
// User memory currently reserved by this query
long userMemoryReservation = query.getUserMemoryReservation().toBytes();
// Total memory currently reserved by this query
long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
// If the query requested resource overcommit and the cluster is out of memory, kill it
if (resourceOvercommit && outOfMemory) {
// If a query has requested resource overcommit, only kill it if the cluster has run out of memory
// 如果查询请求了资源过量使用,则只有在群集内存不足时才将其杀死
DataSize memory = succinctBytes(getQueryMemoryReservation(query));
query.fail(new PrestoException(CLUSTER_OUT_OF_MEMORY,
format("The cluster is out of memory and %s=true, so this query was killed. It was using %s of memory", RESOURCE_OVERCOMMIT, memory)));
queryKilled = true;
}
// When resource overcommit was not requested, enforce the per-query limits
if (!resourceOvercommit) {
// Per-query user memory limit: the smaller of the global limit and the session-level limit
long userMemoryLimit = min(maxQueryMemory.toBytes(), getQueryMaxMemory(query.getSession()).toBytes());
// If the query's user memory reservation exceeds the limit, fail the query and record that a query was killed
if (userMemoryReservation > userMemoryLimit) {
query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
queryKilled = true;
}
// Per-query total memory limit: the smaller of the global limit and the session-level limit
long totalMemoryLimit = min(maxQueryTotalMemory.toBytes(), getQueryMaxTotalMemory(query.getSession()).toBytes());
// If the query's total memory reservation exceeds the limit, fail the query
if (totalMemoryReservation > totalMemoryLimit) {
query.fail(exceededGlobalTotalLimit(succinctBytes(totalMemoryLimit)));
queryKilled = true;
}
}
// Accumulate this query's reservations into the cluster-wide totals
totalUserMemoryBytes += userMemoryReservation;
totalMemoryBytes += totalMemoryReservation;
}
// Publish the cluster-wide totals
clusterUserMemoryReservation.set(totalUserMemoryBytes);
clusterTotalMemoryReservation.set(totalMemoryBytes);
// Invoke the low-memory killer only when:
// - a killer other than NoneLowMemoryKiller is configured (e.g. TotalReservationLowMemoryKiller or TotalReservationOnBlockedNodesLowMemoryKiller),
// - the cluster is out of memory,
// - no query was killed in this pass, and
// - the cluster has been out of memory for longer than killOnOutOfMemoryDelay
if (!(lowMemoryKiller instanceof NoneLowMemoryKiller) &&
outOfMemory &&
!queryKilled &&
nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
// Only pick a new victim once the previously killed query has actually released its memory
if (isLastKilledQueryGone()) {
// Ask the OOM killer to choose and kill a query
callOomKiller(runningQueries);
}
else {
log.debug("Last killed query is still not gone: %s", lastKilledQuery);
}
}
}
}
catch (Exception e) {
log.error("Error handleOOMQuery: " + e.getMessage());
}
finally {
if (lock != null && locked) {
lock.unlock();
}
}
}
Map<MemoryPoolId, Integer> countByPool = new HashMap<>();
// Count how many running queries are assigned to each memory pool
for (QueryExecution query : runningQueries) {
MemoryPoolId id = query.getMemoryPool().getId();
countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
}
// Update the cluster memory pools with these counts
updatePools(countByPool);
MemoryPoolAssignmentsRequest assignmentsRequest;
if (pools.containsKey(RESERVED_POOL)) {
// If the reserved pool exists, recompute which query (if any) should be assigned to it
assignmentsRequest = updateAssignments(runningQueries);
}
else {
// If reserved pool is not enabled, we don't create a MemoryPoolAssignmentsRequest that puts all the queries
// in the general pool (as they already are). In this case we create an effectively NOOP MemoryPoolAssignmentsRequest.
// Once the reserved pool is removed we should get rid of the logic of putting queries into reserved pool including
// this piece of code.
assignmentsRequest = new MemoryPoolAssignmentsRequest(coordinatorId, Long.MIN_VALUE, ImmutableList.of());
}
// Push the assignments to all nodes
updateNodes(assignmentsRequest);
}
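The outOfMemory flag above comes from isClusterOutOfMemory(), whose body is not shown in this section. A minimal self-contained sketch of the decision it makes (based on the upstream Presto logic; the real method reads these counts from the ClusterMemoryPool objects in pools):
public class ClusterOomCheckSketch
{
    // Sketch only: in the real code the counts come from pools.get(RESERVED_POOL) and pools.get(GENERAL_POOL).
    static boolean isClusterOutOfMemory(boolean reservedPoolExists, int reservedPoolAssignedQueries, int generalPoolBlockedNodes)
    {
        if (!reservedPoolExists) {
            // No reserved pool: the cluster is out of memory as soon as the general pool is blocked on any node.
            return generalPoolBlockedNodes > 0;
        }
        // With a reserved pool: out of memory only when the reserved pool is already occupied by a query
        // and the general pool is still blocked on at least one node.
        return reservedPoolAssignedQueries > 0 && generalPoolBlockedNodes > 0;
    }

    public static void main(String[] args)
    {
        System.out.println(isClusterOutOfMemory(true, 0, 2));  // false: the blocked query can be promoted to the reserved pool
        System.out.println(isClusterOutOfMemory(true, 1, 2));  // true: the reserved pool is taken and nodes are still blocked
    }
}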
The updateNodes method pushes the assignments to every node:
private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments)
{
ImmutableSet.Builder<InternalNode> builder = ImmutableSet.builder();
// Collect all live internal nodes
Set<InternalNode> aliveNodes = builder
.addAll(nodeManager.getNodes(ACTIVE))
.addAll(nodeManager.getNodes(ISOLATING))
.addAll(nodeManager.getNodes(ISOLATED))
.addAll(nodeManager.getNodes(SHUTTING_DOWN))
.build();
ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
.map(InternalNode::getNodeIdentifier)
.collect(toImmutableSet());
// Remove nodes that don't exist anymore
// Make a copy to materialize the set difference
Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), aliveNodeIds));
// Drop the nodes that are no longer alive
nodes.keySet().removeAll(deadNodes);
// Add new nodes
for (InternalNode node : aliveNodes) {
if (!nodes.containsKey(node.getNodeIdentifier()) && shouldIncludeNode(node)) {
// nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
nodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
allNodes.put(node.getInternalUri().toString(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestCodec, locationFactory.createMemoryInfoLocation(node), isBinaryEncoding));
}
}
// Schedule a refresh of each node's memory info
for (RemoteNodeMemory node : nodes.values()) {
node.asyncRefresh(assignments);
}
// Schedule a refresh for all nodes tracked in allNodes
for (RemoteNodeMemory node : allNodes.values()) {
node.asyncRefresh(assignments);
}
}
The asyncRefresh method sends the current memory pool assignments to each worker and retrieves that worker's local memory info in return:
// Asynchronous refresh
public void asyncRefresh(MemoryPoolAssignmentsRequest assignments)
{
Duration sinceUpdate = nanosSince(lastUpdateNanos.get());
// Warn (at most once per second) if the outstanding update request has not returned for over 10 seconds
if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 &&
sinceUpdate.toMillis() > 10_000 &&
future.get() != null) {
log.warn("Memory info update request to %s has not returned in %s", memoryInfoUri, sinceUpdate.toString(SECONDS));
lastWarningLogged.set(System.nanoTime());
}
// Issue a new request if the last update is more than a second old and no request is currently in flight
if (sinceUpdate.toMillis() > 1_000 && future.get() == null) {
Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
.setUri(memoryInfoUri)
.setBodyGenerator(createBodyGenerator(assignments))
.build();
ResponseHandler responseHandler;
if (isBinaryEncoding) {
responseHandler = createFullSmileResponseHandler((SmileCodec<MemoryInfo>) memoryInfoCodec);
}
else {
responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(memoryInfoCodec));
}
HttpResponseFuture<BaseResponse<MemoryInfo>> responseFuture = httpClient.executeAsync(request, responseHandler);
future.compareAndSet(null, responseFuture);
Futures.addCallback(responseFuture, new FutureCallback<BaseResponse<MemoryInfo>>()
{
@Override
public void onSuccess(@Nullable BaseResponse<MemoryInfo> result)
{
lastUpdateNanos.set(System.nanoTime());
future.compareAndSet(responseFuture, null);
long version = currentAssignmentVersion.get();
if (result != null) {
if (result.hasValue()) {
memoryInfo.set(Optional.ofNullable(result.getValue()));
}
if (result.getStatusCode() != OK.code()) {
log.warn("Error fetching memory info from %s returned status %d: %s", memoryInfoUri, result.getStatusCode(), result.getStatusCode());
return;
}
}
currentAssignmentVersion.compareAndSet(version, assignments.getVersion());
}
@Override
public void onFailure(Throwable t)
{
log.warn("Error fetching memory info from %s: %s", memoryInfoUri, t.getMessage());
lastUpdateNanos.set(System.nanoTime());
future.compareAndSet(responseFuture, null);
}
}, directExecutor());
}
}
The MemoryResource class is where the worker receives the assignments sent by the coordinator; this POST method applies them and returns the worker's local memory info:
@POST
@Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public MemoryInfo getMemoryInfo(MemoryPoolAssignmentsRequest request)
{
taskManager.updateMemoryPoolAssignments(request);
return memoryManager.getInfo();
}