From 9165744d9656bb7f3b996d72d64481d30159c7ec Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Thu, 18 Jul 2024 17:02:34 -0700 Subject: [PATCH] Aggressively prune statistics from query info after completion Previously, we only pruned information from old queries after the expiration queue filled. With more the addition of histograms query statistics can take two orders of magnitude more memory than previous statistics. Without pruning statistics we will much more quickly hit memory limits of the JVM. This change aggressively prunes query plan metadata before query expiry. This is mostly limited to query plan statistics. --- .../dispatcher/FailedDispatchQuery.java | 3 +- .../presto/dispatcher/LocalDispatchQuery.java | 5 +- .../AccessControlCheckerExecution.java | 3 +- .../execution/DataDefinitionExecution.java | 3 +- .../presto/execution/QueryStateMachine.java | 158 +++++++++++++++++- .../presto/execution/QueryTracker.java | 29 +++- .../presto/execution/SqlQueryExecution.java | 11 +- .../presto/execution/StateMachine.java | 5 + .../presto/execution/MockQueryExecution.java | 3 +- .../spi/statistics/ColumnStatistics.java | 10 ++ .../spi/statistics/TableStatistics.java | 16 ++ 11 files changed, 227 insertions(+), 19 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java index 3c828ea620791..fda714029b826 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java @@ -17,6 +17,7 @@ import com.facebook.presto.common.ErrorCode; import com.facebook.presto.execution.ExecutionFailureInfo; import com.facebook.presto.execution.QueryState; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.spi.QueryId; @@ -115,7 +116,7 @@ public void fail(Throwable throwable) {} public void cancel() {} @Override - public void pruneInfo() {} + public void pruneInfo(PruneLevel level) {} @Override public QueryId getQueryId() diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java index 4babf8a95575e..a5da6e3162a62 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.QueryExecution; import com.facebook.presto.execution.QueryState; import com.facebook.presto.execution.QueryStateMachine; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.spi.PrestoException; @@ -357,9 +358,9 @@ public void cancel() } @Override - public void pruneInfo() + public void pruneInfo(PruneLevel level) { - stateMachine.pruneQueryInfo(); + stateMachine.pruneQueryInfo(level); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java index 2fb96b05b2f0f..b41d19ecd7483 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/AccessControlCheckerExecution.java @@ -16,6 +16,7 @@ import com.facebook.presto.Session; import com.facebook.presto.common.analyzer.PreparedQuery; import com.facebook.presto.common.resourceGroups.QueryType; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.memory.VersionedMemoryPoolId; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; @@ -327,7 +328,7 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneInfo(PruneLevel level) { // no-op } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java index a130b447516f3..b17bfc37cc559 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java @@ -14,6 +14,7 @@ package com.facebook.presto.execution; import com.facebook.presto.Session; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.memory.VersionedMemoryPoolId; import com.facebook.presto.metadata.Metadata; @@ -290,7 +291,7 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneInfo(PruneLevel level) { // no-op } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index 2edb257819d81..5b042b6eec2ac 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -19,8 +19,11 @@ import com.facebook.presto.common.resourceGroups.QueryType; import com.facebook.presto.common.transaction.TransactionId; import com.facebook.presto.common.type.Type; +import com.facebook.presto.cost.PlanNodeStatsEstimate; import com.facebook.presto.cost.StatsAndCosts; +import com.facebook.presto.cost.VariableStatsEstimate; import com.facebook.presto.execution.QueryExecution.QueryOutputInfo; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.memory.VersionedMemoryPoolId; import com.facebook.presto.metadata.Metadata; @@ -38,13 +41,17 @@ import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.spi.security.AccessControl; import com.facebook.presto.spi.security.SelectedRole; +import com.facebook.presto.spi.statistics.ColumnStatistics; +import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.sql.planner.CanonicalPlanWithInfo; +import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.transaction.TransactionInfo; import com.facebook.presto.transaction.TransactionManager; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.google.common.util.concurrent.FutureCallback; @@ -93,6 +100,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.succinctBytes; import static java.lang.String.format; @@ -1046,7 +1055,7 @@ public QueryInfo updateQueryInfo(Optional stageInfo) * Remove large objects from the query info object graph, e.g : plan, stats, stage summaries, failed attempts * Used when pruning expired queries from the state machine */ - public void pruneQueryInfo() + public void pruneQueryInfo(PruneLevel level) { Optional finalInfo = finalQueryInfo.get(); if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) { @@ -1054,20 +1063,158 @@ public void pruneQueryInfo() } QueryInfo queryInfo = finalInfo.get(); + QueryInfo prunedQueryInfo; + switch (level) { + case FINISHED: + // no longer needed in the session after query finishes + session.getPlanNodeStatsMap().clear(); + session.getPlanNodeCostMap().clear(); + // inputs contain histograms which should be cleared + Set prunedInputs = pruneInputHistograms(inputs.get()); + inputs.set(prunedInputs); + // query listeners maintain state in their arguments which holds + // onto plan node and statistics. Since finalQueryInfo was already + // set it should be safe to clear the listeners. + finalQueryInfo.clearEventListeners(); + Optional.ofNullable(planStatsAndCosts.get()) + .ifPresent(stats -> planStatsAndCosts.set(pruneHistogramsFromStatsAndCosts(stats))); + prunedQueryInfo = pruneFinishedQueryInfo(queryInfo, prunedInputs); + break; + case EXPIRED: + prunedQueryInfo = pruneExpiredQueryInfo(queryInfo, getMemoryPool()); + break; + default: + prunedQueryInfo = queryInfo; + } + + finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo)); + } + + private static QueryInfo pruneFinishedQueryInfo(QueryInfo queryInfo, Set prunedInputs) + { + return new QueryInfo( + queryInfo.getQueryId(), + queryInfo.getSession(), + queryInfo.getState(), + queryInfo.getMemoryPool(), + queryInfo.isScheduled(), + queryInfo.getSelf(), + queryInfo.getFieldNames(), + queryInfo.getQuery(), + queryInfo.getExpandedQuery(), + queryInfo.getPreparedQuery(), + queryInfo.getQueryStats(), + queryInfo.getSetCatalog(), + queryInfo.getSetSchema(), + queryInfo.getSetSessionProperties(), + queryInfo.getResetSessionProperties(), + queryInfo.getSetRoles(), + queryInfo.getAddedPreparedStatements(), + queryInfo.getDeallocatedPreparedStatements(), + queryInfo.getStartedTransactionId(), + queryInfo.isClearTransactionId(), + queryInfo.getUpdateType(), + queryInfo.getOutputStage().map(QueryStateMachine::pruneStatsFromStageInfo), + queryInfo.getFailureInfo(), + queryInfo.getErrorCode(), + queryInfo.getWarnings(), + prunedInputs, + queryInfo.getOutput(), + queryInfo.isFinalQueryInfo(), + queryInfo.getResourceGroupId(), + queryInfo.getQueryType(), + queryInfo.getFailedTasks(), + queryInfo.getRuntimeOptimizedStages(), + queryInfo.getAddedSessionFunctions(), + queryInfo.getRemovedSessionFunctions(), + pruneHistogramsFromStatsAndCosts(queryInfo.getPlanStatsAndCosts()), + queryInfo.getOptimizerInformation(), + queryInfo.getCteInformationList(), + queryInfo.getScalarFunctions(), + queryInfo.getAggregateFunctions(), + queryInfo.getWindowsFunctions(), + ImmutableList.of(), + ImmutableMap.of(), + queryInfo.getPrestoSparkExecutionContext()); + } + + private static Set pruneInputHistograms(Set inputs) + { + return inputs.stream().map(input -> new Input(input.getConnectorId(), + input.getSchema(), + input.getTable(), + input.getConnectorInfo(), + input.getColumns(), + input.getStatistics().map(tableStats -> TableStatistics.buildFrom(tableStats) + .setColumnStatistics(ImmutableMap.copyOf( + Maps.transformValues(tableStats.getColumnStatistics(), + columnStats -> ColumnStatistics.buildFrom(columnStats) + .setHistogram(Optional.empty()) + .build()))) + .build()), + input.getSerializedCommitOutput())) + .collect(toImmutableSet()); + } + + private static StatsAndCosts pruneHistogramsFromStatsAndCosts(StatsAndCosts statsAndCosts) + { + Map newStats = statsAndCosts.getStats() + .entrySet() + .stream() + .collect(toImmutableMap(entry -> entry.getKey(), + entry -> PlanNodeStatsEstimate.buildFrom(entry.getValue()) + .addVariableStatistics(ImmutableMap.copyOf( + Maps.transformValues( + entry.getValue().getVariableStatistics(), + variableStats -> VariableStatsEstimate.buildFrom(variableStats) + .setHistogram(Optional.empty()) + .build()))) + .build())); + + return new StatsAndCosts(newStats, + statsAndCosts.getCosts()); + } + + private static StageInfo pruneStatsFromStageInfo(StageInfo stage) + { + return new StageInfo( + stage.getStageId(), + stage.getSelf(), + stage.getPlan().map(plan -> new PlanFragment( + plan.getId(), + plan.getRoot(), + plan.getVariables(), + plan.getPartitioning(), + plan.getTableScanSchedulingOrder(), + plan.getPartitioningScheme(), + plan.getStageExecutionDescriptor(), + plan.isOutputTableWriterFragment(), + plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts), + plan.getJsonRepresentation())), // Remove the plan + stage.getLatestAttemptExecutionInfo(), + stage.getPreviousAttemptsExecutionInfos(), // Remove failed attempts + stage.getSubStages().stream() + .map(QueryStateMachine::pruneStatsFromStageInfo) + .collect(toImmutableList()), // Remove the substages + stage.isRuntimeOptimized()); + } + + private static QueryInfo pruneExpiredQueryInfo(QueryInfo queryInfo, VersionedMemoryPoolId pool) + { Optional prunedOutputStage = queryInfo.getOutputStage().map(outputStage -> new StageInfo( outputStage.getStageId(), outputStage.getSelf(), Optional.empty(), // Remove the plan pruneStageExecutionInfo(outputStage.getLatestAttemptExecutionInfo()), ImmutableList.of(), // Remove failed attempts - ImmutableList.of(), - outputStage.isRuntimeOptimized())); // Remove the substages + ImmutableList.of(), // Remove the substages + outputStage.isRuntimeOptimized())); - QueryInfo prunedQueryInfo = new QueryInfo( + return new QueryInfo( queryInfo.getQueryId(), queryInfo.getSession(), queryInfo.getState(), - getMemoryPool().getId(), + pool.getId(), queryInfo.isScheduled(), queryInfo.getSelf(), queryInfo.getFieldNames(), @@ -1107,7 +1254,6 @@ public void pruneQueryInfo() ImmutableList.of(), ImmutableMap.of(), queryInfo.getPrestoSparkExecutionContext()); - finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo)); } private static StageExecutionInfo pruneStageExecutionInfo(StageExecutionInfo info) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java index e37d5c684ef61..571b4478b82f4 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -48,6 +48,8 @@ import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP; import static com.facebook.presto.execution.QueryLimit.createDurationLimit; import static com.facebook.presto.execution.QueryLimit.getMinimum; +import static com.facebook.presto.execution.QueryTracker.PruneLevel.EXPIRED; +import static com.facebook.presto.execution.QueryTracker.PruneLevel.FINISHED; import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; @@ -197,7 +199,10 @@ public boolean addQuery(T execution) public void expireQuery(QueryId queryId) { tryGetQuery(queryId) - .ifPresent(expirationQueue::add); + .ifPresent(query -> { + query.pruneInfo(FINISHED); + expirationQueue.add(query); + }); } public long getRunningTaskCount() @@ -264,8 +269,8 @@ public int getTaskCount() } /** - * When cluster reaches max tasks limit and also a single query - * exceeds a threshold, kill this query + * When cluster reaches max tasks limit and also a single query + * exceeds a threshold, kill this query */ @VisibleForTesting void enforceTaskLimits() @@ -316,7 +321,7 @@ private void pruneExpiredQueries() if (expirationQueue.size() - count <= maxQueryHistory) { break; } - query.pruneInfo(); + query.pruneInfo(EXPIRED); count++; } } @@ -409,6 +414,20 @@ public interface TrackedQuery void fail(Throwable cause); // XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history - void pruneInfo(); + void pruneInfo(PruneLevel level); + } + + public enum PruneLevel + { + /** + * Prune info from finished queries which should not be kept around at all after the query + * state machine has transitioned into a finished state + */ + FINISHED, + /** + * Prune info from finished queries which are in the expiry queue and the queue length is + * greater than {@code query.max-history} + */ + EXPIRED } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java index 84f4a22db4ae7..6e96d8cf5d9a2 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java @@ -19,7 +19,9 @@ import com.facebook.presto.common.resourceGroups.QueryType; import com.facebook.presto.cost.CostCalculator; import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager; +import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.cost.StatsCalculator; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; @@ -745,9 +747,14 @@ public void recordHeartbeat() } @Override - public void pruneInfo() + public void pruneInfo(PruneLevel level) { - stateMachine.pruneQueryInfo(); + if (level == PruneLevel.FINISHED) { + queryPlan.getAndUpdate(plan -> new Plan(plan.getRoot(), plan.getTypes(), StatsAndCosts.empty())); + // drop the reference to the scheduler since execution is finished + queryScheduler.set(null); + } + stateMachine.pruneQueryInfo(level); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java index e9046bb854ef5..559e245720291 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StateMachine.java @@ -283,6 +283,11 @@ boolean isTerminalState(T state) return terminalStates.contains(state); } + public void clearEventListeners() + { + stateChangeListeners.clear(); + } + @VisibleForTesting List> getStateChangeListeners() { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java index 139a260aa889f..bd68d42115d5f 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java @@ -14,6 +14,7 @@ package com.facebook.presto.execution; import com.facebook.presto.Session; +import com.facebook.presto.execution.QueryTracker.PruneLevel; import com.facebook.presto.memory.VersionedMemoryPoolId; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.spi.QueryId; @@ -217,7 +218,7 @@ public Optional getFailureReason() } @Override - public void pruneInfo() + public void pruneInfo(PruneLevel level) { } @Override diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java index ec4ee420e46bd..bec6fb5c31299 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/ColumnStatistics.java @@ -132,6 +132,16 @@ public static Builder builder() return new Builder(); } + public static Builder buildFrom(ColumnStatistics statistics) + { + return new Builder() + .setRange(statistics.getRange()) + .setDataSize(statistics.getDataSize()) + .setNullsFraction(statistics.getNullsFraction()) + .setDistinctValuesCount(statistics.getDistinctValuesCount()) + .setHistogram(statistics.getHistogram()); + } + /** * If one of the estimates below is unspecified, the default "unknown" estimate value * (represented by floating point NaN) may cause the resulting symbol statistics diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java index dd4060c785883..c6fece6d1a66c 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java @@ -116,6 +116,15 @@ public static Builder builder() return new Builder(); } + public static Builder buildFrom(TableStatistics tableStatistics) + { + return new Builder() + .setRowCount(tableStatistics.getRowCount()) + .setTotalSize(tableStatistics.getTotalSize()) + .setConfidenceLevel(tableStatistics.getConfidence()) + .setColumnStatistics(tableStatistics.getColumnStatistics()); + } + public static final class Builder { private Estimate rowCount = Estimate.unknown(); @@ -154,6 +163,13 @@ public Builder setColumnStatistics(ColumnHandle columnHandle, ColumnStatistics c return this; } + public Builder setColumnStatistics(Map columnStatistics) + { + requireNonNull(columnStatistics, "columnStatistics can not be null"); + this.columnStatisticsMap.putAll(columnStatistics); + return this; + } + public Map getColumnStatistics() { return Collections.unmodifiableMap(columnStatisticsMap);