Increase memory requirement for tasks failed due to worker crash #15624

Merged
@@ -75,6 +75,7 @@
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor;
import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError;
import static io.trino.execution.scheduler.ErrorCodes.isWorkerCrashAssociatedError;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
@@ -100,6 +101,7 @@ public class BinPackingNodeAllocatorService
private final AtomicReference<Map<String, MemoryPoolInfo>> nodePoolMemoryInfos = new AtomicReference<>(ImmutableMap.of());
private final AtomicReference<Optional<DataSize>> maxNodePoolSize = new AtomicReference<>(Optional.empty());
private final boolean scheduleOnCoordinator;
private final boolean memoryRequirementIncreaseOnWorkerCrashEnabled;
private final DataSize taskRuntimeMemoryEstimationOverhead;
private final Ticker ticker;

@@ -118,6 +120,7 @@ public BinPackingNodeAllocatorService(
this(nodeManager,
clusterMemoryManager::getWorkerMemoryInfo,
nodeSchedulerConfig.isIncludeCoordinator(),
memoryManagerConfig.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(),
Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()),
memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(),
Ticker.systemTicker());
@@ -128,13 +131,15 @@ public BinPackingNodeAllocatorService(
InternalNodeManager nodeManager,
Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
boolean scheduleOnCoordinator,
boolean memoryRequirementIncreaseOnWorkerCrashEnabled,
Duration allowedNoMatchingNodePeriod,
DataSize taskRuntimeMemoryEstimationOverhead,
Ticker ticker)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;
this.memoryRequirementIncreaseOnWorkerCrashEnabled = memoryRequirementIncreaseOnWorkerCrashEnabled;
this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
this.ticker = requireNonNull(ticker, "ticker is null");
@@ -642,7 +647,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory

// start with the maximum of previously used memory and actual usage
DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory);
if (isOutOfMemoryError(errorCode)) {
if (shouldIncreaseMemoryRequirement(errorCode)) {
// multiply if we hit an oom error
double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);
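For context, here is a minimal standalone sketch (not part of this change; the class name and literal values are illustrative only) of how the next-retry estimate grows once shouldIncreaseMemoryRequirement returns true, using the default growth factor of 3.0 and the 50MB/10MB values exercised in the estimator test further down:

import com.google.common.collect.Ordering;
import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class RetryMemoryGrowthSketch
{
    public static void main(String[] args)
    {
        // Previously granted memory and the observed peak usage of the failed task (illustrative values)
        DataSize previousMemory = DataSize.of(50, MEGABYTE);
        DataSize peakMemoryUsage = DataSize.of(10, MEGABYTE);

        // start with the maximum of previously used memory and actual usage
        DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory); // 50MB

        // multiply when the failure was an OOM or, with the new flag enabled, a suspected worker crash
        double growthFactor = 3.0; // default fault-tolerant-execution-task-memory-growth-factor
        newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);

        System.out.println(newMemory); // 150MB
    }
}

With the flag disabled, a worker-crash failure skips the multiplication and the retry simply keeps the maximum of the previous requirement and the observed peak, 50MB in this sketch.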
@@ -670,7 +675,7 @@ public synchronized void registerPartitionFinished(Session session, MemoryRequir
if (success) {
memoryUsageDistribution.add(peakMemoryUsage.toBytes());
}
if (!success && errorCode.isPresent() && isOutOfMemoryError(errorCode.get())) {
if (!success && errorCode.isPresent() && shouldIncreaseMemoryRequirement(errorCode.get())) {
double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
// take previousRequiredBytes into account when registering failure on oom. It is conservative hence safer (and in-line with getNextRetryMemoryRequirements)
long previousRequiredBytes = previousMemoryRequirements.getRequiredMemory().toBytes();
@@ -710,4 +715,9 @@ public String toString()
return "memoryUsageDistribution=" + memoryUsageDistributionInfo();
}
}

private boolean shouldIncreaseMemoryRequirement(ErrorCode errorCode)
{
return isOutOfMemoryError(errorCode) || (memoryRequirementIncreaseOnWorkerCrashEnabled && isWorkerCrashAssociatedError(errorCode));
}
}
@@ -18,6 +18,10 @@
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static io.trino.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR;
import static io.trino.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
import static io.trino.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;

public final class ErrorCodes
{
@@ -29,4 +33,12 @@ public static boolean isOutOfMemoryError(ErrorCode errorCode)
|| EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode().equals(errorCode)
|| CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode);
}

public static boolean isWorkerCrashAssociatedError(ErrorCode errorCode)
{
return TOO_MANY_REQUESTS_FAILED.toErrorCode().equals(errorCode)
|| REMOTE_HOST_GONE.toErrorCode().equals(errorCode)
|| REMOTE_TASK_MISMATCH.toErrorCode().equals(errorCode)
|| REMOTE_TASK_ERROR.toErrorCode().equals(errorCode);
}
}
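As a usage sketch (hypothetical class, not part of the diff; it only exercises code shown above), the two predicates combine so that a REMOTE_HOST_GONE failure triggers the same memory increase as an out-of-memory failure whenever the new flag is enabled:

import io.trino.spi.ErrorCode;
import io.trino.spi.StandardErrorCode;

import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError;
import static io.trino.execution.scheduler.ErrorCodes.isWorkerCrashAssociatedError;

public class WorkerCrashPredicateSketch
{
    public static void main(String[] args)
    {
        boolean increaseOnWorkerCrashEnabled = true; // matches the MemoryManagerConfig default below

        ErrorCode errorCode = StandardErrorCode.REMOTE_HOST_GONE.toErrorCode();

        // mirrors BinPackingNodeAllocatorService.shouldIncreaseMemoryRequirement
        boolean shouldIncrease = isOutOfMemoryError(errorCode)
                || (increaseOnWorkerCrashEnabled && isWorkerCrashAssociatedError(errorCode));

        System.out.println(shouldIncrease); // true
    }
}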
@@ -45,6 +45,8 @@ public class MemoryManagerConfig
private DataSize faultTolerantExecutionTaskRuntimeMemoryEstimationOverhead = DataSize.of(1, GIGABYTE);
private LowMemoryQueryKillerPolicy lowMemoryQueryKillerPolicy = LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private LowMemoryTaskKillerPolicy lowMemoryTaskKillerPolicy = LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = true;

/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
*/
@@ -190,6 +192,19 @@ public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryEstimationQuantile
return this;
}

public boolean isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled()
{
return faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
}

@Config("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled")
@ConfigDescription("Increase memory requirement for tasks failed due to a suspected worker crash")
public MemoryManagerConfig setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled)
{
this.faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled = faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
killOnOutOfMemoryDelay = new Duration(0, MINUTES);
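For operators, the behavior added here is on by default and can be turned off with the new fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled property. A small hypothetical sketch of the programmatic equivalent (assuming MemoryManagerConfig lives in its usual io.trino.memory package):

import io.trino.memory.MemoryManagerConfig;

public class DisableCrashResizingSketch
{
    public static void main(String[] args)
    {
        // Equivalent to setting fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled=false
        MemoryManagerConfig config = new MemoryManagerConfig()
                .setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(false);

        System.out.println(config.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled()); // false
    }
}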
@@ -101,6 +101,7 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize
nodeManager,
() -> workerMemoryInfos,
false,
false,
Duration.of(1, MINUTES),
taskRuntimeMemoryEstimationOverhead,
ticker);
@@ -51,6 +51,7 @@ public void testEstimator()
nodeManager,
() -> ImmutableMap.of(new InternalNode("a-node", URI.create("local://blah"), NodeVersion.UNKNOWN, false).getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(DataSize.ofBytes(0)))),
false,
true,
Duration.of(1, MINUTES),
DataSize.ofBytes(0),
Ticker.systemTicker());
@@ -79,6 +80,14 @@
StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode()))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
session,
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode()))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
session,
@@ -45,7 +45,8 @@ public void testDefaults()
.setFaultTolerantExecutionTaskMemory(DataSize.of(5, GIGABYTE))
.setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(1, GIGABYTE))
.setFaultTolerantExecutionTaskMemoryGrowthFactor(3.0)
.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9));
.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9)
.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(true));
}

@Test
@@ -62,6 +63,7 @@ public void testExplicitPropertyMappings()
.put("fault-tolerant-execution-task-runtime-memory-estimation-overhead", "300MB")
.put("fault-tolerant-execution-task-memory-growth-factor", "17.3")
.put("fault-tolerant-execution-task-memory-estimation-quantile", "0.7")
.put("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled", "false")
.buildOrThrow();

MemoryManagerConfig expected = new MemoryManagerConfig()
@@ -74,7 +76,8 @@
.setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE))
.setFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(DataSize.of(300, MEGABYTE))
.setFaultTolerantExecutionTaskMemoryGrowthFactor(17.3)
.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7);
.setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7)
.setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(false);

assertFullMapping(properties, expected);
}