From d1a7c0f7625cb0409b7550a5fba732460e578105 Mon Sep 17 00:00:00 2001
From: Andrii Rosa
Date: Thu, 5 Jan 2023 15:58:53 -0500
Subject: [PATCH] Increase memory requirement for tasks failed due to worker crash

When a task fails due to a suspected worker crash, this may indicate
(with high likelihood) a memory accounting problem. Increasing the
memory requirement effectively decreases overall cluster load, giving
those problematic tasks a better chance of finishing.
---
 .../scheduler/BinPackingNodeAllocatorService.java | 14 ++++++++++++--
 .../io/trino/execution/scheduler/ErrorCodes.java  | 12 ++++++++++++
 .../java/io/trino/memory/MemoryManagerConfig.java | 15 +++++++++++++++
 .../scheduler/TestBinPackingNodeAllocator.java    |  1 +
 ...ExponentialGrowthPartitionMemoryEstimator.java |  9 +++++++++
 .../io/trino/memory/TestMemoryManagerConfig.java  |  7 +++++--
 6 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java
index 49d01a341fd2..147ee4b6c438 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java
@@ -75,6 +75,7 @@
 import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
 import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor;
 import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError;
+import static io.trino.execution.scheduler.ErrorCodes.isWorkerCrashAssociatedError;
 import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
 import static java.lang.Math.max;
 import static java.lang.Thread.currentThread;
@@ -100,6 +101,7 @@ public class BinPackingNodeAllocatorService
     private final AtomicReference<Map<String, MemoryPoolInfo>> nodePoolMemoryInfos = new AtomicReference<>(ImmutableMap.of());
     private final AtomicReference<Optional<DataSize>> maxNodePoolSize = new AtomicReference<>(Optional.empty());
     private final boolean scheduleOnCoordinator;
+    private final boolean memoryRequirementIncreaseOnWorkerCrashEnabled;
     private final DataSize taskRuntimeMemoryEstimationOverhead;
     private final Ticker ticker;
 
@@ -118,6 +120,7 @@ public BinPackingNodeAllocatorService(
         this(nodeManager,
                 clusterMemoryManager::getWorkerMemoryInfo,
                 nodeSchedulerConfig.isIncludeCoordinator(),
+                memoryManagerConfig.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(),
                 Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()),
                 memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(),
                 Ticker.systemTicker());
@@ -128,6 +131,7 @@ public BinPackingNodeAllocatorService(
             InternalNodeManager nodeManager,
             Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
             boolean scheduleOnCoordinator,
+            boolean memoryRequirementIncreaseOnWorkerCrashEnabled,
             Duration allowedNoMatchingNodePeriod,
             DataSize taskRuntimeMemoryEstimationOverhead,
             Ticker ticker)
@@ -135,6 +139,7 @@ public BinPackingNodeAllocatorService(
         this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
         this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
         this.scheduleOnCoordinator = scheduleOnCoordinator;
+        this.memoryRequirementIncreaseOnWorkerCrashEnabled = memoryRequirementIncreaseOnWorkerCrashEnabled;
         this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
         this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
         this.ticker = requireNonNull(ticker, "ticker is null");
@@ -642,7 +647,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory
 
             // start with the maximum of previously used memory and actual usage
             DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory);
-            if (isOutOfMemoryError(errorCode)) {
+            if (shouldIncreaseMemoryRequirement(errorCode)) {
                 // multiply if we hit an oom error
                 double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
                 newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);
@@ -670,7 +675,7 @@ public synchronized void registerPartitionFinished(Session session, MemoryRequir
             if (success) {
                 memoryUsageDistribution.add(peakMemoryUsage.toBytes());
             }
-            if (!success && errorCode.isPresent() && isOutOfMemoryError(errorCode.get())) {
+            if (!success && errorCode.isPresent() && shouldIncreaseMemoryRequirement(errorCode.get())) {
                 double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
                 // take previousRequiredBytes into account when registering failure on oom. It is conservative hence safer (and in-line with getNextRetryMemoryRequirements)
                 long previousRequiredBytes = previousMemoryRequirements.getRequiredMemory().toBytes();
@@ -710,4 +715,9 @@ public String toString()
             return "memoryUsageDistribution=" + memoryUsageDistributionInfo();
         }
     }
+
+    private boolean shouldIncreaseMemoryRequirement(ErrorCode errorCode)
+    {
+        return isOutOfMemoryError(errorCode) || (memoryRequirementIncreaseOnWorkerCrashEnabled && isWorkerCrashAssociatedError(errorCode));
+    }
 }
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ErrorCodes.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ErrorCodes.java
index a975db27621a..cd632b385082 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ErrorCodes.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ErrorCodes.java
@@ -18,6 +18,10 @@
 import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
 import static io.trino.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
 import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
+import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
+import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR;
+import static io.trino.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
+import static io.trino.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;
 
 public final class ErrorCodes
 {
@@ -29,4 +33,12 @@ public static boolean isOutOfMemoryError(ErrorCode errorCode)
                 || EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode().equals(errorCode)
                 || CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode);
     }
+
+    public static boolean isWorkerCrashAssociatedError(ErrorCode errorCode)
+    {
+        return TOO_MANY_REQUESTS_FAILED.toErrorCode().equals(errorCode)
+                || REMOTE_HOST_GONE.toErrorCode().equals(errorCode)
+                || REMOTE_TASK_MISMATCH.toErrorCode().equals(errorCode)
+                || REMOTE_TASK_ERROR.toErrorCode().equals(errorCode);
+    }
 }
diff --git a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java
index 6da264a8d6be..f146a595c26c 100644
--- a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java
@@ -45,6 +45,8 @@ public class MemoryManagerConfig
     private DataSize faultTolerantExecutionTaskRuntimeMemoryEstimationOverhead = DataSize.of(1, GIGABYTE);
     private LowMemoryQueryKillerPolicy lowMemoryQueryKillerPolicy = LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
     private LowMemoryTaskKillerPolicy lowMemoryTaskKillerPolicy = LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
+    private boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = true;
+
     /**
      * default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
      */
@@ -190,6 +192,19 @@ public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryEstimationQuantile
         return this;
     }
 
+    public boolean isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled()
+    {
+        return faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
+    }
+
+    @Config("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled")
+    @ConfigDescription("Increase memory requirement for tasks failed due to a suspected worker crash")
+    public MemoryManagerConfig setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled)
+    {
+        this.faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
+        return this;
+    }
+
     public void applyFaultTolerantExecutionDefaults()
     {
         killOnOutOfMemoryDelay = new Duration(0, MINUTES);
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java
index f0bdc03b48cd..36cf514f356a 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java
@@ -101,6 +101,7 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize
                 nodeManager,
                 () -> workerMemoryInfos,
                 false,
+                false,
                 Duration.of(1, MINUTES),
                 taskRuntimeMemoryEstimationOverhead,
                 ticker);
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java
index a56bd7c0e6c1..7c4dc1b2a4e7 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java
@@ -51,6 +51,7 @@ public void testEstimator()
                 nodeManager,
                 () -> ImmutableMap.of(new InternalNode("a-node", URI.create("local://blah"), NodeVersion.UNKNOWN, false).getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(DataSize.ofBytes(0)))),
                 false,
+                true,
                 Duration.of(1, MINUTES),
                 DataSize.ofBytes(0),
                 Ticker.systemTicker());
@@ -79,6 +80,14 @@ public void testEstimator()
                         StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode()))
                 .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));
 
+        assertThat(
+                estimator.getNextRetryMemoryRequirements(
+                        session,
+                        new MemoryRequirements(DataSize.of(50, MEGABYTE)),
+                        DataSize.of(10, MEGABYTE),
+                        StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode()))
+                .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));
+
         assertThat(
                 estimator.getNextRetryMemoryRequirements(
                         session,
diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java
index 363f8d1b8cb6..a684c8b38a4f 100644
--- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java
@@ -45,7 +45,8 @@ public void testDefaults()
                 .setFaultTolerantExecutionTaskMemory(DataSize.of(5, GIGABYTE))
                 .setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(1, GIGABYTE))
                 .setFaultTolerantExecutionTaskMemoryGrowthFactor(3.0)
-                .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9));
+                .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9)
+                .setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(true));
     }
 
     @Test
@@ -62,6 +63,7 @@ public void testExplicitPropertyMappings()
                 .put("fault-tolerant-execution-task-runtime-memory-estimation-overhead", "300MB")
                 .put("fault-tolerant-execution-task-memory-growth-factor", "17.3")
                 .put("fault-tolerant-execution-task-memory-estimation-quantile", "0.7")
+                .put("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled", "false")
                 .buildOrThrow();
 
         MemoryManagerConfig expected = new MemoryManagerConfig()
@@ -74,7 +76,8 @@ public void testExplicitPropertyMappings()
                 .setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE))
                 .setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(300, MEGABYTE))
                 .setFaultTolerantExecutionTaskMemoryGrowthFactor(17.3)
-                .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7);
+                .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7)
+                .setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(false);
 
         assertFullMapping(properties, expected);
     }
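For reference, the retry sizing that this change extends works as follows: on a qualifying failure the estimator takes the maximum of the previous memory requirement and the observed peak usage and multiplies it by fault-tolerant-execution-task-memory-growth-factor (3.0 by default); with this patch, errors associated with a suspected worker crash (TOO_MANY_REQUESTS_FAILED, REMOTE_HOST_GONE, REMOTE_TASK_MISMATCH, REMOTE_TASK_ERROR) qualify as well, unless fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled is set to false. The standalone sketch below only illustrates that arithmetic under those assumptions; the class name, String-based error codes, and main method are hypothetical simplifications, and the real code works with ErrorCode and DataSize objects inside BinPackingNodeAllocatorService, with logic this sketch omits.

import java.util.Set;

public final class RetrySizingSketch
{
    // Hypothetical stand-ins for the error code names; the patch compares real ErrorCode objects
    // in ErrorCodes.isOutOfMemoryError and ErrorCodes.isWorkerCrashAssociatedError.
    private static final Set<String> OOM_ERRORS = Set.of(
            "EXCEEDED_LOCAL_MEMORY_LIMIT", "EXCEEDED_GLOBAL_MEMORY_LIMIT", "CLUSTER_OUT_OF_MEMORY");
    private static final Set<String> WORKER_CRASH_ERRORS = Set.of(
            "TOO_MANY_REQUESTS_FAILED", "REMOTE_HOST_GONE", "REMOTE_TASK_MISMATCH", "REMOTE_TASK_ERROR");

    static long nextRetryMemoryBytes(
            long previousRequiredBytes,
            long peakUsageBytes,
            double growthFactor,
            String errorCodeName,
            boolean increaseOnWorkerCrashEnabled)
    {
        // Start from the maximum of the previous requirement and the observed peak usage,
        // mirroring getNextRetryMemoryRequirements.
        long next = Math.max(previousRequiredBytes, peakUsageBytes);
        // Grow on out-of-memory errors, and (when enabled) on worker-crash-associated errors.
        boolean shouldGrow = OOM_ERRORS.contains(errorCodeName)
                || (increaseOnWorkerCrashEnabled && WORKER_CRASH_ERRORS.contains(errorCodeName));
        if (shouldGrow) {
            next = (long) (next * growthFactor);
        }
        return next;
    }

    public static void main(String[] args)
    {
        long mb = 1024 * 1024;
        // 50MB previous requirement, 10MB peak usage, default growth factor 3.0, failure attributed
        // to a suspected worker crash -> 150MB, matching the new test assertion above.
        System.out.println(nextRetryMemoryBytes(50 * mb, 10 * mb, 3.0, "TOO_MANY_REQUESTS_FAILED", true) / mb);  // 150
        // With the feature disabled, a worker-crash error does not trigger growth -> 50MB.
        System.out.println(nextRetryMemoryBytes(50 * mb, 10 * mb, 3.0, "TOO_MANY_REQUESTS_FAILED", false) / mb); // 50
    }
}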