Support for killing individual task if cluster is out of memory
losipiuk committed Mar 8, 2022
1 parent f26ee2b commit 507724e
Showing 8 changed files with 232 additions and 38 deletions.
147 changes: 121 additions & 26 deletions core/trino-main/src/main/java/io/trino/memory/ClusterMemoryManager.java
@@ -29,6 +29,7 @@
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryIdGenerator;
import io.trino.execution.StageInfo;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskStatus;
import io.trino.execution.scheduler.NodeSchedulerConfig;
@@ -106,6 +107,7 @@ public class ClusterMemoryManager
private final AtomicLong clusterTotalMemoryReservation = new AtomicLong();
private final AtomicLong clusterMemoryBytes = new AtomicLong();
private final AtomicLong queriesKilledDueToOutOfMemory = new AtomicLong();
private final AtomicLong tasksKilledDueToOutOfMemory = new AtomicLong();
private final boolean isWorkScheduledOnCoordinator;

@GuardedBy("this")
@@ -120,7 +122,7 @@ public class ClusterMemoryManager
private long lastTimeNotOutOfMemory = System.nanoTime();

@GuardedBy("this")
private QueryId lastKilledQuery;
private KillTarget lastKillTarget;

@Inject
public ClusterMemoryManager(
@@ -228,11 +230,11 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
outOfMemory &&
!queryKilled &&
nanosSince(lastTimeNotOutOfMemory).compareTo(killOnOutOfMemoryDelay) > 0) {
if (isLastKilledQueryGone()) {
if (isLastKillTargetGone(runningQueries)) {
callOomKiller(runningQueries);
}
else {
log.debug("Last killed query is still not gone: %s", lastKilledQuery);
log.debug("Last killed target is still not gone: %s", lastKillTarget);
}
}

@@ -245,38 +247,72 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
List<QueryMemoryInfo> queryMemoryInfoList = Streams.stream(runningQueries)
.map(this::createQueryMemoryInfo)
.collect(toImmutableList());

List<MemoryInfo> nodeMemoryInfos = nodes.values().stream()
.map(RemoteNodeMemory::getInfo)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
Optional<QueryId> chosenQueryId = lowMemoryKiller.chooseQueryToKill(queryMemoryInfoList, nodeMemoryInfos);
if (chosenQueryId.isPresent()) {
log.debug("Low memory killer chose %s", chosenQueryId.get());
Optional<QueryExecution> chosenQuery = Streams.stream(runningQueries).filter(query -> chosenQueryId.get().equals(query.getQueryId())).collect(toOptional());
if (chosenQuery.isPresent()) {
// See comments in isLastKilledQueryGone for why chosenQuery might be absent.
chosenQuery.get().fail(new TrinoException(CLUSTER_OUT_OF_MEMORY, "Query killed because the cluster is out of memory. Please try again in a few minutes."));
queriesKilledDueToOutOfMemory.incrementAndGet();
lastKilledQuery = chosenQueryId.get();
logQueryKill(chosenQueryId.get(), nodeMemoryInfos);

Optional<KillTarget> killTarget = lowMemoryKiller.chooseQueryToKill(queryMemoryInfoList, nodeMemoryInfos);

if (killTarget.isPresent()) {
if (killTarget.get().isWholeQuery()) {
QueryId queryId = killTarget.get().getQuery();
log.debug("Low memory killer chose %s", queryId);
Optional<QueryExecution> chosenQuery = findRunningQuery(runningQueries, killTarget.get().getQuery());
if (chosenQuery.isPresent()) {
// See comments in isQueryGone for why chosenQuery might be absent.
chosenQuery.get().fail(new TrinoException(CLUSTER_OUT_OF_MEMORY, "Query killed because the cluster is out of memory. Please try again in a few minutes."));
queriesKilledDueToOutOfMemory.incrementAndGet();
lastKillTarget = killTarget.get();
logQueryKill(queryId, nodeMemoryInfos);
}
}
else {
Set<TaskId> tasks = killTarget.get().getTasks();
log.debug("Low memory killer chose %s", tasks);
ImmutableSet.Builder<TaskId> killedTasksBuilder = ImmutableSet.builder();
for (TaskId task : tasks) {
Optional<QueryExecution> runningQuery = findRunningQuery(runningQueries, task.getQueryId());
if (runningQuery.isPresent()) {
runningQuery.get().failTask(task, new TrinoException(CLUSTER_OUT_OF_MEMORY, "Task killed because the cluster is out of memory."));
tasksKilledDueToOutOfMemory.incrementAndGet();
killedTasksBuilder.add(task);
}
}
// only record tasks actually killed
ImmutableSet<TaskId> killedTasks = killedTasksBuilder.build();
if (!killedTasks.isEmpty()) {
lastKillTarget = KillTarget.selectedTasks(killedTasks);
logTasksKill(killedTasks, nodeMemoryInfos);
}
}
}
}

@GuardedBy("this")
private boolean isLastKilledQueryGone()
private boolean isLastKillTargetGone(Iterable<QueryExecution> runningQueries)
{
if (lastKilledQuery == null) {
if (lastKillTarget == null) {
return true;
}

if (lastKillTarget.isWholeQuery()) {
return isQueryGone(lastKillTarget.getQuery());
}

return areTasksGone(lastKillTarget.getTasks(), runningQueries);
}

private boolean isQueryGone(QueryId killedQuery)
{
// If the lastKilledQuery is marked as leaked by the ClusterMemoryLeakDetector we consider the lastKilledQuery as gone,
// so that the ClusterMemoryManager can continue to make progress even if there are leaks.
// Even if the weak references to the leaked queries are GCed in the ClusterMemoryLeakDetector, it will mark the same queries
// as leaked in its next run, and eventually the ClusterMemoryManager will make progress.
if (memoryLeakDetector.wasQueryPossiblyLeaked(lastKilledQuery)) {
lastKilledQuery = null;
if (memoryLeakDetector.wasQueryPossiblyLeaked(killedQuery)) {
lastKillTarget = null;
return true;
}

@@ -286,7 +322,35 @@ private boolean isLastKilledQueryGone()
// Therefore, even if the query appears to be gone here, it might be back when one inspects nodes later.
return !pool
.getQueryMemoryReservations()
.containsKey(lastKilledQuery);
.containsKey(killedQuery);
}

private boolean areTasksGone(Set<TaskId> tasks, Iterable<QueryExecution> runningQueries)
{
List<QueryExecution> queryExecutions = tasks.stream()
.map(TaskId::getQueryId)
.distinct()
.map(query -> findRunningQuery(runningQueries, query))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());

if (queryExecutions.isEmpty()) {
// all queries we care about are gone
return true;
}

Set<TaskId> runningTasks = queryExecutions.stream()
.flatMap(query -> getRunningTasksForQuery(query).stream())
.map(TaskStatus::getTaskId)
.collect(toImmutableSet());

return tasks.stream().noneMatch(runningTasks::contains);
}

private Optional<QueryExecution> findRunningQuery(Iterable<QueryExecution> runningQueries, QueryId queryId)
{
return Streams.stream(runningQueries).filter(query -> queryId.equals(query.getQueryId())).collect(toOptional());
}

private void logQueryKill(QueryId killedQueryId, List<MemoryInfo> nodes)
@@ -296,18 +360,43 @@ private void logQueryKill(QueryId killedQueryId, List<MemoryInfo> nodes)
}
StringBuilder nodeDescription = new StringBuilder();
nodeDescription.append("Query Kill Decision: Killed ").append(killedQueryId).append("\n");
for (MemoryInfo node : nodes) {
MemoryPoolInfo memoryPoolInfo = node.getPool();
nodeDescription.append("Query Kill Scenario: ");
nodeDescription.append("MaxBytes ").append(memoryPoolInfo.getMaxBytes()).append(' ');
nodeDescription.append("FreeBytes ").append(memoryPoolInfo.getFreeBytes() + memoryPoolInfo.getReservedRevocableBytes()).append(' ');
nodeDescription.append("Queries ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(nodeDescription, memoryPoolInfo.getQueryMemoryReservations());
nodeDescription.append('\n');
nodeDescription.append(formatKillScenario(nodes));
log.info("%s", nodeDescription);
}

private void logTasksKill(Set<TaskId> tasks, List<MemoryInfo> nodes)
{
if (!log.isInfoEnabled()) {
return;
}
StringBuilder nodeDescription = new StringBuilder();
nodeDescription.append("Query Kill Decision: Tasks Killed ")
.append(tasks)
.append("(")
.append(tasks)
.append(")")
.append("\n");
nodeDescription.append(formatKillScenario(nodes));
log.info("%s", nodeDescription);
}

private String formatKillScenario(List<MemoryInfo> nodes)
{
StringBuilder stringBuilder = new StringBuilder();
for (MemoryInfo nodeMemoryInfo : nodes) {
MemoryPoolInfo memoryPoolInfo = nodeMemoryInfo.getPool();
stringBuilder.append("Query Kill Scenario: ");
stringBuilder.append("MaxBytes ").append(memoryPoolInfo.getMaxBytes()).append(' ');
stringBuilder.append("FreeBytes ").append(memoryPoolInfo.getFreeBytes() + memoryPoolInfo.getReservedRevocableBytes()).append(' ');
stringBuilder.append("Queries ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getQueryMemoryReservations());
stringBuilder.append("Tasks ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, nodeMemoryInfo.getTasksMemoryInfo().asMap());
stringBuilder.append('\n');
}
return stringBuilder.toString();
}

@VisibleForTesting
ClusterMemoryPool getPool()
{
@@ -473,4 +562,10 @@ public long getQueriesKilledDueToOutOfMemory()
{
return queriesKilledDueToOutOfMemory.get();
}

@Managed
public long getTasksKilledDueToOutOfMemory()
{
return tasksKilledDueToOutOfMemory.get();
}
}
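
Note: the task-kill branch above calls QueryExecution.failTask(task, exception); that method is presumably added to QueryExecution in one of the changed files not expanded on this page. A minimal sketch of what the interface addition could look like follows — the cause parameter type (Exception vs. Throwable) and the Javadoc wording are assumptions, not taken from this diff.

// Sketch only: not part of the diff shown above; parameter type is an assumption.
package io.trino.execution;

public interface QueryExecution
{
    // ... existing members elided ...

    /**
     * Fails a single task of this query instead of the whole query, as used by the
     * cluster memory manager when the low-memory killer selects individual tasks to kill.
     */
    void failTask(TaskId taskId, Exception cause);
}
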
100 changes: 100 additions & 0 deletions core/trino-main/src/main/java/io/trino/memory/KillTarget.java
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.memory;

import com.google.common.collect.ImmutableSet;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class KillTarget
{
// Either query id or tasks list must be set
// If query id is set then whole query will be killed; individual tasks otherwise.

private final Optional<QueryId> query;
private final Set<TaskId> tasks;

public static KillTarget wholeQuery(QueryId queryId)
{
return new KillTarget(Optional.of(queryId), ImmutableSet.of());
}

public static KillTarget selectedTasks(Set<TaskId> tasks)
{
return new KillTarget(Optional.empty(), tasks);
}

private KillTarget(Optional<QueryId> query, Set<TaskId> tasks)
{
requireNonNull(query, "query is null");
requireNonNull(tasks, "tasks is null");
if ((query.isPresent() && !tasks.isEmpty()) || (query.isEmpty() && tasks.isEmpty())) {
throw new IllegalArgumentException("either query or tasks must be set");
}
this.query = query;
this.tasks = ImmutableSet.copyOf(tasks);
}

public boolean isWholeQuery()
{
return query.isPresent();
}

public QueryId getQuery()
{
return query.orElseThrow(() -> new IllegalStateException("query not set in KillTarget: " + this));
}

public Set<TaskId> getTasks()
{
checkState(!tasks.isEmpty(), "tasks not set in KillTarget: " + this);
return tasks;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KillTarget that = (KillTarget) o;
return Objects.equals(query, that.query) && Objects.equals(tasks, that.tasks);
}

@Override
public int hashCode()
{
return Objects.hash(query, tasks);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("query", query)
.add("tasks", tasks)
.toString();
}
}
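
A brief usage sketch of the new KillTarget value class (the helper class below is illustrative only and not part of the commit). It exercises the invariant enforced by the private constructor: exactly one of the query id and the task set is populated, and the accessor for the unset side fails.

// Illustrative helpers, not part of the commit.
package io.trino.memory;

import com.google.common.collect.ImmutableSet;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;

import static com.google.common.base.Preconditions.checkState;

final class KillTargetExamples
{
    private KillTargetExamples() {}

    // Whole-query target: getQuery() is set; getTasks() would throw IllegalStateException.
    static KillTarget forQuery(QueryId queryId)
    {
        KillTarget target = KillTarget.wholeQuery(queryId);
        checkState(target.isWholeQuery());
        checkState(target.getQuery().equals(queryId));
        return target;
    }

    // Task-level target: a non-empty set of TaskIds; getQuery() would throw IllegalStateException.
    static KillTarget forTask(TaskId taskId)
    {
        KillTarget target = KillTarget.selectedTasks(ImmutableSet.of(taskId));
        checkState(!target.isWholeQuery());
        checkState(target.getTasks().contains(taskId));
        return target;
    }

    // An empty task set is rejected by the private constructor with IllegalArgumentException:
    // exactly one of query and tasks must be set.
}
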
core/trino-main/src/main/java/io/trino/memory/LowMemoryKiller.java
@@ -25,7 +25,7 @@

public interface LowMemoryKiller
{
Optional<QueryId> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes);
Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes);

class QueryMemoryInfo
{
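
With the return type changed to Optional<KillTarget>, an implementation may now ask for individual tasks to be killed instead of a whole query; none of the built-in killers below does so yet. The following hypothetical sketch shows what such an implementation could look like. It assumes the per-node task memory map that ClusterMemoryManager logs via getTasksMemoryInfo().asMap() can be read as Map<TaskId, Long> of reserved bytes (the actual value type is not visible in this diff), and it uses a deliberately naive policy of killing the single most memory-hungry task in the cluster.

/*
 * Hypothetical sketch, NOT part of this commit.
 */
package io.trino.memory;

import com.google.common.collect.ImmutableSet;
import io.trino.execution.TaskId;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LargestTaskLowMemoryKiller
        implements LowMemoryKiller
{
    @Override
    @SuppressWarnings("unchecked")
    public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
    {
        TaskId biggestTask = null;
        long maxBytes = 0;
        for (MemoryInfo node : nodes) {
            // ASSUMPTION: TaskId -> reserved bytes; adapt this cast if the real value type differs
            Map<TaskId, Long> taskReservations = (Map<TaskId, Long>) (Map<?, ?>) node.getTasksMemoryInfo().asMap();
            for (Map.Entry<TaskId, Long> entry : taskReservations.entrySet()) {
                if (entry.getValue() > maxBytes) {
                    biggestTask = entry.getKey();
                    maxBytes = entry.getValue();
                }
            }
        }
        // Kill only the single largest task; ClusterMemoryManager keeps calling the killer on later
        // passes (once the previous kill target is gone) if the cluster is still out of memory.
        return Optional.ofNullable(biggestTask)
                .map(task -> KillTarget.selectedTasks(ImmutableSet.of(task)));
    }
}
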
core/trino-main/src/main/java/io/trino/memory/NoneLowMemoryKiller.java
@@ -14,16 +14,14 @@

package io.trino.memory;

import io.trino.spi.QueryId;

import java.util.List;
import java.util.Optional;

public class NoneLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<QueryId> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
return Optional.empty();
}
core/trino-main/src/main/java/io/trino/memory/TotalReservationLowMemoryKiller.java
@@ -23,17 +23,17 @@ public class TotalReservationLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<QueryId> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
QueryId biggestQuery = null;
Optional<QueryId> biggestQuery = Optional.empty();
long maxMemory = 0;
for (QueryMemoryInfo query : runningQueries) {
long bytesUsed = query.getMemoryReservation();
if (bytesUsed > maxMemory) {
biggestQuery = query.getQueryId();
biggestQuery = Optional.of(query.getQueryId());
maxMemory = bytesUsed;
}
}
return Optional.ofNullable(biggestQuery);
return biggestQuery.map(KillTarget::wholeQuery);
}
}
core/trino-main/src/main/java/io/trino/memory/TotalReservationOnBlockedNodesLowMemoryKiller.java
@@ -28,7 +28,7 @@ public class TotalReservationOnBlockedNodesLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<QueryId> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
Map<QueryId, Long> memoryReservationOnBlockedNodes = new HashMap<>();
for (MemoryInfo node : nodes) {
@@ -47,6 +47,7 @@ public Optional<QueryId> chooseQueryToKill(List<QueryMemoryInfo> runningQueries,

return memoryReservationOnBlockedNodes.entrySet().stream()
.max(comparingLong(Map.Entry::getValue))
.map(Map.Entry::getKey);
.map(Map.Entry::getKey)
.map(KillTarget::wholeQuery);
}
}