diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java index 0d8a146ee925..b1a61d9a7582 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SystemSessionProperties.isEnableCoordinatorDynamicFiltersDistribution; @@ -329,9 +330,18 @@ private void checkAllTaskFinal() } @Override - public String toString() + // for debugging + public synchronized String toString() { - return stateMachine.toString(); + return toStringHelper(this) + .add("stateMachine", stateMachine) + .add("summarizeTaskInfo", summarizeTaskInfo) + .add("outboundDynamicFilterIds", outboundDynamicFilterIds) + .add("tasks", tasks) + .add("allTasks", allTasks) + .add("finishedTasks", finishedTasks) + .add("tasksWithFinalInfo", tasksWithFinalInfo) + .toString(); } private class MemoryUsageListener diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java index 21244b328649..fa2cd5bebc27 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputBuffers.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -76,4 +77,13 @@ public SpoolingOutputBuffers withExchangeSinkInstanceHandle(ExchangeSinkInstance { return new SpoolingOutputBuffers(getVersion() + 1, handle, outputPartitionCount); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("exchangeSinkInstanceHandle", exchangeSinkInstanceHandle) + .add("outputPartitionCount", outputPartitionCount) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java index 89d70cf94cbe..e875afba7845 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -314,6 +315,32 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List getHostRequirement(Split split) { if (split.getConnectorSplit().isRemotelyAccessible()) { @@ -396,5 +423,16 @@ public void setFull(boolean full) { this.full = full; } + + @Override + public String toString() + { + return toStringHelper(this) + 
.add("partitionId", partitionId) + .add("assignedDataSizeInBytes", assignedDataSizeInBytes) + .add("assignedSplitCount", assignedSplitCount) + .add("full", full) + .toString(); + } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java index c77fab6592cf..d6757fbce2d2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java @@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -459,6 +460,18 @@ public void release() throw new IllegalStateException("Node " + node + " already released"); } } + + @Override + public String toString() + { + return toStringHelper(this) + .add("node", node) + .add("released", released) + .add("memoryLease", memoryLease) + .add("taskId", taskId) + .add("executionClass", executionClass) + .toString(); + } } private static class BinPackingSimulation diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 069b0e638283..c33c356f8053 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; @@ -108,6 +109,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -129,6 +131,7 @@ import java.util.function.Function; import java.util.function.IntConsumer; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -164,6 +167,7 @@ import static io.trino.spi.ErrorType.EXTERNAL; import static io.trino.spi.ErrorType.INTERNAL_ERROR; import static io.trino.spi.ErrorType.USER_ERROR; +import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; import static io.trino.spi.exchange.Exchange.SourceHandlesDeliveryMode.EAGER; @@ -181,9 +185,11 @@ import static java.lang.Math.round; import static java.lang.Math.toIntExact; import static java.lang.String.format; +import static 
java.util.Map.Entry.comparingByKey; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; public class EventDrivenFaultTolerantQueryScheduler implements QueryScheduler @@ -492,12 +498,164 @@ public void failTaskRemotely(TaskId taskId, Throwable failureCause) SqlStage sqlStage = requireNonNull(stages.get(taskId.getStageId()), () -> "stage not found: %s" + taskId.getStageId()); sqlStage.failTaskRemotely(taskId, failureCause); } + + public void logDebugInfo() + { + if (!log.isDebugEnabled()) { + return; + } + log.debug("SqlStages:"); + stages.forEach((stageId, stage) -> { + log.debug("SqlStage %s: %s", stageId, stage); + }); + } + } + + private static class EventDebugInfos + { + private static final String GLOBAL_EVENTS_BUCKET = "GLOBAL"; + private static final EventListener<String> GET_BUCKET_LISTENER = new EventListener<>() + { + @Override + public String onRemoteTaskEvent(RemoteTaskEvent event) + { + return "task_" + event.getTaskStatus().getTaskId().getStageId().toString(); + } + + @Override + public String onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + { + return "task_" + event.getTaskId().getStageId().toString(); + } + + @Override + public String onStageEvent(StageEvent event) + { + return event.getStageId().toString(); + } + + @Override + public String onEvent(Event event) + { + return GLOBAL_EVENTS_BUCKET; + } + }; + + private final String queryId; + private final int eventsPerBucket; + private long eventsCounter; + private long filteredEventsCounter; + + // Using SoftReference to prevent OOM in the unexpected case when this collection grows to a substantial size + // and the VM is short on memory. 
+ private SoftReference<ListMultimap<String, String>> eventsDebugInfosReference; + + private EventDebugInfos(String queryId, int eventsPerBucket) + { + this.queryId = requireNonNull(queryId, "queryId is null"); + this.eventsPerBucket = eventsPerBucket; + eventsDebugInfosReference = new SoftReference<>(LinkedListMultimap.create()); + } + + /** + * @return true if event was recorded; false if it was filtered out + */ + private boolean add(Event event) + { + ListMultimap<String, String> eventsDebugInfos = getEventsDebugInfos(); + String bucket = getBucket(event); + Optional<String> debugInfo = getFullDebugInfo(eventsCounter, event); + eventsCounter++; + if (debugInfo.isEmpty()) { + filteredEventsCounter++; + return false; + } + + List<String> bucketDebugInfos = eventsDebugInfos.get(bucket); + bucketDebugInfos.add(debugInfo.get()); + if (bucketDebugInfos.size() > eventsPerBucket) { + Iterator<String> iterator = bucketDebugInfos.iterator(); + iterator.next(); + iterator.remove(); + } + return true; + } + + private ListMultimap<String, String> getEventsDebugInfos() + { + ListMultimap<String, String> eventsDebugInfos = eventsDebugInfosReference.get(); + if (eventsDebugInfos == null) { + log.debug("eventsDebugInfos for %s has been cleared", queryId); + eventsDebugInfos = LinkedListMultimap.create(); + eventsDebugInfosReference = new SoftReference<>(eventsDebugInfos); + } + return eventsDebugInfos; + } + + private String getBucket(Event event) + { + if (event == Event.WAKE_UP || event == Event.ABORT) { + return GLOBAL_EVENTS_BUCKET; + } + return event.accept(GET_BUCKET_LISTENER); + } + + private Optional<String> getFullDebugInfo(long eventId, Event event) + { + return getEventDebugInfo(event).map(info -> "[" + eventId + "/" + System.currentTimeMillis() + "/" + info + "]"); + } + + private static Optional<String> getEventDebugInfo(Event event) + { + if (event == Event.WAKE_UP) { + return Optional.of("WAKE_UP"); + } + if (event == Event.ABORT) { + return Optional.of("ABORT"); + } + if (event instanceof SplitAssignmentEvent splitAssignmentEvent) { + if (splitAssignmentEvent.getAssignmentResult().isEmpty()) { + // There may be a significant number of empty AssignmentResults, so skip recording those. + // The scheduler loop may not actually be stuck; if only empty events keep arriving it merely looks stuck, + // and we still want to notice that and log debug information. + // Also, empty events could push important events out of the recorded debug information, making debug logs less useful. + return Optional.empty(); + } + } + + return Optional.of(event.toString()); + } + + public void log() + { + if (!log.isDebugEnabled()) { + return; + } + ListMultimap<String, String> eventsDebugInfos = getEventsDebugInfos(); + eventsDebugInfos.asMap().entrySet().stream() + .sorted(comparingByKey()) + .forEachOrdered(entry -> { + log.debug("Recent events for " + entry.getKey()); + for (String eventDebugInfo : entry.getValue()) { + // log each event as a separate log entry; some events may be huge and combining them could occasionally hit logging framework constraints + log.debug(" " + eventDebugInfo); + } + }); + log.debug("Filtered events count " + filteredEventsCounter); + } } private static class Scheduler - implements EventListener + implements EventListener<Void> { private static final int EVENT_BUFFER_CAPACITY = 100; + private static final long EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS = MINUTES.toMillis(1); + // If the scheduler is stalled for SCHEDULER_STALLED_DURATION_THRESHOLD a debug log will be emitted. 
+ // This value must be larger than EVENT_PROCESSING_ENFORCED_FREQUENCY, as a prerequisite for processing is + // that there are no events in the event queue. + private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = MINUTES.toMillis(5); + private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30); + private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10; private final QueryStateMachine queryStateMachine; private final Metadata metadata; @@ -530,6 +688,8 @@ private static class Scheduler private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(); private final List<Event> eventBuffer = new ArrayList<>(EVENT_BUFFER_CAPACITY); + private final Stopwatch eventDebugInfoStopwatch = Stopwatch.createUnstarted(); + private final Optional<EventDebugInfos> eventDebugInfos; private boolean started; private boolean runtimeAdaptivePartitioningApplied; @@ -613,7 +773,15 @@ public Scheduler( this.runtimeAdaptivePartitioningMaxTaskSizeInBytes = requireNonNull(runtimeAdaptivePartitioningMaxTaskSize, "runtimeAdaptivePartitioningMaxTaskSize is null").toBytes(); this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled; + if (log.isDebugEnabled()) { + eventDebugInfos = Optional.of(new EventDebugInfos(queryStateMachine.getQueryId().toString(), EVENTS_DEBUG_INFOS_PER_BUCKET)); + } + else { + eventDebugInfos = Optional.empty(); + } + planInTopologicalOrder = sortPlanInTopologicalOrder(plan); + eventDebugInfoStopwatch.start(); } public void run() @@ -627,6 +795,17 @@ public void run() } }); + queryStateMachine.addQueryInfoStateChangeListener(queryInfo -> { + if (!queryInfo.isFinalQueryInfo()) { + return; + } + if (queryInfo.getState() == QueryState.FAILED + && queryInfo.getErrorCode() == EXCEEDED_TIME_LIMIT.toErrorCode() + && eventDebugInfoStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS) { + logDebugInfoSafe(format("Scheduler stalled for %s on EXCEEDED_TIME_LIMIT", eventDebugInfoStopwatch.elapsed())); + } + }); + Optional<Throwable> failure = Optional.empty(); try { if (schedule()) { @@ -675,34 +854,108 @@ private Optional closeAndAddSuppressed(Optional existingFa private boolean processEvents() { try { - Event event = eventQueue.poll(1, MINUTES); - if (event == null) { - return true; + Event event = eventQueue.poll(EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS, MILLISECONDS); + if (event != null) { + eventBuffer.add(event); } - eventBuffer.add(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } - while (true) { + boolean eventDebugInfoRecorded = false; + boolean aborted = false; + while (!aborted) { // poll multiple events from the queue in one shot to improve efficiency eventQueue.drainTo(eventBuffer, EVENT_BUFFER_CAPACITY - eventBuffer.size()); if (eventBuffer.isEmpty()) { - return true; + break; } + for (Event e : eventBuffer) { + eventDebugInfoRecorded |= recordEventsDebugInfo(e); if (e == Event.ABORT) { - return false; + aborted = true; + break; } if (e == Event.WAKE_UP) { continue; } e.accept(this); } + eventBuffer.clear(); } + + if (eventDebugInfoRecorded) { + // mark that we processed some events; we filter out some no-op events. 
+ // If only no-op events appear in the event queue we still treat the scheduler as stuck + eventDebugInfoStopwatch.reset().start(); + } + else { + // if no events were recorded there is a chance the scheduler is stalled + if (log.isDebugEnabled() && eventDebugInfoStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS) { + logDebugInfoSafe("Scheduler stalled for %s".formatted(eventDebugInfoStopwatch.elapsed())); + eventDebugInfoStopwatch.reset().start(); // reset to prevent excessive logging + } + } + + return !aborted; + } + + private boolean recordEventsDebugInfo(Event event) + { + if (eventDebugInfos.isEmpty()) { + return false; + } + return eventDebugInfos.orElseThrow().add(event); + } + + private void logDebugInfoSafe(String reason) + { + try { + logDebugInfo(reason); + } + catch (Throwable e) { + log.error(e, "Unexpected error while logging debug info for %s", reason); + } + } + + private void logDebugInfo(String reason) + { + if (!log.isDebugEnabled()) { + return; + } + + log.debug("Scheduler debug info for %s START; reason=%s", queryStateMachine.getQueryId(), reason); + log.debug("General state: %s", toStringHelper(this) + .add("maxTaskExecutionAttempts", maxTaskExecutionAttempts) + .add("maxTasksWaitingForNode", maxTasksWaitingForNode) + .add("maxTasksWaitingForExecution", maxTasksWaitingForExecution) + .add("maxPartitionCount", maxPartitionCount) + .add("runtimeAdaptivePartitioningEnabled", runtimeAdaptivePartitioningEnabled) + .add("runtimeAdaptivePartitioningPartitionCount", runtimeAdaptivePartitioningPartitionCount) + .add("runtimeAdaptivePartitioningMaxTaskSizeInBytes", runtimeAdaptivePartitioningMaxTaskSizeInBytes) + .add("stageEstimationForEagerParentEnabled", stageEstimationForEagerParentEnabled) + .add("started", started) + .add("runtimeAdaptivePartitioningApplied", runtimeAdaptivePartitioningApplied) + .add("nextSchedulingPriority", nextSchedulingPriority) + .add("preSchedulingTaskContexts", preSchedulingTaskContexts) + .add("schedulingDelayer", schedulingDelayer) + .add("queryOutputSet", queryOutputSet) + .toString()); + + stageRegistry.logDebugInfo(); + + log.debug("StageExecutions:"); + stageExecutions.forEach((stageId, stageExecution) -> { + stageExecution.logDebugInfo(); + }); + + eventDebugInfos.ifPresent(EventDebugInfos::log); + + log.debug("Scheduler debug info for %s END", queryStateMachine.getQueryId()); } private boolean schedule() @@ -1343,7 +1596,7 @@ private void updateMemoryRequirements() } @Override - public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) + public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) { ScheduledTask scheduledTask = new ScheduledTask(sinkInstanceHandleAcquiredEvent.getStageId(), sinkInstanceHandleAcquiredEvent.getPartitionId()); PreSchedulingTaskContext context = preSchedulingTaskContexts.remove(scheduledTask); @@ -1376,6 +1629,7 @@ public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns if (remoteTask.isEmpty()) { nodeLease.release(); } + return null; } private StateChangeListener createExchangeSinkInstanceHandleUpdateRequiredListener() @@ -1414,6 +1668,9 @@ private void loadMoreTaskDescriptorsIfNecessary() @Override public void onSuccess(AssignmentResult result) { + // We need to process even empty events here so stageExecution.taskDescriptorLoadingComplete() + // is called in the event handler. 
Otherwise, IdempotentSplitSource may not be called again + // if there is no other SplitAssignmentEvent for this stage in the queue. eventQueue.add(new SplitAssignmentEvent(stageExecution.getStageId(), result)); } @@ -1434,7 +1691,7 @@ public void abort() } @Override - public void onRemoteTaskCompleted(RemoteTaskCompletedEvent event) + public Void onRemoteTaskCompleted(RemoteTaskCompletedEvent event) { TaskStatus taskStatus = event.getTaskStatus(); TaskId taskId = taskStatus.getTaskId(); @@ -1463,26 +1720,29 @@ else if (taskState == TaskState.FAILED) { for (StageId consumerStageId : stageConsumers.get(stageExecution.getStageId())) { getStageExecution(consumerStageId).setSourceOutputSelector(stageExecution.getStageFragmentId(), outputSelector); } + return null; } @Override - public void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) + public Void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) { TaskId taskId = event.getTaskStatus().getTaskId(); StageExecution stageExecution = getStageExecution(taskId.getStageId()); stageExecution.initializeUpdateOfExchangeSinkInstanceHandle(taskId, eventQueue); + return null; } @Override - public void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + public Void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) { TaskId taskId = event.getTaskId(); StageExecution stageExecution = getStageExecution(taskId.getStageId()); stageExecution.finalizeUpdateOfExchangeSinkInstanceHandle(taskId, event.getExchangeSinkInstanceHandle()); + return null; } @Override - public void onSplitAssignment(SplitAssignmentEvent event) + public Void onSplitAssignment(SplitAssignmentEvent event) { StageExecution stageExecution = getStageExecution(event.getStageId()); AssignmentResult assignment = event.getAssignmentResult(); @@ -1516,13 +1776,15 @@ public void onSplitAssignment(SplitAssignmentEvent event) stageExecution.noMorePartitions(); } stageExecution.taskDescriptorLoadingComplete(); + return null; } @Override - public void onStageFailure(StageFailureEvent event) + public Void onStageFailure(StageFailureEvent event) { StageExecution stageExecution = getStageExecution(event.getStageId()); stageExecution.fail(event.getFailure()); + return null; } private StageExecution getStageExecution(StageId stageId) @@ -2183,6 +2445,40 @@ public FaultTolerantPartitioningScheme getSinkPartitioningScheme() { return sinkPartitioningScheme; } + + public void logDebugInfo() + { + if (!log.isDebugEnabled()) { + return; + } + + log.debug("StageExecution %s: %s", + stage.getStageId(), + toStringHelper(this) + .add("taskDescriptorStorage.getReservedBytes()", taskDescriptorStorage.getReservedBytes()) + .add("taskSource", taskSource.getDebugInfo()) + .add("sinkPartitioningScheme", sinkPartitioningScheme) + .add("exchange", exchange) + .add("schedulingPriority", schedulingPriority) + .add("eager", eager) + .add("outputDataSize", outputDataSize) + .add("noMorePartitions", noMorePartitions) + .add("runningPartitions", runningPartitions) + .add("remainingPartitions", remainingPartitions) + .add("sinkOutputSelectorBuilder", sinkOutputSelectorBuilder == null ? 
null : sinkOutputSelectorBuilder.build()) + .add("finalSinkOutputSelector", finalSinkOutputSelector) + .add("remoteSourceIds", remoteSourceIds) + .add("remoteSources", remoteSources) + .add("sourceOutputSelectors", sourceOutputSelectors) + .add("taskDescriptorLoadingActive", taskDescriptorLoadingActive) + .add("exchangeClosed", exchangeClosed) + .add("initialMemoryRequirements", initialMemoryRequirements) + .toString()); + + partitions.forEach((partitionId, stagePartition) -> { + log.debug(" StagePartition %s.%s: %s", stage.getStageId(), partitionId, stagePartition.getDebugInfo()); + }); + } } private static class StagePartition @@ -2437,6 +2733,28 @@ public boolean isFinished() { return finished; } + + public String getDebugInfo() + { + return toStringHelper(this) + .add("stageId", stageId) + .add("partitionId", partitionId) + .add("exchangeSinkHandle", exchangeSinkHandle) + .add("remoteSourceIds", remoteSourceIds) + .add("openTaskDescriptor", openTaskDescriptor) + .add("memoryRequirements", memoryRequirements) + .add("failureObserved", failureObserved) + .add("remainingAttempts", remainingAttempts) + .add("tasks", tasks) + .add("taskOutputBuffers", taskOutputBuffers) + .add("runningTasks", runningTasks) + .add("taskNodeLeases", taskNodeLeases) + .add("finalSelectors", finalSelectors) + .add("noMoreSplits", noMoreSplits) + .add("taskScheduled", taskScheduled) + .add("finished", finished) + .toString(); + } } private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector selector) @@ -2636,34 +2954,87 @@ public long getRemainingDelayInMillis() } return 0; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("minRetryDelayInMillis", minRetryDelayInMillis) + .add("maxRetryDelayInMillis", maxRetryDelayInMillis) + .add("retryDelayScaleFactor", retryDelayScaleFactor) + .add("stopwatch", stopwatch) + .add("currentDelayInMillis", currentDelayInMillis) + .toString(); + } } private interface Event { - Event ABORT = listener -> { - throw new UnsupportedOperationException(); + Event ABORT = new Event() { + @Override + public <T> T accept(EventListener<T> listener) + { + throw new UnsupportedOperationException(); + } }; - Event WAKE_UP = listener -> { - throw new UnsupportedOperationException(); + Event WAKE_UP = new Event() { + @Override + public <T> T accept(EventListener<T> listener) + { + throw new UnsupportedOperationException(); + } }; - void accept(EventListener listener); + <T> T accept(EventListener<T> listener); } - private interface EventListener + private interface EventListener<T> { - void onRemoteTaskCompleted(RemoteTaskCompletedEvent event); + default T onRemoteTaskCompleted(RemoteTaskCompletedEvent event) + { + return onRemoteTaskEvent(event); + } - void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event); + default T onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) + { + return onRemoteTaskEvent(event); + } - void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event); + default T onRemoteTaskEvent(RemoteTaskEvent event) + { + return onEvent(event); + } + + default T onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) + { + return onEvent(event); + } - void onSplitAssignment(SplitAssignmentEvent event); + default T onSplitAssignment(SplitAssignmentEvent event) + { + return onStageEvent(event); + } - void onStageFailure(StageFailureEvent event); + default T onStageFailure(StageFailureEvent event) + { + return onStageEvent(event); 
+ } + + default T onStageEvent(StageEvent event) + { + return onEvent(event); + } - void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent); + default T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent event) + { + return onEvent(event); + } + + default T onEvent(Event event) + { + throw new RuntimeException("EventListener not implemented"); + } } private static class SinkInstanceHandleAcquiredEvent @@ -2710,9 +3081,21 @@ public ExchangeSinkInstanceHandle getSinkInstanceHandle() } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) + { + return listener.onSinkInstanceHandleAcquired(this); + } + + @Override + public String toString() { - listener.onSinkInstanceHandleAcquired(this); + return toStringHelper(this) + .add("stageId", stageId) + .add("partitionId", partitionId) + .add("nodeLease", nodeLease) + .add("attempt", attempt) + .add("sinkInstanceHandle", sinkInstanceHandle) + .toString(); } } @@ -2725,9 +3108,17 @@ public RemoteTaskCompletedEvent(TaskStatus taskStatus) } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) { - listener.onRemoteTaskCompleted(this); + return listener.onRemoteTaskCompleted(this); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", getTaskStatus()) + .toString(); } } @@ -2740,9 +3131,17 @@ protected RemoteTaskExchangeSinkUpdateRequiredEvent(TaskStatus taskStatus) } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) { - listener.onRemoteTaskExchangeSinkUpdateRequired(this); + return listener.onRemoteTaskExchangeSinkUpdateRequired(this); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", getTaskStatus()) + .toString(); } } @@ -2759,9 +3158,9 @@ private RemoteTaskExchangeUpdatedSinkAcquired(TaskId taskId, ExchangeSinkInstanc } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) { - listener.onRemoteTaskExchangeUpdatedSinkAcquired(this); + return listener.onRemoteTaskExchangeUpdatedSinkAcquired(this); } public TaskId getTaskId() @@ -2773,6 +3172,15 @@ public ExchangeSinkInstanceHandle getExchangeSinkInstanceHandle() { return exchangeSinkInstanceHandle; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskId", taskId) + .add("exchangeSinkInstanceHandle", exchangeSinkInstanceHandle) + .toString(); + } } private abstract static class RemoteTaskEvent @@ -2789,6 +3197,14 @@ public TaskStatus getTaskStatus() { return taskStatus; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("taskStatus", taskStatus) + .toString(); + } } private static class SplitAssignmentEvent @@ -2808,9 +3224,18 @@ public AssignmentResult getAssignmentResult() } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) + { + return listener.onSplitAssignment(this); + } + + @Override + public String toString() { - listener.onSplitAssignment(this); + return toStringHelper(this) + .add("stageId", getStageId()) + .add("assignmentResult", assignmentResult) + .toString(); } } @@ -2831,9 +3256,18 @@ public Throwable getFailure() } @Override - public void accept(EventListener listener) + public <T> T accept(EventListener<T> listener) + { + return listener.onStageFailure(this); + } + + @Override + public String toString() { - listener.onStageFailure(this); + return 
toStringHelper(this) + .add("stageId", getStageId()) + .add("failure", failure) + .toString(); } } @@ -2898,5 +3332,15 @@ public void setWaitingForSinkInstanceHandle(boolean waitingForSinkInstanceHandle { this.waitingForSinkInstanceHandle = waitingForSinkInstanceHandle; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("nodeLease", nodeLease) + .add("executionClass", executionClass) + .add("waitingForSinkInstanceHandle", waitingForSinkInstanceHandle) + .toString(); + } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java index d6e6d74582f8..0d21d1a24f7d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java @@ -56,10 +56,12 @@ import java.util.function.LongConsumer; import java.util.function.Supplier; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; +import static com.google.common.collect.Maps.transformValues; import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.toListenableFuture; @@ -221,6 +223,23 @@ public synchronized void close() } } + public String getDebugInfo() + { + return toStringHelper(this) + .add("sourceExchanges", transformValues(sourceExchanges, Exchange::getId)) + .add("remoteSources", remoteSources) + .add("assigner", assigner) + .add("splitBatchSize", splitBatchSize) + .add("targetExchangeSplitSizeInBytes", targetExchangeSplitSizeInBytes) + .add("sourcePartitioningScheme", sourcePartitioningScheme) + .add("initialized", initialized) + .add("splitSources", splitSources) + .add("completedFragments", completedFragments) + .add("future", future) + .add("closed", closed) + .toString(); + } + private static class IdempotentSplitSource implements Closeable { @@ -296,6 +315,19 @@ private synchronized void advance(boolean lastBatch) future = Optional.empty(); } + @Override + public synchronized String toString() + { + return toStringHelper(this) + .add("planNodeId", planNodeId) + .add("sourceFragmentId", sourceFragmentId) + .add("splitSource", splitSource) + .add("splitBatchSize", splitBatchSize) + .add("closed", closed) + .add("finished", finished) + .toString(); + } + public class SplitBatchReference { private final SplitBatch splitBatch; @@ -404,6 +436,15 @@ public Optional> getTableExecuteSplitsInfo() { return Optional.empty(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("handleSource", handleSource) + .add("targetSplitSizeInBytes", targetSplitSizeInBytes) + .toString(); + } } /** diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java index d64cae61d71f..9f4ff0d00b7a 100644 --- 
a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningScheme.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.function.ToIntFunction; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -103,4 +104,15 @@ public FaultTolerantPartitioningScheme withPartitionCount(int partitionCount) this.splitToBucketFunction, this.partitionToNodeMap); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("partitionCount", partitionCount) + .add("bucketToPartitionMap", bucketToPartitionMap.isPresent() ? "present" : "empty") + .add("splitToBucketFunction", splitToBucketFunction.isPresent() ? "present" : "empty") + .add("partitionToNodeMap", partitionToNodeMap) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java index db0e693e0fd6..fe424f9ce050 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/HashDistributionSplitAssigner.java @@ -42,6 +42,7 @@ import java.util.function.Predicate; import java.util.stream.IntStream; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -353,6 +354,16 @@ public Optional getSplitBy() { return splitBy; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("subPartitions", subPartitions) + .add("splitBy", splitBy) + .add("nextSubPartition", nextSubPartition) + .toString(); + } } @VisibleForTesting @@ -376,6 +387,12 @@ public int getId() checkState(id.isPresent(), "id is expected to be assigned"); return id.getAsInt(); } + + @Override + public String toString() + { + return id.toString(); + } } private static boolean isWriteFragment(PlanFragment fragment) @@ -402,4 +419,20 @@ public Boolean visitTableWriter(TableWriterNode node, Void context) return fragment.getRoot().accept(visitor, null); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("catalogRequirement", catalogRequirement) + .add("replicatedSources", replicatedSources) + .add("allSources", allSources) + .add("sourcePartitioningScheme", sourcePartitioningScheme) + .add("sourcePartitionToTaskPartition", sourcePartitionToTaskPartition) + .add("createdTaskPartitions", createdTaskPartitions) + .add("completedSources", completedSources) + .add("replicatedSplits.size()", replicatedSplits.size()) + .add("allTaskPartitionsCreated", allTaskPartitionsCreated) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java index 3f1801c382b6..a9854e692ab3 100644 --- 
a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SingleDistributionSplitAssigner.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -88,4 +89,15 @@ public AssignmentResult finish() } return result.build(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("hostRequirement", hostRequirement) + .add("allSources", allSources) + .add("partitionAdded", partitionAdded) + .add("completedSources", completedSources) + .toString(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java index 161bf7e01642..6fbeb6884d98 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitAssigner.java @@ -75,6 +75,11 @@ record AssignmentResult( partitionUpdates = ImmutableList.copyOf(requireNonNull(partitionUpdates, "partitionUpdates is null")); } + boolean isEmpty() + { + return partitionsAdded.isEmpty() && !noMorePartitions && partitionUpdates.isEmpty() && sealedPartitions.isEmpty(); + } + public static AssignmentResult.Builder builder() { return new AssignmentResult.Builder(); diff --git a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java index da721ce70660..d7cc2611792c 100644 --- a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java +++ b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.concurrent.Executor; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.addCallback; @@ -78,6 +79,15 @@ public Optional> getTableExecuteSplitsInfo() return source.getTableExecuteSplitsInfo(); } + @Override + public String toString() + { + return toStringHelper(this) + .add("bufferSize", bufferSize) + .add("source", source) + .toString(); + } + private static class GetNextBatch extends AbstractFuture { diff --git a/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java index 8df2cc09b5b1..349487592caf 100644 --- a/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java +++ b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Optional; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; @@ -105,4 +106,12 @@ public Optional> getTableExecuteSplitsInfo() { return source.getTableExecuteSplitsInfo(); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("source", source) + .toString(); + } } diff --git 
a/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java index 4affb429c24e..9161b2d4f5c9 100644 --- a/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java +++ b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeSourceOutputSelector.java @@ -195,6 +195,20 @@ public void testIncompatibleTransitions() .hasMessage("decision for partition 0 is already made: 0"); } + @Test + public void testToString() + { + ExchangeSourceOutputSelector selector = ExchangeSourceOutputSelector.builder(ImmutableSet.of(EXCHANGE_ID_1, EXCHANGE_ID_2)) + .include(EXCHANGE_ID_1, 0, 1) + .exclude(EXCHANGE_ID_1, 1) + .include(EXCHANGE_ID_1, 2, 0) + .exclude(EXCHANGE_ID_2, 3) + .setPartitionCount(EXCHANGE_ID_2, 4) + .build(); + + assertThat(selector).hasToString("ExchangeSourceOutputSelector[version=0, values={exchange_1=[0=1,1=E,2=0], exchange_2=[0=U,1=U,2=U,3=E]}, finalSelector=false]"); + } + private ExchangeSourceOutputSelector serializeDeserialize(ExchangeSourceOutputSelector selector) { return codec.fromJson(codec.toJson(selector)); diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java index 2f3a1604d64f..6143746a0b47 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeContext.java @@ -16,6 +16,8 @@ import io.trino.spi.Experimental; import io.trino.spi.QueryId; +import java.util.StringJoiner; + import static java.util.Objects.requireNonNull; @Experimental(eta = "2023-09-01") @@ -39,4 +41,13 @@ public ExchangeId getExchangeId() { return exchangeId; } + + @Override + public String toString() + { + return new StringJoiner(", ", ExchangeContext.class.getSimpleName() + "[", "]") + .add("queryId=" + queryId) + .add("exchangeId=" + exchangeId) + .toString(); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java index 6b7c0067a54a..14ee2c3870a0 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeSourceOutputSelector.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.BasicSliceInput; import io.airlift.slice.SizeOf; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -22,7 +23,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.StringJoiner; +import java.util.TreeMap; import java.util.function.Function; import static io.airlift.slice.SizeOf.instanceSize; @@ -31,6 +35,7 @@ import static io.trino.spi.exchange.ExchangeSourceOutputSelector.Selection.UNKNOWN; import static java.lang.Math.max; import static java.util.Arrays.fill; +import static java.util.Map.entry; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; @@ -157,6 +162,55 @@ public ExchangeSourceOutputSelector merge(ExchangeSourceOutputSelector other) this.finalSelector && other.finalSelector); } + @Override + public String toString() + { + return new 
StringJoiner(", ", ExchangeSourceOutputSelector.class.getSimpleName() + "[", "]") + .add("version=" + version) + .add("values=" + values.entrySet().stream() + .map(e -> entry(e.getKey().toString(), valuesSliceToString(e.getValue()))) + // collect to TreeMap to ensure ordering of keys + .collect(toMap( + Entry::getKey, + Entry::getValue, + (a, b) -> { throw new IllegalArgumentException("got duplicate key " + a + ", " + b); }, + TreeMap::new))) + .add("finalSelector=" + finalSelector) + .toString(); + } + + private String valuesSliceToString(Slice values) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + try (BasicSliceInput input = new BasicSliceInput(values)) { + int taskPartitionId = 0; + while (true) { + int value = input.read(); + if (value == -1) { + break; + } + if (taskPartitionId != 0) { + builder.append(","); + } + builder.append(taskPartitionId); + builder.append("="); + if ((byte) value == EXCLUDED.value) { + builder.append("E"); + } + else if ((byte) value == UNKNOWN.value) { + builder.append("U"); + } + else { + builder.append(value); + } + taskPartitionId++; + } + } + builder.append("]"); + return builder.toString(); + } + private int getPartitionCount(ExchangeId exchangeId) { Slice values = this.values.get(exchangeId); @@ -272,7 +326,7 @@ public ExchangeSourceOutputSelector build() return new ExchangeSourceOutputSelector( nextVersion++, exchangeValues.entrySet().stream() - .collect(toMap(Map.Entry::getKey, entry -> { + .collect(toMap(Entry::getKey, entry -> { ExchangeId exchangeId = entry.getKey(); ValuesBuilder valuesBuilder = entry.getValue(); if (finalSelector) { diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index f5c386fafab3..4d5ac7635c68 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -49,6 +49,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -337,4 +338,24 @@ private record CommittedTaskAttempt(int partitionId, int attemptId) checkArgument(attemptId >= 0, "attemptId is expected to be greater than or equal to zero: %s", attemptId); } } + + @Override + public synchronized String toString() + { + return toStringHelper(this) + .add("baseDirectories", baseDirectories) + .add("exchangeStorage", exchangeStorage.getClass().getName()) + .add("exchangeContext", exchangeContext) + .add("outputPartitionCount", outputPartitionCount) + .add("preserveOrderWithinPartition", preserveOrderWithinPartition) + .add("fileListingParallelism", fileListingParallelism) + .add("exchangeSourceHandleTargetDataSizeInBytes", exchangeSourceHandleTargetDataSizeInBytes) + .add("outputDirectories", outputDirectories) + .add("allSinks", allSinks) + .add("finishedSinks", finishedSinks) + .add("noMoreSinks", noMoreSinks) + .add("exchangeSourceHandlesCreationStarted", exchangeSourceHandlesCreationStarted) + .add("exchangeSourceHandlesFuture", exchangeSourceHandlesFuture) + .toString(); + } } diff --git 
a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java index 666afd36e0b6..a262b9f04149 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java @@ -19,6 +19,7 @@ import java.net.URI; +import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; public class FileSystemExchangeSinkInstanceHandle @@ -65,4 +66,15 @@ public boolean isPreserveOrderWithinPartition() { return preserveOrderWithinPartition; } + + @Override + public String toString() + { + return toStringHelper(this) + .add("sinkHandle", sinkHandle) + .add("outputDirectory", outputDirectory) + .add("outputPartitionCount", outputPartitionCount) + .add("preserveOrderWithinPartition", preserveOrderWithinPartition) + .toString(); + } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index 453ed0219004..2fa3dbbfcabb 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -80,6 +80,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.airlift.log.Level.DEBUG; import static io.airlift.log.Level.ERROR; import static io.airlift.log.Level.WARN; import static io.airlift.testing.Closeables.closeAllSuppress; @@ -234,6 +235,7 @@ private static void setupLogging() logging.setLevel("org.hibernate.validator.internal.util.Version", WARN); logging.setLevel(PluginManager.class.getName(), WARN); logging.setLevel(CoordinatorDynamicCatalogManager.class.getName(), WARN); + logging.setLevel("io.trino.execution.scheduler.faulttolerant", DEBUG); } private static TestingTrinoServer createTestingTrinoServer(
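Two of the mechanisms introduced in this diff are easier to follow in isolation. First, the Event/EventListener refactor: accept changes from void accept(EventListener) to a generic <T> T accept(EventListener<T>), with default listener methods cascading to a catch-all onEvent, so the Scheduler keeps handling events as an EventListener<Void> while GET_BUCKET_LISTENER maps the same events to bucket names as an EventListener<String>. The following is a minimal standalone sketch of that pattern; the names (Event, EventListener, TaskEvent, BUCKET_LISTENER, HANDLER, ListenerSketch) are simplified and hypothetical, not the actual Trino classes.

// Sketch only: generic event visitor with default catch-all methods (hypothetical names).
interface Event
{
    <T> T accept(EventListener<T> listener);
}

interface EventListener<T>
{
    // Specific callbacks default to the catch-all, so listeners override only what they need.
    default T onTaskEvent(TaskEvent event)
    {
        return onEvent(event);
    }

    default T onEvent(Event event)
    {
        throw new UnsupportedOperationException("not implemented");
    }
}

record TaskEvent(String taskId)
        implements Event
{
    @Override
    public <T> T accept(EventListener<T> listener)
    {
        return listener.onTaskEvent(this);
    }
}

public class ListenerSketch
{
    // A String-returning listener, analogous to GET_BUCKET_LISTENER: maps events to bucket names.
    private static final EventListener<String> BUCKET_LISTENER = new EventListener<>()
    {
        @Override
        public String onTaskEvent(TaskEvent event)
        {
            return "task_" + event.taskId();
        }

        @Override
        public String onEvent(Event event)
        {
            return "GLOBAL";
        }
    };

    // A Void-returning listener, analogous to the Scheduler: handles events for their side effects.
    private static final EventListener<Void> HANDLER = new EventListener<>()
    {
        @Override
        public Void onTaskEvent(TaskEvent event)
        {
            System.out.println("handling " + event.taskId());
            return null;
        }
    };

    public static void main(String[] args)
    {
        Event event = new TaskEvent("1.2.0");
        String bucket = event.accept(BUCKET_LISTENER); // "task_1.2.0"
        event.accept(HANDLER);                         // prints "handling 1.2.0"
        System.out.println(bucket);
    }
}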
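Second, the EventDebugInfos history keeps only the last few entries per bucket and holds the whole structure behind a SoftReference so the JVM can reclaim it under memory pressure. A simplified standalone version of that bookkeeping (hypothetical class name BoundedEventHistory, using Guava's LinkedListMultimap as the patch does) might look like this:

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;

import java.lang.ref.SoftReference;
import java.util.Iterator;
import java.util.List;

// Sketch only: per-bucket, size-bounded event history guarded by a SoftReference.
public class BoundedEventHistory
{
    private final int entriesPerBucket;
    private SoftReference<ListMultimap<String, String>> historyReference;

    public BoundedEventHistory(int entriesPerBucket)
    {
        this.entriesPerBucket = entriesPerBucket;
        this.historyReference = new SoftReference<>(LinkedListMultimap.create());
    }

    public void add(String bucket, String entry)
    {
        ListMultimap<String, String> history = getHistory();
        List<String> bucketEntries = history.get(bucket);
        bucketEntries.add(entry);
        // Drop the oldest entry once the bucket exceeds its cap, keeping memory bounded.
        if (bucketEntries.size() > entriesPerBucket) {
            Iterator<String> iterator = bucketEntries.iterator();
            iterator.next();
            iterator.remove();
        }
    }

    public void dump()
    {
        getHistory().asMap().forEach((bucket, entries) -> {
            System.out.println("Recent events for " + bucket);
            entries.forEach(entry -> System.out.println("  " + entry));
        });
    }

    private ListMultimap<String, String> getHistory()
    {
        ListMultimap<String, String> history = historyReference.get();
        if (history == null) {
            // The GC cleared the history under memory pressure; start over with an empty one.
            history = LinkedListMultimap.create();
            historyReference = new SoftReference<>(history);
        }
        return history;
    }
}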