From 4435305e36f23da43dc6a0e0a2d264f4ef3f9fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Sun, 26 Nov 2023 13:24:08 +0100 Subject: [PATCH 1/7] Add AssignmentResult.isEmpty() --- .../EventDrivenFaultTolerantQueryScheduler.java | 3 +++ .../execution/scheduler/faulttolerant/SplitAssigner.java | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 069b0e638283..25103d51e909 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1414,6 +1414,9 @@ private void loadMoreTaskDescriptorsIfNecessary() @Override public void onSuccess(AssignmentResult result) { + // We need to process even empty events here so stageExecution.taskDescriptorLoadingComplete() + // is called in event handler. Otherwise, IdempotentSplitSource may be not called again + // if there is no other SplitAssignmentEvent for this stage in queue. eventQueue.add(new SplitAssignmentEvent(stageExecution.getStageId(), result)); } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java index 161bf7e01642..6fbeb6884d98 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java @@ -75,6 +75,11 @@ record AssignmentResult( partitionUpdates = ImmutableList.copyOf(requireNonNull(partitionUpdates, "partitionUpdates is null")); } + boolean isEmpty() + { + return partitionsAdded.isEmpty() && !noMorePartitions && partitionUpdates.isEmpty() && sealedPartitions.isEmpty(); + } + public static AssignmentResult.Builder builder() { return new AssignmentResult.Builder(); From 8f14712c57ca4fdc141744f627c01e4bc963ba7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Sat, 25 Nov 2023 16:08:56 +0100 Subject: [PATCH 2/7] Add return type for EventDrivenFaultTolerantQueryScheduler.EventListener --- ...ventDrivenFaultTolerantQueryScheduler.java | 76 +++++++++++-------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 25103d51e909..1f3b318f8b98 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -495,7 +495,7 @@ public void failTaskRemotely(TaskId taskId, Throwable failureCause) } private static class Scheduler - implements EventListener + implements EventListener { private static final int EVENT_BUFFER_CAPACITY = 100; @@ -1343,7 +1343,7 @@ private void updateMemoryRequirements() } @Override - public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) + public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) { ScheduledTask scheduledTask = new ScheduledTask(sinkInstanceHandleAcquiredEvent.getStageId(), sinkInstanceHandleAcquiredEvent.getPartitionId()); PreSchedulingTaskContext context = preSchedulingTaskContexts.remove(scheduledTask); @@ -1376,6 +1376,7 @@ public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns if (remoteTask.isEmpty()) { nodeLease.release(); } + return null; } private StateChangeListener createExchangeSinkInstanceHandleUpdateRequiredListener() @@ -1437,7 +1438,7 @@ public void abort() } @Override - public void onRemoteTaskCompleted(RemoteTaskCompletedEvent event) + public Void onRemoteTaskCompleted(RemoteTaskCompletedEvent event) { TaskStatus taskStatus = event.getTaskStatus(); TaskId taskId = taskStatus.getTaskId(); @@ -1466,26 +1467,29 @@ else if (taskState == TaskState.FAILED) { for (StageId consumerStageId : stageConsumers.get(stageExecution.getStageId())) { getStageExecution(consumerStageId).setSourceOutputSelector(stageExecution.getStageFragmentId(), outputSelector); } + return null; } @Override - public void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) + public Void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) { TaskId taskId = event.getTaskStatus().getTaskId(); StageExecution stageExecution = getStageExecution(taskId.getStageId()); stageExecution.initializeUpdateOfExchangeSinkInstanceHandle(taskId, eventQueue); + return null; } @Override - public void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + public Void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) { TaskId taskId = event.getTaskId(); StageExecution stageExecution = getStageExecution(taskId.getStageId()); stageExecution.finalizeUpdateOfExchangeSinkInstanceHandle(taskId, event.getExchangeSinkInstanceHandle()); + return null; } @Override - public void onSplitAssignment(SplitAssignmentEvent event) + public Void onSplitAssignment(SplitAssignmentEvent event) { StageExecution stageExecution = getStageExecution(event.getStageId()); AssignmentResult assignment = event.getAssignmentResult(); @@ -1519,13 +1523,15 @@ public void onSplitAssignment(SplitAssignmentEvent event) stageExecution.noMorePartitions(); } stageExecution.taskDescriptorLoadingComplete(); + return null; } @Override - public void onStageFailure(StageFailureEvent event) + public Void onStageFailure(StageFailureEvent event) { StageExecution stageExecution = getStageExecution(event.getStageId()); stageExecution.fail(event.getFailure()); + return null; } private StageExecution getStageExecution(StageId stageId) @@ -2643,30 +2649,38 @@ public long getRemainingDelayInMillis() private interface Event { - Event ABORT = listener -> { - throw new UnsupportedOperationException(); + Event ABORT = new Event() { + @Override + public T accept(EventListener listener) + { + throw new UnsupportedOperationException(); + } }; - Event WAKE_UP = listener -> { - throw new UnsupportedOperationException(); + Event WAKE_UP = new Event() { + @Override + public T accept(EventListener listener) + { + throw new UnsupportedOperationException(); + } }; - void accept(EventListener listener); + T accept(EventListener listener); } - private interface EventListener + private interface EventListener { - void onRemoteTaskCompleted(RemoteTaskCompletedEvent event); + T onRemoteTaskCompleted(RemoteTaskCompletedEvent event); - void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event); + T onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event); - void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event); + T onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event); - void onSplitAssignment(SplitAssignmentEvent event); + T onSplitAssignment(SplitAssignmentEvent event); - void onStageFailure(StageFailureEvent event); + T onStageFailure(StageFailureEvent event); - void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent); + T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent); } private static class SinkInstanceHandleAcquiredEvent @@ -2713,9 +2727,9 @@ public ExchangeSinkInstanceHandle getSinkInstanceHandle() } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onSinkInstanceHandleAcquired(this); + return listener.onSinkInstanceHandleAcquired(this); } } @@ -2728,9 +2742,9 @@ public RemoteTaskCompletedEvent(TaskStatus taskStatus) } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onRemoteTaskCompleted(this); + return listener.onRemoteTaskCompleted(this); } } @@ -2743,9 +2757,9 @@ protected RemoteTaskExchangeSinkUpdateRequiredEvent(TaskStatus taskStatus) } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onRemoteTaskExchangeSinkUpdateRequired(this); + return listener.onRemoteTaskExchangeSinkUpdateRequired(this); } } @@ -2762,9 +2776,9 @@ private RemoteTaskExchangeUpdatedSinkAcquired(TaskId taskId, ExchangeSinkInstanc } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onRemoteTaskExchangeUpdatedSinkAcquired(this); + return listener.onRemoteTaskExchangeUpdatedSinkAcquired(this); } public TaskId getTaskId() @@ -2811,9 +2825,9 @@ public AssignmentResult getAssignmentResult() } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onSplitAssignment(this); + return listener.onSplitAssignment(this); } } @@ -2834,9 +2848,9 @@ public Throwable getFailure() } @Override - public void accept(EventListener listener) + public T accept(EventListener listener) { - listener.onStageFailure(this); + return listener.onStageFailure(this); } } From 4a03d7ddd4cf733f265c8b48e3376e073f64e02d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Sat, 25 Nov 2023 16:25:27 +0100 Subject: [PATCH 3/7] Add intermediate methods to EventListener Also add default implementations for EventListener methods which delegate to appropriate intermediate method according to class hierachy. --- ...ventDrivenFaultTolerantQueryScheduler.java | 45 ++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 1f3b318f8b98..9a6c7ffe91ae 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -2670,17 +2670,50 @@ public T accept(EventListener listener) private interface EventListener { - T onRemoteTaskCompleted(RemoteTaskCompletedEvent event); + default T onRemoteTaskCompleted(RemoteTaskCompletedEvent event) + { + return onRemoteTaskEvent(event); + } - T onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event); + default T onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) + { + return onRemoteTaskEvent(event); + } - T onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event); + default T onRemoteTaskEvent(RemoteTaskEvent event) + { + return onEvent(event); + } - T onSplitAssignment(SplitAssignmentEvent event); + default T onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + { + return onEvent(event); + } - T onStageFailure(StageFailureEvent event); + default T onSplitAssignment(SplitAssignmentEvent event) + { + return onStageEvent(event); + } - T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent); + default T onStageFailure(StageFailureEvent event) + { + return onStageEvent(event); + } + + default T onStageEvent(StageEvent event) + { + return onEvent(event); + } + + default T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent event) + { + return onEvent(event); + } + + default T onEvent(Event event) + { + throw new RuntimeException("EventListener no implemented"); + } } private static class SinkInstanceHandleAcquiredEvent From e78ae3c4f15c8908e69bbd07d1101d9005cc1dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Sun, 26 Nov 2023 13:32:34 +0100 Subject: [PATCH 4/7] Introduce constant --- .../faulttolerant/EventDrivenFaultTolerantQueryScheduler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 9a6c7ffe91ae..45a38ee576a6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -498,6 +498,7 @@ private static class Scheduler implements EventListener { private static final int EVENT_BUFFER_CAPACITY = 100; + private static final long EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS = new Duration(1, MINUTES).toMillis(); private final QueryStateMachine queryStateMachine; private final Metadata metadata; @@ -675,7 +676,7 @@ private Optional closeAndAddSuppressed(Optional existingFa private boolean processEvents() { try { - Event event = eventQueue.poll(1, MINUTES); + Event event = eventQueue.poll(EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS, MILLISECONDS); if (event == null) { return true; } From af77fa9ec5f84cfd61093b32db82c27869d014fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 23 Nov 2023 17:51:37 +0100 Subject: [PATCH 5/7] Add diagnostics for stuck FTE scheduler Add code which will dump log debug information in case FTE scheduler is not getting any events for 10 minutes. This is to track rare bug where we observe queries running with retry_policy set to FALSE stuck sometimes. --- .../java/io/trino/execution/SqlStage.java | 14 +- .../buffer/SpoolingOutputBuffers.java | 10 + .../ArbitraryDistributionSplitAssigner.java | 38 ++ .../BinPackingNodeAllocatorService.java | 13 + ...ventDrivenFaultTolerantQueryScheduler.java | 393 +++++++++++++++++- .../faulttolerant/EventDrivenTaskSource.java | 41 ++ .../FaultTolerantPartitioningScheme.java | 12 + .../HashDistributionSplitAssigner.java | 33 ++ .../SingleDistributionSplitAssigner.java | 12 + .../io/trino/split/BufferingSplitSource.java | 10 + .../io/trino/split/TracingSplitSource.java | 9 + .../TestExchangeSourceOutputSelector.java | 14 + .../trino/spi/exchange/ExchangeContext.java | 11 + .../ExchangeSourceOutputSelector.java | 56 ++- .../filesystem/FileSystemExchange.java | 21 + .../FileSystemExchangeSinkInstanceHandle.java | 12 + 16 files changed, 689 insertions(+), 10 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java index 0d8a146ee925..b1a61d9a7582 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SystemSessionProperties.isEnableCoordinatorDynamicFiltersDistribution; @@ -329,9 +330,18 @@ private void checkAllTaskFinal() } @Override - public String toString() + // for debugging + public synchronized String toString() { - return stateMachine.toString(); + return toStringHelper(this) + .add("stateMachine", stateMachine) + .add("summarizeTaskInfo", summarizeTaskInfo) + .add("outboundDynamicFilterIds", outboundDynamicFilterIds) + .add("tasks", tasks) + .add("allTasks", allTasks) + .add("finishedTasks", finishedTasks) + .add("tasksWithFinalInfo", tasksWithFinalInfo) + .toString(); } private class MemoryUsageListener diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java index 21244b328649..fa2cd5bebc27 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -76,4 +77,13 @@ public SpoolingOutputBuffers withExchangeSinkInstanceHandle(ExchangeSinkInstance { return new SpoolingOutputBuffers(getVersion() + 1, handle, outputPartitionCount); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("exchangeSinkInstanceHandle", exchangeSinkInstanceHandle) + .add("outputPartitionCount", outputPartitionCount) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java index 89d70cf94cbe..e875afba7845 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -314,6 +315,32 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List getHostRequirement(Split split) { if (split.getConnectorSplit().isRemotelyAccessible()) { @@ -396,5 +423,16 @@ public void setFull(boolean full) { this.full = full; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("partitionId", partitionId) + .add("assignedDataSizeInBytes", assignedDataSizeInBytes) + .add("assignedSplitCount", assignedSplitCount) + .add("full", full) + .toString(); + } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java index c77fab6592cf..d6757fbce2d2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java @@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -459,6 +460,18 @@ public void release() throw new IllegalStateException("Node " + node + " already released"); } } + + @Override + public String toString() + { + return toStringHelper(this) + .add("node", node) + .add("released", released) + .add("memoryLease", memoryLease) + .add("taskId", taskId) + .add("executionClass", executionClass) + .toString(); + } } private static class BinPackingSimulation diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 45a38ee576a6..2976a9be3a00 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; @@ -108,6 +109,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -129,6 +131,7 @@ import java.util.function.Function; import java.util.function.IntConsumer; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -181,6 +184,7 @@ import static java.lang.Math.round; import static java.lang.Math.toIntExact; import static java.lang.String.format; +import static java.util.Map.Entry.comparingByKey; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; @@ -492,13 +496,163 @@ public void failTaskRemotely(TaskId taskId, Throwable failureCause) SqlStage sqlStage = requireNonNull(stages.get(taskId.getStageId()), () -> "stage not found: %s" + taskId.getStageId()); sqlStage.failTaskRemotely(taskId, failureCause); } + + public void logDebugInfo() + { + if (!log.isDebugEnabled()) { + return; + } + log.debug("SqlStages:"); + stages.forEach((stageId, stage) -> { + log.debug("SqlStage %s: %s", stageId, stage); + }); + } + } + + private static class EventDebugInfos + { + private static final String GLOBAL_EVENTS_BUCKET = "GLOBAL"; + private static final EventListener GET_BUCKET_LISTENER = new EventListener<>() + { + @Override + public String onRemoteTaskEvent(RemoteTaskEvent event) + { + return "task_" + event.getTaskStatus().getTaskId().getStageId().toString(); + } + + @Override + public String onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + { + return "task_" + event.getTaskId().getStageId().toString(); + } + + @Override + public String onStageEvent(StageEvent event) + { + return event.getStageId().toString(); + } + + @Override + public String onEvent(Event event) + { + return GLOBAL_EVENTS_BUCKET; + } + }; + + private final String queryId; + private final int eventsPerBucket; + private long eventsCounter; + private long filteredEventsCounter; + + // Using SoftReference to prevent OOM in an unexpected case when this collection grow to substantial size + // and VM is short on memory. + private SoftReference> eventsDebugInfosReference; + + private EventDebugInfos(String queryId, int eventsPerBucket) + { + this.queryId = requireNonNull(queryId, "queryId is null"); + this.eventsPerBucket = eventsPerBucket; + eventsDebugInfosReference = new SoftReference<>(LinkedListMultimap.create()); + } + + /** + * @return true if event was recorded; false if it was filtered out + */ + private boolean add(Event event) + { + ListMultimap eventsDebugInfos = getEventsDebugInfos(); + String bucket = getBucket(event); + Optional debugInfo = getFullDebugInfo(eventsCounter, event); + eventsCounter++; + if (debugInfo.isEmpty()) { + filteredEventsCounter++; + return false; + } + + List bucketDebugInfos = eventsDebugInfos.get(bucket); + bucketDebugInfos.add(debugInfo.get()); + if (bucketDebugInfos.size() > eventsPerBucket) { + Iterator iterator = bucketDebugInfos.iterator(); + iterator.next(); + iterator.remove(); + } + return true; + } + + private ListMultimap getEventsDebugInfos() + { + ListMultimap eventsDebugInfos = eventsDebugInfosReference.get(); + if (eventsDebugInfos == null) { + log.debug("eventsDebugInfos for %s has been cleared", queryId); + eventsDebugInfos = LinkedListMultimap.create(); + eventsDebugInfosReference = new SoftReference<>(eventsDebugInfos); + } + return eventsDebugInfos; + } + + private String getBucket(Event event) + { + if (event == Event.WAKE_UP || event == Event.ABORT) { + return GLOBAL_EVENTS_BUCKET; + } + return event.accept(GET_BUCKET_LISTENER); + } + + private Optional getFullDebugInfo(long eventId, Event event) + { + return getEventDebugInfo(event).map(info -> "[" + eventId + "/" + System.currentTimeMillis() + "/" + info + "]"); + } + + private static Optional getEventDebugInfo(Event event) + { + if (event == Event.WAKE_UP) { + return Optional.of("WAKE_UP"); + } + if (event == Event.ABORT) { + return Optional.of("ABORT"); + } + if (event instanceof SplitAssignmentEvent splitAssignmentEvent) { + if (splitAssignmentEvent.getAssignmentResult().isEmpty()) { + // There may be significant amount of empty AssignmentResults so lets skip processing of those. + // It could be that scheduler loop is not really stuck per se. But we are getting empty events all the time - and it just looks like stuck. + // We need to notice that, and still log debug information. + // Also empty events can push important events out of recorded debug information - making debug logs less useful. + return Optional.empty(); + } + } + + return Optional.of(event.toString()); + } + + public void log() + { + if (!log.isDebugEnabled()) { + return; + } + ListMultimap eventsDebugInfos = getEventsDebugInfos(); + eventsDebugInfos.asMap().entrySet().stream() + .sorted(comparingByKey()) + .forEachOrdered(entry -> { + log.debug("Recent events for " + entry.getKey()); + for (String eventDebugInfo : entry.getValue()) { + // logging events in separate log events as some events may be huge and otherwise rarely we could hit logging framework constraints + log.debug(" " + eventDebugInfo); + } + }); + log.debug("Filtered events count " + filteredEventsCounter); + } } private static class Scheduler implements EventListener { private static final int EVENT_BUFFER_CAPACITY = 100; - private static final long EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS = new Duration(1, MINUTES).toMillis(); + private static final long EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS = MINUTES.toMillis(1); + // If scheduler is stalled for SCHEDULER_STALLED_DURATION_THRESHOLD debug log will be emitted. + // This value must be larger than EVENT_PROCESSING_ENFORCED_FREQUENCY as prerequiste for processing is + // that there are no events in the event queue. + private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = new Duration(5, MINUTES).toMillis(); + private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10; private final QueryStateMachine queryStateMachine; private final Metadata metadata; @@ -531,6 +685,8 @@ private static class Scheduler private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); private final List eventBuffer = new ArrayList<>(EVENT_BUFFER_CAPACITY); + private final Stopwatch eventDebugInfoStopwatch = Stopwatch.createUnstarted(); + private final Optional eventDebugInfos; private boolean started; private boolean runtimeAdaptivePartitioningApplied; @@ -614,7 +770,15 @@ public Scheduler( this.runtimeAdaptivePartitioningMaxTaskSizeInBytes = requireNonNull(runtimeAdaptivePartitioningMaxTaskSize, "runtimeAdaptivePartitioningMaxTaskSize is null").toBytes(); this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled; + if (log.isDebugEnabled()) { + eventDebugInfos = Optional.of(new EventDebugInfos(queryStateMachine.getQueryId().toString(), EVENTS_DEBUG_INFOS_PER_BUCKET)); + } + else { + eventDebugInfos = Optional.empty(); + } + planInTopologicalOrder = sortPlanInTopologicalOrder(plan); + eventDebugInfoStopwatch.start(); } public void run() @@ -677,33 +841,107 @@ private boolean processEvents() { try { Event event = eventQueue.poll(EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS, MILLISECONDS); - if (event == null) { - return true; + if (event != null) { + eventBuffer.add(event); } - eventBuffer.add(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } - while (true) { + boolean eventDebugInfoRecorded = false; + boolean aborted = false; + while (!aborted) { // poll multiple events from the queue in one shot to improve efficiency eventQueue.drainTo(eventBuffer, EVENT_BUFFER_CAPACITY - eventBuffer.size()); if (eventBuffer.isEmpty()) { - return true; + break; } + for (Event e : eventBuffer) { + eventDebugInfoRecorded |= recordEventsDebugInfo(e); if (e == Event.ABORT) { - return false; + aborted = true; + break; } if (e == Event.WAKE_UP) { continue; } e.accept(this); } + eventBuffer.clear(); } + + if (eventDebugInfoRecorded) { + // mark that we processed some events; we filter out some no-op events. + // If only no-op events appear in event queue we still treat scheduler as stuck + eventDebugInfoStopwatch.reset().start(); + } + else { + // if no events were recorded there is a chance scheduler is stalled + if (log.isDebugEnabled() && eventDebugInfoStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS) { + logDebugInfoSafe("Scheduler stalled for %s".formatted(eventDebugInfoStopwatch.elapsed())); + eventDebugInfoStopwatch.reset().start(); // reset to prevent extensive logging + } + } + + return !aborted; + } + + private boolean recordEventsDebugInfo(Event event) + { + if (eventDebugInfos.isEmpty()) { + return false; + } + return eventDebugInfos.orElseThrow().add(event); + } + + private void logDebugInfoSafe(String reason) + { + try { + logDebugInfo(reason); + } + catch (Throwable e) { + log.error(e, "Unexpected error while logging debug info for %s", reason); + } + } + + private void logDebugInfo(String reason) + { + if (!log.isDebugEnabled()) { + return; + } + + log.debug("Scheduler debug info for %s START; reason=%s", queryStateMachine.getQueryId(), reason); + log.debug("General state: %s", toStringHelper(this) + .add("maxTaskExecutionAttempts", maxTaskExecutionAttempts) + .add("maxTasksWaitingForNode", maxTasksWaitingForNode) + .add("maxTasksWaitingForExecution", maxTasksWaitingForExecution) + .add("maxPartitionCount", maxPartitionCount) + .add("runtimeAdaptivePartitioningEnabled", runtimeAdaptivePartitioningEnabled) + .add("runtimeAdaptivePartitioningPartitionCount", runtimeAdaptivePartitioningPartitionCount) + .add("runtimeAdaptivePartitioningMaxTaskSizeInBytes", runtimeAdaptivePartitioningMaxTaskSizeInBytes) + .add("stageEstimationForEagerParentEnabled", stageEstimationForEagerParentEnabled) + .add("started", started) + .add("runtimeAdaptivePartitioningApplied", runtimeAdaptivePartitioningApplied) + .add("nextSchedulingPriority", nextSchedulingPriority) + .add("preSchedulingTaskContexts", preSchedulingTaskContexts) + .add("schedulingDelayer", schedulingDelayer) + .add("queryOutputSet", queryOutputSet) + .toString()); + + stageRegistry.logDebugInfo(); + + log.debug("StageExecutions:"); + stageExecutions.forEach((stageId, stageExecution) -> { + stageExecution.logDebugInfo(); + }); + + eventDebugInfos.ifPresent(EventDebugInfos::log); + + log.debug("Scheduler debug info for %s END", queryStateMachine.getQueryId()); } private boolean schedule() @@ -2193,6 +2431,40 @@ public FaultTolerantPartitioningScheme getSinkPartitioningScheme() { return sinkPartitioningScheme; } + + public void logDebugInfo() + { + if (!log.isDebugEnabled()) { + return; + } + + log.debug("StageExecution %s: %s", + stage.getStageId(), + toStringHelper(this) + .add("taskDescriptorStorage.getReservedBytes()", taskDescriptorStorage.getReservedBytes()) + .add("taskSource", taskSource.getDebugInfo()) + .add("sinkPartitioningScheme", sinkPartitioningScheme) + .add("exchange", exchange) + .add("schedulingPriority", schedulingPriority) + .add("eager", eager) + .add("outputDataSize", outputDataSize) + .add("noMorePartitions", noMorePartitions) + .add("runningPartitions", runningPartitions) + .add("remainingPartitions", remainingPartitions) + .add("sinkOutputSelectorBuilder", sinkOutputSelectorBuilder == null ? null : sinkOutputSelectorBuilder.build()) + .add("finalSinkOutputSelector", finalSinkOutputSelector) + .add("remoteSourceIds", remoteSourceIds) + .add("remoteSources", remoteSources) + .add("sourceOutputSelectors", sourceOutputSelectors) + .add("taskDescriptorLoadingActive", taskDescriptorLoadingActive) + .add("exchangeClosed", exchangeClosed) + .add("initialMemoryRequirements", initialMemoryRequirements) + .toString()); + + partitions.forEach((partitionId, stagePartition) -> { + log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo()); + }); + } } private static class StagePartition @@ -2447,6 +2719,28 @@ public boolean isFinished() { return finished; } + + public String getDebugInfo() + { + return toStringHelper(this) + .add("stageId", stageId) + .add("partitionId", partitionId) + .add("exchangeSinkHandle", exchangeSinkHandle) + .add("remoteSourceIds", remoteSourceIds) + .add("openTaskDescriptor", openTaskDescriptor) + .add("memoryRequirements", memoryRequirements) + .add("failureObserved", failureObserved) + .add("remainingAttempts", remainingAttempts) + .add("tasks", tasks) + .add("taskOutputBuffers", taskOutputBuffers) + .add("runningTasks", runningTasks) + .add("taskNodeLeases", taskNodeLeases) + .add("finalSelectors", finalSelectors) + .add("noMoreSplits", noMoreSplits) + .add("taskScheduled", taskScheduled) + .add("finished", finished) + .toString(); + } } private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector selector) @@ -2646,6 +2940,18 @@ public long getRemainingDelayInMillis() } return 0; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("minRetryDelayInMillis", minRetryDelayInMillis) + .add("maxRetryDelayInMillis", maxRetryDelayInMillis) + .add("retryDelayScaleFactor", retryDelayScaleFactor) + .add("stopwatch", stopwatch) + .add("currentDelayInMillis", currentDelayInMillis) + .toString(); + } } private interface Event @@ -2765,6 +3071,18 @@ public T accept(EventListener listener) { return listener.onSinkInstanceHandleAcquired(this); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("stageId", stageId) + .add("partitionId", partitionId) + .add("nodeLease", nodeLease) + .add("attempt", attempt) + .add("sinkInstanceHandle", sinkInstanceHandle) + .toString(); + } } private static class RemoteTaskCompletedEvent @@ -2780,6 +3098,14 @@ public T accept(EventListener listener) { return listener.onRemoteTaskCompleted(this); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", getTaskStatus()) + .toString(); + } } private static class RemoteTaskExchangeSinkUpdateRequiredEvent @@ -2795,6 +3121,14 @@ public T accept(EventListener listener) { return listener.onRemoteTaskExchangeSinkUpdateRequired(this); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", getTaskStatus()) + .toString(); + } } private static class RemoteTaskExchangeUpdatedSinkAcquired @@ -2824,6 +3158,15 @@ public ExchangeSinkInstanceHandle getExchangeSinkInstanceHandle() { return exchangeSinkInstanceHandle; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskId", taskId) + .add("exchangeSinkInstanceHandle", exchangeSinkInstanceHandle) + .toString(); + } } private abstract static class RemoteTaskEvent @@ -2840,6 +3183,14 @@ public TaskStatus getTaskStatus() { return taskStatus; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", taskStatus) + .toString(); + } } private static class SplitAssignmentEvent @@ -2863,6 +3214,15 @@ public T accept(EventListener listener) { return listener.onSplitAssignment(this); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("stageId", getStageId()) + .add("assignmentResult", assignmentResult) + .toString(); + } } private static class StageFailureEvent @@ -2886,6 +3246,15 @@ public T accept(EventListener listener) { return listener.onStageFailure(this); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("stageId", getStageId()) + .add("failure", failure) + .toString(); + } } private abstract static class StageEvent @@ -2949,5 +3318,15 @@ public void setWaitingForSinkInstanceHandle(boolean waitingForSinkInstanceHandle { this.waitingForSinkInstanceHandle = waitingForSinkInstanceHandle; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("nodeLease", nodeLease) + .add("executionClass", executionClass) + .add("waitingForSinkInstanceHandle", waitingForSinkInstanceHandle) + .toString(); + } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java index d6e6d74582f8..0d21d1a24f7d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java @@ -56,10 +56,12 @@ import java.util.function.LongConsumer; import java.util.function.Supplier; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; +import static com.google.common.collect.Maps.transformValues; import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.toListenableFuture; @@ -221,6 +223,23 @@ public synchronized void close() } } + public String getDebugInfo() + { + return toStringHelper(this) + .add("sourceExchanges", transformValues(sourceExchanges, Exchange::getId)) + .add("remoteSources", remoteSources) + .add("assigner", assigner) + .add("splitBatchSize", splitBatchSize) + .add("targetExchangeSplitSizeInBytes", targetExchangeSplitSizeInBytes) + .add("sourcePartitioningScheme", sourcePartitioningScheme) + .add("initialized", initialized) + .add("splitSources", splitSources) + .add("completedFragments", completedFragments) + .add("future", future) + .add("closed", closed) + .toString(); + } + private static class IdempotentSplitSource implements Closeable { @@ -296,6 +315,19 @@ private synchronized void advance(boolean lastBatch) future = Optional.empty(); } + @Override + public synchronized String toString() + { + return toStringHelper(this) + .add("planNodeId", planNodeId) + .add("sourceFragmentId", sourceFragmentId) + .add("splitSource", splitSource) + .add("splitBatchSize", splitBatchSize) + .add("closed", closed) + .add("finished", finished) + .toString(); + } + public class SplitBatchReference { private final SplitBatch splitBatch; @@ -404,6 +436,15 @@ public Optional> getTableExecuteSplitsInfo() { return Optional.empty(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("handleSource", handleSource) + .add("targetSplitSizeInBytes", targetSplitSizeInBytes) + .toString(); + } } /** diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java index d64cae61d71f..9f4ff0d00b7a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.function.ToIntFunction; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -103,4 +104,15 @@ public FaultTolerantPartitioningScheme withPartitionCount(int partitionCount) this.splitToBucketFunction, this.partitionToNodeMap); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("partitionCount", partitionCount) + .add("bucketToPartitionMap", bucketToPartitionMap.isPresent() ? "present" : "empty") + .add("splitToBucketFunction", splitToBucketFunction.isPresent() ? "present" : "empty") + .add("partitionToNodeMap", partitionToNodeMap) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java index db0e693e0fd6..fe424f9ce050 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java @@ -42,6 +42,7 @@ import java.util.function.Predicate; import java.util.stream.IntStream; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -353,6 +354,16 @@ public Optional getSplitBy() { return splitBy; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("subPartitions", subPartitions) + .add("splitBy", splitBy) + .add("nextSubPartition", nextSubPartition) + .toString(); + } } @VisibleForTesting @@ -376,6 +387,12 @@ public int getId() checkState(id.isPresent(), "id is expected to be assigned"); return id.getAsInt(); } + + @Override + public String toString() + { + return id.toString(); + } } private static boolean isWriteFragment(PlanFragment fragment) @@ -402,4 +419,20 @@ public Boolean visitTableWriter(TableWriterNode node, Void context) return fragment.getRoot().accept(visitor, null); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("catalogRequirement", catalogRequirement) + .add("replicatedSources", replicatedSources) + .add("allSources", allSources) + .add("sourcePartitioningScheme", sourcePartitioningScheme) + .add("sourcePartitionToTaskPartition", sourcePartitionToTaskPartition) + .add("createdTaskPartitions", createdTaskPartitions) + .add("completedSources", completedSources) + .add("replicatedSplits.size()", replicatedSplits.size()) + .add("allTaskPartitionsCreated", allTaskPartitionsCreated) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java index 3f1801c382b6..a9854e692ab3 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -88,4 +89,15 @@ public AssignmentResult finish() } return result.build(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("hostRequirement", hostRequirement) + .add("allSources", allSources) + .add("partitionAdded", partitionAdded) + .add("completedSources", completedSources) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java index da721ce70660..d7cc2611792c 100644 --- a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java +++ b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.concurrent.Executor; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.addCallback; @@ -78,6 +79,15 @@ public Optional> getTableExecuteSplitsInfo() return source.getTableExecuteSplitsInfo(); } + @Override + public String toString() + { + return toStringHelper(this) + .add("bufferSize", bufferSize) + .add("source", source) + .toString(); + } + private static class GetNextBatch extends AbstractFuture { diff --git a/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java index 8df2cc09b5b1..349487592caf 100644 --- a/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java +++ b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Optional; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; @@ -105,4 +106,12 @@ public Optional> getTableExecuteSplitsInfo() { return source.getTableExecuteSplitsInfo(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("source", source) + .toString(); + } } diff --git a/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java index 4affb429c24e..9161b2d4f5c9 100644 --- a/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java +++ b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java @@ -195,6 +195,20 @@ public void testIncompatibleTransitions() .hasMessage("decision for partition 0 is already made: 0"); } + @Test + public void testToString() + { + ExchangeSourceOutputSelector selector = ExchangeSourceOutputSelector.builder(ImmutableSet.of(EXCHANGE_ID_1, EXCHANGE_ID_2)) + .include(EXCHANGE_ID_1, 0, 1) + .exclude(EXCHANGE_ID_1, 1) + .include(EXCHANGE_ID_1, 2, 0) + .exclude(EXCHANGE_ID_2, 3) + .setPartitionCount(EXCHANGE_ID_2, 4) + .build(); + + assertThat(selector).hasToString("ExchangeSourceOutputSelector[version=0, values={exchange_1=[0=1,1=E,2=0], exchange_2=[0=U,1=U,2=U,3=E]}, finalSelector=false]"); + } + private ExchangeSourceOutputSelector serializeDeserialize(ExchangeSourceOutputSelector selector) { return codec.fromJson(codec.toJson(selector)); diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java index 2f3a1604d64f..6143746a0b47 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java @@ -16,6 +16,8 @@ import io.trino.spi.Experimental; import io.trino.spi.QueryId; +import java.util.StringJoiner; + import static java.util.Objects.requireNonNull; @Experimental(eta = "2023-09-01") @@ -39,4 +41,13 @@ public ExchangeId getExchangeId() { return exchangeId; } + + @Override + public String toString() + { + return new StringJoiner(", ", ExchangeContext.class.getSimpleName() + "[", "]") + .add("queryId=" + queryId) + .add("exchangeId=" + exchangeId) + .toString(); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java index 6b7c0067a54a..14ee2c3870a0 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.BasicSliceInput; import io.airlift.slice.SizeOf; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -22,7 +23,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.StringJoiner; +import java.util.TreeMap; import java.util.function.Function; import static io.airlift.slice.SizeOf.instanceSize; @@ -31,6 +35,7 @@ import static io.trino.spi.exchange.ExchangeSourceOutputSelector.Selection.UNKNOWN; import static java.lang.Math.max; import static java.util.Arrays.fill; +import static java.util.Map.entry; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; @@ -157,6 +162,55 @@ public ExchangeSourceOutputSelector merge(ExchangeSourceOutputSelector other) this.finalSelector && other.finalSelector); } + @Override + public String toString() + { + return new StringJoiner(", ", ExchangeSourceOutputSelector.class.getSimpleName() + "[", "]") + .add("version=" + version) + .add("values=" + values.entrySet().stream() + .map(e -> entry(e.getKey().toString(), valuesSliceToString(e.getValue()))) + // collect to TreeMap to ensure ordering of keys + .collect(toMap( + Entry::getKey, + Entry::getValue, + (a, b) -> { throw new IllegalArgumentException("got duplicate key " + a + ", " + b); }, + TreeMap::new))) + .add("finalSelector=" + finalSelector) + .toString(); + } + + private String valuesSliceToString(Slice values) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + try (BasicSliceInput input = new BasicSliceInput(values)) { + int taskPartitionId = 0; + while (true) { + int value = input.read(); + if (value == -1) { + break; + } + if (taskPartitionId != 0) { + builder.append(","); + } + builder.append(taskPartitionId); + builder.append("="); + if ((byte) value == EXCLUDED.value) { + builder.append("E"); + } + else if ((byte) value == UNKNOWN.value) { + builder.append("U"); + } + else { + builder.append(value); + } + taskPartitionId++; + } + } + builder.append("]"); + return builder.toString(); + } + private int getPartitionCount(ExchangeId exchangeId) { Slice values = this.values.get(exchangeId); @@ -272,7 +326,7 @@ public ExchangeSourceOutputSelector build() return new ExchangeSourceOutputSelector( nextVersion++, exchangeValues.entrySet().stream() - .collect(toMap(Map.Entry::getKey, entry -> { + .collect(toMap(Entry::getKey, entry -> { ExchangeId exchangeId = entry.getKey(); ValuesBuilder valuesBuilder = entry.getValue(); if (finalSelector) { diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index f5c386fafab3..4d5ac7635c68 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -49,6 +49,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -337,4 +338,24 @@ private record CommittedTaskAttempt(int partitionId, int attemptId) checkArgument(attemptId >= 0, "attemptId is expected to be greater than or equal to zero: %s", attemptId); } } + + @Override + public synchronized String toString() + { + return toStringHelper(this) + .add("baseDirectories", baseDirectories) + .add("exchangeStorage", exchangeStorage.getClass().getName()) + .add("exchangeContext", exchangeContext) + .add("outputPartitionCount", outputPartitionCount) + .add("preserveOrderWithinPartition", preserveOrderWithinPartition) + .add("fileListingParallelism", fileListingParallelism) + .add("exchangeSourceHandleTargetDataSizeInBytes", exchangeSourceHandleTargetDataSizeInBytes) + .add("outputDirectories", outputDirectories) + .add("allSinks", allSinks) + .add("finishedSinks", finishedSinks) + .add("noMoreSinks", noMoreSinks) + .add("exchangeSourceHandlesCreationStarted", exchangeSourceHandlesCreationStarted) + .add("exchangeSourceHandlesFuture", exchangeSourceHandlesFuture) + .toString(); + } } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java index 666afd36e0b6..a262b9f04149 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java @@ -19,6 +19,7 @@ import java.net.URI; +import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; public class FileSystemExchangeSinkInstanceHandle @@ -65,4 +66,15 @@ public boolean isPreserveOrderWithinPartition() { return preserveOrderWithinPartition; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("sinkHandle", sinkHandle) + .add("outputDirectory", outputDirectory) + .add("outputPartitionCount", outputPartitionCount) + .add("preserveOrderWithinPartition", preserveOrderWithinPartition) + .toString(); + } } From f21215e920dbead94a3a6581b87f5928954035d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Sun, 26 Nov 2023 13:45:39 +0100 Subject: [PATCH 6/7] Extend diagnostics for stuck FTE on EXCEEDED_TIME_LIMIT --- .../EventDrivenFaultTolerantQueryScheduler.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 2976a9be3a00..c33c356f8053 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -167,6 +167,7 @@ import static io.trino.spi.ErrorType.EXTERNAL; import static io.trino.spi.ErrorType.INTERNAL_ERROR; import static io.trino.spi.ErrorType.USER_ERROR; +import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; import static io.trino.spi.exchange.Exchange.SourceHandlesDeliveryMode.EAGER; @@ -188,6 +189,7 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; public class EventDrivenFaultTolerantQueryScheduler implements QueryScheduler @@ -651,7 +653,8 @@ private static class Scheduler // If scheduler is stalled for SCHEDULER_STALLED_DURATION_THRESHOLD debug log will be emitted. // This value must be larger than EVENT_PROCESSING_ENFORCED_FREQUENCY as prerequiste for processing is // that there are no events in the event queue. - private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = new Duration(5, MINUTES).toMillis(); + private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = MINUTES.toMillis(5); + private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30); private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10; private final QueryStateMachine queryStateMachine; @@ -792,6 +795,17 @@ public void run() } }); + queryStateMachine.addQueryInfoStateChangeListener(queryInfo -> { + if (!queryInfo.isFinalQueryInfo()) { + return; + } + if (queryInfo.getState() == QueryState.FAILED + && queryInfo.getErrorCode() == EXCEEDED_TIME_LIMIT.toErrorCode() + && eventDebugInfoStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS) { + logDebugInfoSafe(format("Scheduler stalled for %s on EXCEEDED_TIME_LIMIT", eventDebugInfoStopwatch.elapsed())); + } + }); + Optional failure = Optional.empty(); try { if (schedule()) { From f4d34fc6738aa46d40aa6fa669dcf12007c06cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 28 Nov 2023 10:31:36 +0100 Subject: [PATCH 7/7] Enable debug logging for fault tolerant execution in tests --- .../src/main/java/io/trino/testing/DistributedQueryRunner.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index 453ed0219004..2fa3dbbfcabb 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -80,6 +80,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.airlift.log.Level.DEBUG; import static io.airlift.log.Level.ERROR; import static io.airlift.log.Level.WARN; import static io.airlift.testing.Closeables.closeAllSuppress; @@ -234,6 +235,7 @@ private static void setupLogging() logging.setLevel("org.hibernate.validator.internal.util.Version", WARN); logging.setLevel(PluginManager.class.getName(), WARN); logging.setLevel(CoordinatorDynamicCatalogManager.class.getName(), WARN); + logging.setLevel("io.trino.execution.scheduler.faulttolerant", DEBUG); } private static TestingTrinoServer createTestingTrinoServer(