Increase memory requirement for tasks failed due to worker crash
When a task fails due to a suspected worker crash, there is a high
likelihood of an underlying memory accounting problem. Increasing the
memory requirement for such tasks effectively reduces overall cluster
load, giving them a better chance to finish.
arhimondr committed Jan 9, 2023
1 parent 7b01054 commit dae2060
Showing 6 changed files with 54 additions and 4 deletions.
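The new behavior is on by default and is gated by a configuration property defined in MemoryManagerConfig below. As a minimal, hypothetical sketch, turning it off in a coordinator's etc/config.properties could look like this (the property name and its default of true are taken from the diff below):

# Opt out of growing memory requirements on suspected worker crashes
fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled=false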
@@ -75,6 +75,7 @@
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor;
import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError;
+import static io.trino.execution.scheduler.ErrorCodes.isWorkerCrashAssociatedError;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
@@ -100,6 +101,7 @@ public class BinPackingNodeAllocatorService
private final AtomicReference<Map<String, MemoryPoolInfo>> nodePoolMemoryInfos = new AtomicReference<>(ImmutableMap.of());
private final AtomicReference<Optional<DataSize>> maxNodePoolSize = new AtomicReference<>(Optional.empty());
private final boolean scheduleOnCoordinator;
+private final boolean memoryRequirementIncreaseOnWorkerCrashEnabled;
private final DataSize taskRuntimeMemoryEstimationOverhead;
private final Ticker ticker;

@@ -118,6 +120,7 @@ public BinPackingNodeAllocatorService(
this(nodeManager,
clusterMemoryManager::getWorkerMemoryInfo,
nodeSchedulerConfig.isIncludeCoordinator(),
+memoryManagerConfig.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(),
Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()),
memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(),
Ticker.systemTicker());
@@ -128,13 +131,15 @@ public BinPackingNodeAllocatorService(
InternalNodeManager nodeManager,
Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
boolean scheduleOnCoordinator,
+boolean memoryRequirementIncreaseOnWorkerCrashEnabled,
Duration allowedNoMatchingNodePeriod,
DataSize taskRuntimeMemoryEstimationOverhead,
Ticker ticker)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;
+this.memoryRequirementIncreaseOnWorkerCrashEnabled = memoryRequirementIncreaseOnWorkerCrashEnabled;
this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
this.ticker = requireNonNull(ticker, "ticker is null");
@@ -642,7 +647,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory

// start with the maximum of previously used memory and actual usage
DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory);
-if (isOutOfMemoryError(errorCode)) {
+if (shouldIncreaseMemoryRequirement(errorCode)) {
// multiply if we hit an oom error
double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);
@@ -670,7 +675,7 @@ public synchronized void registerPartitionFinished(Session session, MemoryRequir
if (success) {
memoryUsageDistribution.add(peakMemoryUsage.toBytes());
}
-if (!success && errorCode.isPresent() && isOutOfMemoryError(errorCode.get())) {
+if (!success && errorCode.isPresent() && shouldIncreaseMemoryRequirement(errorCode.get())) {
double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
// take previousRequiredBytes into account when registering failure on oom. It is conservative hence safer (and in-line with getNextRetryMemoryRequirements)
long previousRequiredBytes = previousMemoryRequirements.getRequiredMemory().toBytes();
@@ -710,4 +715,9 @@ public String toString()
return "memoryUsageDistribution=" + memoryUsageDistributionInfo();
}
}

+private boolean shouldIncreaseMemoryRequirement(ErrorCode errorCode)
+{
+return isOutOfMemoryError(errorCode) || (memoryRequirementIncreaseOnWorkerCrashEnabled && isWorkerCrashAssociatedError(errorCode));
+}
}
@@ -18,6 +18,10 @@
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static io.trino.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
+import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
+import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR;
+import static io.trino.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
+import static io.trino.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;

public final class ErrorCodes
{
@@ -29,4 +33,12 @@ public static boolean isOutOfMemoryError(ErrorCode errorCode)
|| EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode().equals(errorCode)
|| CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode);
}

+public static boolean isWorkerCrashAssociatedError(ErrorCode errorCode)
+{
+return TOO_MANY_REQUESTS_FAILED.toErrorCode().equals(errorCode)
+|| REMOTE_HOST_GONE.toErrorCode().equals(errorCode)
+|| REMOTE_TASK_MISMATCH.toErrorCode().equals(errorCode)
+|| REMOTE_TASK_ERROR.toErrorCode().equals(errorCode);
+}
}
@@ -45,6 +45,8 @@ public class MemoryManagerConfig
private DataSize faultTolerantExecutionTaskRuntimeMemoryEstimationOverhead = DataSize.of(1, GIGABYTE);
private LowMemoryQueryKillerPolicy lowMemoryQueryKillerPolicy = LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private LowMemoryTaskKillerPolicy lowMemoryTaskKillerPolicy = LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
+private boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = true;

/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
*/
@@ -190,6 +192,19 @@ public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryEstimationQuantile
return this;
}

+public boolean isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled()
+{
+return faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
+}
+
+@Config("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled")
+@ConfigDescription("Increase memory requirement for tasks failed due to a suspected worker crash")
+public MemoryManagerConfig setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled)
+{
+this.faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
+return this;
+}

public void applyFaultTolerantExecutionDefaults()
{
killOnOutOfMemoryDelay = new Duration(0, MINUTES);
@@ -101,6 +101,7 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize
nodeManager,
() -> workerMemoryInfos,
false,
+false,
Duration.of(1, MINUTES),
taskRuntimeMemoryEstimationOverhead,
ticker);
@@ -51,6 +51,7 @@ public void testEstimator()
nodeManager,
() -> ImmutableMap.of(new InternalNode("a-node", URI.create("local://blah"), NodeVersion.UNKNOWN, false).getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(DataSize.ofBytes(0)))),
false,
+true,
Duration.of(1, MINUTES),
DataSize.ofBytes(0),
Ticker.systemTicker());
@@ -79,6 +80,14 @@ public void testEstimator()
StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode()))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

+assertThat(
+estimator.getNextRetryMemoryRequirements(
+session,
+new MemoryRequirements(DataSize.of(50, MEGABYTE)),
+DataSize.of(10, MEGABYTE),
+StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode()))
+.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
session,
@@ -45,7 +45,8 @@ public void testDefaults()
.setFaultTolerantExecutionTaskMemory(DataSize.of(5, GIGABYTE))
.setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(1, GIGABYTE))
.setFaultTolerantExecutionTaskMemoryGrowthFactor(3.0)
-.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9));
+.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9)
+.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(true));
}

@Test
@@ -62,6 +63,7 @@ public void testExplicitPropertyMappings()
.put("fault-tolerant-execution-task-runtime-memory-estimation-overhead", "300MB")
.put("fault-tolerant-execution-task-memory-growth-factor", "17.3")
.put("fault-tolerant-execution-task-memory-estimation-quantile", "0.7")
.put("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled", "false")
.buildOrThrow();

MemoryManagerConfig expected = new MemoryManagerConfig()
@@ -74,7 +76,8 @@
.setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE))
.setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(300, MEGABYTE))
.setFaultTolerantExecutionTaskMemoryGrowthFactor(17.3)
-.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7);
+.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7)
+.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(false);

assertFullMapping(properties, expected);
}
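To make the retry-sizing rule concrete, here is a self-contained sketch in plain Java (not Trino code; the class and method names are hypothetical stand-ins) of the logic in getNextRetryMemoryRequirements and shouldIncreaseMemoryRequirement above: the next attempt starts from the larger of the previous requirement and the observed peak usage, then multiplies by the growth factor when the failure was an out-of-memory error or, if the feature is enabled, a worker-crash-associated error. With the growth factor of 3.0 exercised in the estimator test, a 50 MB requirement with 10 MB peak usage grows to 150 MB:

public class RetrySizingSketch
{
    // Hypothetical stand-in for Trino's ErrorCode classification
    enum ErrorKind { OUT_OF_MEMORY, WORKER_CRASH_ASSOCIATED, OTHER }

    private final boolean increaseOnWorkerCrashEnabled;
    private final double growthFactor;

    RetrySizingSketch(boolean increaseOnWorkerCrashEnabled, double growthFactor)
    {
        this.increaseOnWorkerCrashEnabled = increaseOnWorkerCrashEnabled;
        this.growthFactor = growthFactor;
    }

    // Mirrors shouldIncreaseMemoryRequirement in the diff above
    boolean shouldIncreaseMemoryRequirement(ErrorKind error)
    {
        return error == ErrorKind.OUT_OF_MEMORY
                || (increaseOnWorkerCrashEnabled && error == ErrorKind.WORKER_CRASH_ASSOCIATED);
    }

    // Mirrors the sizing step in getNextRetryMemoryRequirements
    long nextRetryBytes(long previousRequiredBytes, long peakUsageBytes, ErrorKind error)
    {
        // Start from the larger of the previous requirement and the observed peak usage
        long next = Math.max(previousRequiredBytes, peakUsageBytes);
        if (shouldIncreaseMemoryRequirement(error)) {
            next = (long) (next * growthFactor);
        }
        return next;
    }

    public static void main(String[] args)
    {
        long mb = 1024L * 1024;
        RetrySizingSketch enabled = new RetrySizingSketch(true, 3.0);
        // 50 MB previous requirement, 10 MB peak, crash-associated failure -> 150 MB,
        // matching the TOO_MANY_REQUESTS_FAILED assertion in the test above
        System.out.println(enabled.nextRetryBytes(50 * mb, 10 * mb, ErrorKind.WORKER_CRASH_ASSOCIATED) / mb);
        // Same failure with the feature disabled -> requirement stays at 50 MB
        RetrySizingSketch disabled = new RetrySizingSketch(false, 3.0);
        System.out.println(disabled.nextRetryBytes(50 * mb, 10 * mb, ErrorKind.WORKER_CRASH_ASSOCIATED) / mb);
    }
}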
