From 8d27c93eec7601b160860a0665ea93ea2b043051 Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Thu, 18 Jul 2024 17:02:34 -0700 Subject: [PATCH] Aggressively prune statistics from query info after completion Previously, we only pruned information from old queries after the expiration queue filled. With the addition of histograms, query statistics can take two orders of magnitude more memory than previous statistics. Without pruning statistics, we will hit the memory limits of the JVM much more quickly. This change aggressively prunes query plan metadata before query expiry. This is mostly limited to query plan statistics. --- .../dispatcher/FailedDispatchQuery.java | 7 +- .../presto/dispatcher/LocalDispatchQuery.java | 10 +- .../AccessControlCheckerExecution.java | 8 +- .../execution/DataDefinitionExecution.java | 9 +- .../presto/execution/QueryStateMachine.java | 166 ++++++++++++- .../presto/execution/QueryTracker.java | 24 +- .../presto/execution/SqlQueryExecution.java | 17 +- .../presto/execution/StateMachine.java | 5 + .../presto/execution/MockQueryExecution.java | 6 +- .../spi/statistics/ColumnStatistics.java | 10 + .../spi/statistics/TableStatistics.java | 16 ++ .../presto/execution/TestEventListener.java | 2 +- .../execution/TestEventListenerPlugin.java | 6 +- .../presto/tests/TestQueryManager.java | 230 ++++++++++++++++++ 14 files changed, 492 insertions(+), 24 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java index 3c828ea62079..f4e93eed051c 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java @@ -115,7 +115,10 @@ public void fail(Throwable throwable) {} public void cancel() {} @Override - public void pruneInfo() {} + public void pruneExpiredQueryInfo() {} + + @Override + public void 
pruneFinishedQueryInfo() {} @Override public QueryId getQueryId() @@ -194,5 +197,5 @@ public Optional getResourceGroupQueryLimits() @Override public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits) - { } + {} } diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java index 4babf8a95575..50b7dea3fba3 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java @@ -357,9 +357,15 @@ public void cancel() } @Override - public void pruneInfo() + public void pruneExpiredQueryInfo() { - stateMachine.pruneQueryInfo(); + stateMachine.pruneQueryInfoExpired(); + } + + @Override + public void pruneFinishedQueryInfo() + { + stateMachine.pruneQueryInfoFinished(); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java index 2fb96b05b2f0..160759e3f0cf 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java @@ -327,7 +327,13 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneExpiredQueryInfo() + { + // no-op + } + + @Override + public void pruneFinishedQueryInfo() { // no-op } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java index a130b447516f..cd22e10a5c98 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java @@ 
-162,6 +162,7 @@ public DataSize getWrittenIntermediateDataSize() { return DataSize.succinctBytes(0); } + @Override public long getOutputPositions() { @@ -290,7 +291,13 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneExpiredQueryInfo() + { + // no-op + } + + @Override + public void pruneFinishedQueryInfo() { // no-op } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index aeee080928ae..b22822fc0149 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -19,7 +19,9 @@ import com.facebook.presto.common.resourceGroups.QueryType; import com.facebook.presto.common.transaction.TransactionId; import com.facebook.presto.common.type.Type; +import com.facebook.presto.cost.PlanNodeStatsEstimate; import com.facebook.presto.cost.StatsAndCosts; +import com.facebook.presto.cost.VariableStatsEstimate; import com.facebook.presto.execution.QueryExecution.QueryOutputInfo; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.memory.VersionedMemoryPoolId; @@ -38,13 +40,17 @@ import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.spi.security.AccessControl; import com.facebook.presto.spi.security.SelectedRole; +import com.facebook.presto.spi.statistics.ColumnStatistics; +import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.sql.planner.CanonicalPlanWithInfo; +import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.transaction.TransactionInfo; import com.facebook.presto.transaction.TransactionManager; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import 
com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.google.common.util.concurrent.FutureCallback; @@ -93,6 +99,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.succinctBytes; import static java.lang.String.format; @@ -1046,7 +1054,24 @@ public QueryInfo updateQueryInfo(Optional stageInfo) * Remove large objects from the query info object graph, e.g : plan, stats, stage summaries, failed attempts * Used when pruning expired queries from the state machine */ - public void pruneQueryInfo() + public void pruneQueryInfoExpired() + { + Optional finalInfo = finalQueryInfo.get(); + if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) { + return; + } + QueryInfo queryInfo = finalInfo.get(); + QueryInfo prunedQueryInfo; + + prunedQueryInfo = pruneExpiredQueryInfo(queryInfo, getMemoryPool()); + finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo)); + } + + /** + * Remove the largest objects from the query info object graph, e.g : extraneous stats, costs, + * and histograms to reduce memory utilization + */ + public void pruneQueryInfoFinished() { Optional finalInfo = finalQueryInfo.get(); if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) { @@ -1054,20 +1079,150 @@ public void pruneQueryInfo() } QueryInfo queryInfo = finalInfo.get(); + QueryInfo prunedQueryInfo; + + // no longer needed in the session after query finishes + session.getPlanNodeStatsMap().clear(); + 
session.getPlanNodeCostMap().clear(); + // inputs contain some statistics which should be cleared + inputs.getAndUpdate(QueryStateMachine::pruneInputHistograms); + // query listeners maintain state in their arguments which holds + // onto plan nodes and statistics. Since finalQueryInfo was + // already set it should be in a terminal state and be safe to + // clear the listeners. + finalQueryInfo.clearEventListeners(); + planStatsAndCosts.getAndUpdate(stats -> Optional.ofNullable(stats) + .map(QueryStateMachine::pruneHistogramsFromStatsAndCosts) + .orElse(null)); + prunedQueryInfo = pruneFinishedQueryInfo(queryInfo, inputs.get()); + finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo)); + } + + private static QueryInfo pruneFinishedQueryInfo(QueryInfo queryInfo, Set prunedInputs) + { + return new QueryInfo( + queryInfo.getQueryId(), + queryInfo.getSession(), + queryInfo.getState(), + queryInfo.getMemoryPool(), + queryInfo.isScheduled(), + queryInfo.getSelf(), + queryInfo.getFieldNames(), + queryInfo.getQuery(), + queryInfo.getExpandedQuery(), + queryInfo.getPreparedQuery(), + queryInfo.getQueryStats(), + queryInfo.getSetCatalog(), + queryInfo.getSetSchema(), + queryInfo.getSetSessionProperties(), + queryInfo.getResetSessionProperties(), + queryInfo.getSetRoles(), + queryInfo.getAddedPreparedStatements(), + queryInfo.getDeallocatedPreparedStatements(), + queryInfo.getStartedTransactionId(), + queryInfo.isClearTransactionId(), + queryInfo.getUpdateType(), + queryInfo.getOutputStage().map(QueryStateMachine::pruneStatsFromStageInfo), + queryInfo.getFailureInfo(), + queryInfo.getErrorCode(), + queryInfo.getWarnings(), + prunedInputs, + queryInfo.getOutput(), + queryInfo.isFinalQueryInfo(), + queryInfo.getResourceGroupId(), + queryInfo.getQueryType(), + queryInfo.getFailedTasks(), + queryInfo.getRuntimeOptimizedStages(), + queryInfo.getAddedSessionFunctions(), + queryInfo.getRemovedSessionFunctions(), + 
pruneHistogramsFromStatsAndCosts(queryInfo.getPlanStatsAndCosts()), + queryInfo.getOptimizerInformation(), + queryInfo.getCteInformationList(), + queryInfo.getScalarFunctions(), + queryInfo.getAggregateFunctions(), + queryInfo.getWindowFunctions(), + ImmutableList.of(), + ImmutableMap.of(), + queryInfo.getPrestoSparkExecutionContext()); + } + + private static Set pruneInputHistograms(Set inputs) + { + return inputs.stream().map(input -> new Input(input.getConnectorId(), + input.getSchema(), + input.getTable(), + input.getConnectorInfo(), + input.getColumns(), + input.getStatistics().map(tableStats -> TableStatistics.buildFrom(tableStats) + .setColumnStatistics(ImmutableMap.copyOf( + Maps.transformValues(tableStats.getColumnStatistics(), + columnStats -> ColumnStatistics.buildFrom(columnStats) + .setHistogram(Optional.empty()) + .build()))) + .build()), + input.getSerializedCommitOutput())) + .collect(toImmutableSet()); + } + + protected static StatsAndCosts pruneHistogramsFromStatsAndCosts(StatsAndCosts statsAndCosts) + { + Map newStats = statsAndCosts.getStats() + .entrySet() + .stream() + .collect(toImmutableMap(entry -> entry.getKey(), + entry -> PlanNodeStatsEstimate.buildFrom(entry.getValue()) + .addVariableStatistics(ImmutableMap.copyOf( + Maps.transformValues( + entry.getValue().getVariableStatistics(), + variableStats -> VariableStatsEstimate.buildFrom(variableStats) + .setHistogram(Optional.empty()) + .build()))) + .build())); + + return new StatsAndCosts(newStats, + statsAndCosts.getCosts()); + } + + private static StageInfo pruneStatsFromStageInfo(StageInfo stage) + { + return new StageInfo( + stage.getStageId(), + stage.getSelf(), + stage.getPlan().map(plan -> new PlanFragment( + plan.getId(), + plan.getRoot(), + plan.getVariables(), + plan.getPartitioning(), + plan.getTableScanSchedulingOrder(), + plan.getPartitioningScheme(), + plan.getStageExecutionDescriptor(), + plan.isOutputTableWriterFragment(), + 
plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts), + plan.getJsonRepresentation())), // Keep the plan; only histograms are stripped from its stats + stage.getLatestAttemptExecutionInfo(), + stage.getPreviousAttemptsExecutionInfos(), // Previous attempts are passed through unchanged + stage.getSubStages().stream() + .map(QueryStateMachine::pruneStatsFromStageInfo) + .collect(toImmutableList()), // Substages are kept and pruned recursively + stage.isRuntimeOptimized()); + } + + private static QueryInfo pruneExpiredQueryInfo(QueryInfo queryInfo, VersionedMemoryPoolId pool) + { Optional prunedOutputStage = queryInfo.getOutputStage().map(outputStage -> new StageInfo( outputStage.getStageId(), outputStage.getSelf(), Optional.empty(), // Remove the plan pruneStageExecutionInfo(outputStage.getLatestAttemptExecutionInfo()), ImmutableList.of(), // Remove failed attempts - ImmutableList.of(), - outputStage.isRuntimeOptimized())); // Remove the substages + ImmutableList.of(), // Remove the substages + outputStage.isRuntimeOptimized())); - QueryInfo prunedQueryInfo = new QueryInfo( + return new QueryInfo( queryInfo.getQueryId(), queryInfo.getSession(), queryInfo.getState(), - getMemoryPool().getId(), + pool.getId(), queryInfo.isScheduled(), queryInfo.getSelf(), queryInfo.getFieldNames(), @@ -1107,7 +1262,6 @@ public void pruneQueryInfo() ImmutableList.of(), ImmutableMap.of(), queryInfo.getPrestoSparkExecutionContext()); - finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo)); } private static StageExecutionInfo pruneStageExecutionInfo(StageExecutionInfo info) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java index e37d5c684ef6..e95a55d02f5c 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ 
-197,7 +197,10 @@ public boolean addQuery(T execution) public void expireQuery(QueryId queryId) { 
tryGetQuery(queryId) - .ifPresent(expirationQueue::add); + .ifPresent(query -> { + query.pruneFinishedQueryInfo(); + expirationQueue.add(query); + }); } public long getRunningTaskCount() @@ -264,8 +267,8 @@ public int getTaskCount() } /** - * When cluster reaches max tasks limit and also a single query - * exceeds a threshold, kill this query + * When cluster reaches max tasks limit and also a single query + * exceeds a threshold, kill this query */ @VisibleForTesting void enforceTaskLimits() @@ -316,7 +319,7 @@ private void pruneExpiredQueries() if (expirationQueue.size() - count <= maxQueryHistory) { break; } - query.pruneInfo(); + query.pruneExpiredQueryInfo(); count++; } } @@ -409,6 +412,17 @@ public interface TrackedQuery void fail(Throwable cause); // XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history - void pruneInfo(); + + /** + * Prune info from finished queries which are in the expiry queue and the queue length is + * greater than {@code query.max-history} + */ + void pruneExpiredQueryInfo(); + + /** + * Prune info from finished queries which should not be kept around at all after the query + * state machine has transitioned into a finished state + */ + void pruneFinishedQueryInfo(); } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java index b9470281fe4f..cab667d73b83 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java @@ -88,6 +88,7 @@ import static com.facebook.presto.common.RuntimeMetricName.GET_CANONICAL_INFO_TIME_NANOS; import static com.facebook.presto.common.RuntimeMetricName.LOGICAL_PLANNER_TIME_NANOS; import static com.facebook.presto.common.RuntimeMetricName.OPTIMIZER_TIME_NANOS; +import static 
com.facebook.presto.execution.QueryStateMachine.pruneHistogramsFromStatsAndCosts; import static com.facebook.presto.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID; import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static com.facebook.presto.execution.buffer.OutputBuffers.createSpoolingOutputBuffers; @@ -745,9 +746,22 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneExpiredQueryInfo() { - stateMachine.pruneQueryInfo(); + stateMachine.pruneQueryInfoExpired(); + } + + @Override + public void pruneFinishedQueryInfo() + { + // queryPlan may still be null if the query reached a terminal state before planning finished + queryPlan.getAndUpdate(plan -> plan == null ? null : new Plan( + plan.getRoot(), + plan.getTypes(), + pruneHistogramsFromStatsAndCosts(plan.getStatsAndCosts()))); + // drop the reference to the scheduler since execution is finished + queryScheduler.set(null); + stateMachine.pruneQueryInfoFinished(); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java index e9046bb854ef..559e24572029 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java @@ -283,6 +283,11 @@ boolean isTerminalState(T state) return terminalStates.contains(state); } + public void clearEventListeners() + { + stateChangeListeners.clear(); + } + @VisibleForTesting List> getStateChangeListeners() { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java index 139a260aa889..6693c7ee2f19 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java +++ 
b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java @@ -217,7 +217,11 @@ public Optional getFailureReason() } @Override - public void pruneInfo() + public 
void pruneExpiredQueryInfo() + { } + + @Override + public void pruneFinishedQueryInfo() { } @Override diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java index ec4ee420e46b..bec6fb5c3129 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java @@ -132,6 +132,16 @@ public static Builder builder() return new Builder(); } + public static Builder buildFrom(ColumnStatistics statistics) + { + return new Builder() + .setRange(statistics.getRange()) + .setDataSize(statistics.getDataSize()) + .setNullsFraction(statistics.getNullsFraction()) + .setDistinctValuesCount(statistics.getDistinctValuesCount()) + .setHistogram(statistics.getHistogram()); + } + /** * If one of the estimates below is unspecified, the default "unknown" estimate value * (represented by floating point NaN) may cause the resulting symbol statistics diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java index dd4060c78588..c6fece6d1a66 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java @@ -116,6 +116,15 @@ public static Builder builder() return new Builder(); } + public static Builder buildFrom(TableStatistics tableStatistics) + { + return new Builder() + .setRowCount(tableStatistics.getRowCount()) + .setTotalSize(tableStatistics.getTotalSize()) + .setConfidenceLevel(tableStatistics.getConfidence()) + .setColumnStatistics(tableStatistics.getColumnStatistics()); + } + public static final class Builder { private Estimate rowCount = Estimate.unknown(); @@ -154,6 +163,13 @@ public Builder 
setColumnStatistics(ColumnHandle columnHandle, ColumnStatistics c return this; } + public Builder setColumnStatistics(Map columnStatistics) + { + requireNonNull(columnStatistics, "columnStatistics can not be null"); + this.columnStatisticsMap.putAll(columnStatistics); + return this; + } + public Map getColumnStatistics() { return Collections.unmodifiableMap(columnStatisticsMap); diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java index 85892ea69c1b..813794044440 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java @@ -335,7 +335,7 @@ public void testGraphvizQueryPlanOutput() assertEquals(queryCompletedEvent.getMetadata().getGraphvizPlan().get(), getOnlyElement(expected.getOnlyColumnAsSet())); } - static class EventsBuilder + public static class EventsBuilder { private ImmutableList.Builder queryCreatedEvents; private ImmutableList.Builder queryCompletedEvents; diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java index bc5798435232..f6a36429d29b 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java @@ -29,7 +29,7 @@ public class TestEventListenerPlugin { - static class TestingEventListenerPlugin + public static class TestingEventListenerPlugin implements Plugin { private final EventsBuilder eventsBuilder; @@ -46,7 +46,7 @@ public Iterable getEventListenerFactories() } } - private static class TestingEventListenerFactory + public static class TestingEventListenerFactory implements EventListenerFactory { private final EventsBuilder eventsBuilder; @@ -69,7 
+69,7 @@ public EventListener create(Map config) } } - private static class TestingEventListener + public static class TestingEventListener implements EventListener { private final EventsBuilder eventsBuilder; diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java index 3ef80e08fefc..7e82c52e51bf 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java @@ -13,35 +13,58 @@ */ package com.facebook.presto.tests; +import com.facebook.presto.Session; +import com.facebook.presto.common.RuntimeStats; +import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.dispatcher.DispatchManager; +import com.facebook.presto.execution.MockQueryExecution; import com.facebook.presto.execution.QueryInfo; import com.facebook.presto.execution.QueryManager; import com.facebook.presto.execution.QueryState; +import com.facebook.presto.execution.QueryStats; +import com.facebook.presto.execution.StateMachine; +import com.facebook.presto.execution.TestEventListener.EventsBuilder; +import com.facebook.presto.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import com.facebook.presto.execution.TestingSessionContext; import com.facebook.presto.plugin.blackhole.BlackHolePlugin; import com.facebook.presto.resourceGroups.FileResourceGroupConfigurationManagerFactory; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.server.testing.TestingPrestoServer; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.PrestoWarning; import com.facebook.presto.spi.QueryId; +import com.facebook.presto.spi.WarningCode; +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.memory.MemoryPoolId; import com.google.common.base.Stopwatch; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import org.intellij.lang.annotations.Language; +import org.joda.time.DateTime; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.net.URI; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.execution.QueryState.FAILED; +import static com.facebook.presto.execution.QueryState.FINISHED; import static com.facebook.presto.execution.QueryState.QUEUED; import static com.facebook.presto.execution.QueryState.RUNNING; import static com.facebook.presto.execution.TestQueryRunnerUtil.createQuery; import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState; import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getSimpleQueryRunner; +import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT; +import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_POSITIONS_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_SIZE_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_SCAN_RAW_BYTES_READ_LIMIT; @@ -278,4 +301,211 @@ public void testQueryCountMetrics() Thread.sleep(1000); } } + + @Test + public void testQueryCompletedInfoNotPruned() + throws Exception + { + try (DistributedQueryRunner runner = DistributedQueryRunner.builder(TEST_SESSION) + .setNodeCount(0) + .build()) { + EventsBuilder eventsBuilder = new EventsBuilder(); + eventsBuilder.initialize(1); + 
TestingEventListenerPlugin testEventListenerPlugin = new TestingEventListenerPlugin(eventsBuilder); + runner.installPlugin(testEventListenerPlugin); + QueryManager manager = runner.getCoordinator().getQueryManager(); + QueryId id = runner.getCoordinator().getDispatchManager().createQueryId(); + @Language("SQL") String sql = "SELECT * FROM lineitem WHERE linenumber = 0 LIMIT 1"; + QueryInfo mockInfo = mockInfo(sql, id.toString(), FINISHED); + MockExecution exec = new MockExecution(eventsBuilder, mockInfo); + manager.createQuery(exec); + + // when the listener executes, we will verify that the query completed event exists + // when pruneInfo is called + exec.finalInfoListeners.forEach(item -> item.stateChanged(mockInfo)); + // verify we actually called pruneQueryFinished to assert that it was checked + assertEquals(exec.pruneFinishedCalls, 1); + } + } + + private static class MockExecution + extends MockQueryExecution + { + List> finalInfoListeners = new ArrayList<>(); + private final EventsBuilder eventsBuilder; + int pruneFinishedCalls; + int pruneExpiredCalls; + private final QueryInfo info; + + private MockExecution(EventsBuilder eventsBuilder, QueryInfo info) + { + this.eventsBuilder = eventsBuilder; + this.info = info; + } + + @Override + public DateTime getCreateTime() + { + return info.getQueryStats().getCreateTime(); + } + + @Override + public Duration getTotalCpuTime() + { + return info.getQueryStats().getTotalCpuTime(); + } + + @Override + public DataSize getRawInputDataSize() + { + return info.getQueryStats().getRawInputDataSize(); + } + + @Override + public DataSize getOutputDataSize() + { + return info.getQueryStats().getOutputDataSize(); + } + + @Override + public Session getSession() + { + return TEST_SESSION; + } + + @Override + public void addFinalQueryInfoListener(StateMachine.StateChangeListener stateChangeListener) + { + finalInfoListeners.add(stateChangeListener); + } + + @Override + public void pruneExpiredQueryInfo() + { + pruneExpiredCalls++; 
+ Optional event = eventsBuilder.getQueryCompletedEvents().stream() + .filter(x -> x.getMetadata().getQueryId().equals(info.getQueryId().toString())) + .findFirst(); + // verify that the event listener was notified before prune was called + assertTrue(event.isPresent()); + } + + @Override + public void pruneFinishedQueryInfo() + { + pruneFinishedCalls++; + Optional event = eventsBuilder.getQueryCompletedEvents().stream() + .filter(x -> x.getMetadata().getQueryId().equals(info.getQueryId().toString())) + .findFirst(); + // verify that the event listener was notified before prune was called + assertTrue(event.isPresent()); + } + } + + private static QueryInfo mockInfo(String query, String queryId, QueryState state) + { + return new QueryInfo( + new QueryId(queryId), + TEST_SESSION.toSessionRepresentation(), + state, + new MemoryPoolId("reserved"), + true, + URI.create("1"), + ImmutableList.of("2", "3"), + query, + Optional.empty(), + Optional.empty(), + new QueryStats( + DateTime.parse("1991-09-06T05:00-05:30"), + DateTime.parse("1991-09-06T05:01-05:30"), + DateTime.parse("1991-09-06T05:02-05:30"), + DateTime.parse("1991-09-06T06:00-05:30"), + Duration.valueOf("8m"), + Duration.valueOf("5m"), + Duration.valueOf("7m"), + Duration.valueOf("34m"), + Duration.valueOf("5m"), + Duration.valueOf("6m"), + Duration.valueOf("35m"), + Duration.valueOf("44m"), + Duration.valueOf("9m"), + Duration.valueOf("10m"), + Duration.valueOf("11m"), + 13, + 14, + 15, + 16, + 100, + 17, + 18, + 34, + 19, + 20.0, + 43.0, + DataSize.valueOf("21GB"), + DataSize.valueOf("22GB"), + DataSize.valueOf("23GB"), + DataSize.valueOf("24GB"), + DataSize.valueOf("25GB"), + DataSize.valueOf("26GB"), + DataSize.valueOf("42GB"), + true, + Duration.valueOf("23m"), + Duration.valueOf("24m"), + Duration.valueOf("0m"), + Duration.valueOf("26m"), + true, + ImmutableSet.of(WAITING_FOR_MEMORY), + DataSize.valueOf("123MB"), + DataSize.valueOf("27GB"), + 28, + DataSize.valueOf("29GB"), + 30, + 
DataSize.valueOf("32GB"), + 40, + DataSize.valueOf("31GB"), + 32, + 33, + DataSize.valueOf("34GB"), + DataSize.valueOf("35GB"), + DataSize.valueOf("36GB"), + ImmutableList.of(), + ImmutableList.of(), + new RuntimeStats()), + Optional.empty(), + Optional.empty(), + ImmutableMap.of(), + ImmutableSet.of(), + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of(), + Optional.empty(), + false, + "33", + Optional.empty(), + null, + EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode(), + ImmutableList.of( + new PrestoWarning( + new WarningCode(123, "WARNING_123"), + "warning message")), + ImmutableSet.of(), + Optional.empty(), + false, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + ImmutableMap.of(), + ImmutableSet.of(), + StatsAndCosts.empty(), + ImmutableList.of(), + ImmutableList.of(), + ImmutableSet.of(), + ImmutableSet.of(), + ImmutableSet.of(), + ImmutableList.of(), + ImmutableMap.of(), + Optional.empty()); + } }