Skip to content

Commit

Permalink
Aggressively prune statistics from query info after completion
Browse files Browse the repository at this point in the history
Previously, we only pruned information from old queries after
the expiration queue filled. With more the addition of histograms
query statistics can take two orders of magnitude more memory
than previous statistics. Without pruning statistics we will
much more quickly hit memory limits of the JVM.

This change aggressively prunes query plan metadata before query
expiry. This is mostly limited to query plan statistics.
  • Loading branch information
ZacBlanco committed Jul 19, 2024
1 parent 753e8e1 commit 9165744
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.QueryId;
Expand Down Expand Up @@ -115,7 +116,7 @@ public void fail(Throwable throwable) {}
public void cancel() {}

@Override
public void pruneInfo() {}
public void pruneInfo(PruneLevel level) {}

@Override
public QueryId getQueryId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -357,9 +358,9 @@ public void cancel()
}

@Override
public void pruneInfo()
public void pruneInfo(PruneLevel level)
{
stateMachine.pruneQueryInfo();
stateMachine.pruneQueryInfo(level);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.Session;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.MetadataManager;
Expand Down Expand Up @@ -327,7 +328,7 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneInfo(PruneLevel level)
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
Expand Down Expand Up @@ -290,7 +291,7 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneInfo(PruneLevel level)
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.cost.VariableStatsEstimate;
import com.facebook.presto.execution.QueryExecution.QueryOutputInfo;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
Expand All @@ -38,13 +41,17 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.planner.CanonicalPlanWithInfo;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.transaction.TransactionInfo;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -93,6 +100,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
Expand Down Expand Up @@ -1046,28 +1055,166 @@ public QueryInfo updateQueryInfo(Optional<StageInfo> stageInfo)
* Remove large objects from the query info object graph, e.g : plan, stats, stage summaries, failed attempts
* Used when pruning expired queries from the state machine
*/
public void pruneQueryInfo()
public void pruneQueryInfo(PruneLevel level)
{
Optional<QueryInfo> finalInfo = finalQueryInfo.get();
if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) {
return;
}

QueryInfo queryInfo = finalInfo.get();
QueryInfo prunedQueryInfo;
switch (level) {
case FINISHED:
// no longer needed in the session after query finishes
session.getPlanNodeStatsMap().clear();
session.getPlanNodeCostMap().clear();
// inputs contain histograms which should be cleared
Set<Input> prunedInputs = pruneInputHistograms(inputs.get());
inputs.set(prunedInputs);
// query listeners maintain state in their arguments which holds
// onto plan node and statistics. Since finalQueryInfo was already
// set it should be safe to clear the listeners.
finalQueryInfo.clearEventListeners();
Optional.ofNullable(planStatsAndCosts.get())
.ifPresent(stats -> planStatsAndCosts.set(pruneHistogramsFromStatsAndCosts(stats)));
prunedQueryInfo = pruneFinishedQueryInfo(queryInfo, prunedInputs);
break;
case EXPIRED:
prunedQueryInfo = pruneExpiredQueryInfo(queryInfo, getMemoryPool());
break;
default:
prunedQueryInfo = queryInfo;
}

finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

private static QueryInfo pruneFinishedQueryInfo(QueryInfo queryInfo, Set<Input> prunedInputs)
{
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
queryInfo.getMemoryPool(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
queryInfo.getQuery(),
queryInfo.getExpandedQuery(),
queryInfo.getPreparedQuery(),
queryInfo.getQueryStats(),
queryInfo.getSetCatalog(),
queryInfo.getSetSchema(),
queryInfo.getSetSessionProperties(),
queryInfo.getResetSessionProperties(),
queryInfo.getSetRoles(),
queryInfo.getAddedPreparedStatements(),
queryInfo.getDeallocatedPreparedStatements(),
queryInfo.getStartedTransactionId(),
queryInfo.isClearTransactionId(),
queryInfo.getUpdateType(),
queryInfo.getOutputStage().map(QueryStateMachine::pruneStatsFromStageInfo),
queryInfo.getFailureInfo(),
queryInfo.getErrorCode(),
queryInfo.getWarnings(),
prunedInputs,
queryInfo.getOutput(),
queryInfo.isFinalQueryInfo(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getFailedTasks(),
queryInfo.getRuntimeOptimizedStages(),
queryInfo.getAddedSessionFunctions(),
queryInfo.getRemovedSessionFunctions(),
pruneHistogramsFromStatsAndCosts(queryInfo.getPlanStatsAndCosts()),
queryInfo.getOptimizerInformation(),
queryInfo.getCteInformationList(),
queryInfo.getScalarFunctions(),
queryInfo.getAggregateFunctions(),
queryInfo.getWindowsFunctions(),
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
}

private static Set<Input> pruneInputHistograms(Set<Input> inputs)
{
return inputs.stream().map(input -> new Input(input.getConnectorId(),
input.getSchema(),
input.getTable(),
input.getConnectorInfo(),
input.getColumns(),
input.getStatistics().map(tableStats -> TableStatistics.buildFrom(tableStats)
.setColumnStatistics(ImmutableMap.copyOf(
Maps.transformValues(tableStats.getColumnStatistics(),
columnStats -> ColumnStatistics.buildFrom(columnStats)
.setHistogram(Optional.empty())
.build())))
.build()),
input.getSerializedCommitOutput()))
.collect(toImmutableSet());
}

private static StatsAndCosts pruneHistogramsFromStatsAndCosts(StatsAndCosts statsAndCosts)
{
Map<PlanNodeId, PlanNodeStatsEstimate> newStats = statsAndCosts.getStats()
.entrySet()
.stream()
.collect(toImmutableMap(entry -> entry.getKey(),
entry -> PlanNodeStatsEstimate.buildFrom(entry.getValue())
.addVariableStatistics(ImmutableMap.copyOf(
Maps.transformValues(
entry.getValue().getVariableStatistics(),
variableStats -> VariableStatsEstimate.buildFrom(variableStats)
.setHistogram(Optional.empty())
.build())))
.build()));

return new StatsAndCosts(newStats,
statsAndCosts.getCosts());
}

private static StageInfo pruneStatsFromStageInfo(StageInfo stage)
{
return new StageInfo(
stage.getStageId(),
stage.getSelf(),
stage.getPlan().map(plan -> new PlanFragment(
plan.getId(),
plan.getRoot(),
plan.getVariables(),
plan.getPartitioning(),
plan.getTableScanSchedulingOrder(),
plan.getPartitioningScheme(),
plan.getStageExecutionDescriptor(),
plan.isOutputTableWriterFragment(),
plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts),
plan.getJsonRepresentation())), // Remove the plan
stage.getLatestAttemptExecutionInfo(),
stage.getPreviousAttemptsExecutionInfos(), // Remove failed attempts
stage.getSubStages().stream()
.map(QueryStateMachine::pruneStatsFromStageInfo)
.collect(toImmutableList()), // Remove the substages
stage.isRuntimeOptimized());
}

private static QueryInfo pruneExpiredQueryInfo(QueryInfo queryInfo, VersionedMemoryPoolId pool)
{
Optional<StageInfo> prunedOutputStage = queryInfo.getOutputStage().map(outputStage -> new StageInfo(
outputStage.getStageId(),
outputStage.getSelf(),
Optional.empty(), // Remove the plan
pruneStageExecutionInfo(outputStage.getLatestAttemptExecutionInfo()),
ImmutableList.of(), // Remove failed attempts
ImmutableList.of(),
outputStage.isRuntimeOptimized())); // Remove the substages
ImmutableList.of(), // Remove the substages
outputStage.isRuntimeOptimized()));

QueryInfo prunedQueryInfo = new QueryInfo(
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
getMemoryPool().getId(),
pool.getId(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
Expand Down Expand Up @@ -1107,7 +1254,6 @@ public void pruneQueryInfo()
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

private static StageExecutionInfo pruneStageExecutionInfo(StageExecutionInfo info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP;
import static com.facebook.presto.execution.QueryLimit.createDurationLimit;
import static com.facebook.presto.execution.QueryLimit.getMinimum;
import static com.facebook.presto.execution.QueryTracker.PruneLevel.EXPIRED;
import static com.facebook.presto.execution.QueryTracker.PruneLevel.FINISHED;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
Expand Down Expand Up @@ -197,7 +199,10 @@ public boolean addQuery(T execution)
public void expireQuery(QueryId queryId)
{
tryGetQuery(queryId)
.ifPresent(expirationQueue::add);
.ifPresent(query -> {
query.pruneInfo(FINISHED);
expirationQueue.add(query);
});
}

public long getRunningTaskCount()
Expand Down Expand Up @@ -264,8 +269,8 @@ public int getTaskCount()
}

/**
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
*/
@VisibleForTesting
void enforceTaskLimits()
Expand Down Expand Up @@ -316,7 +321,7 @@ private void pruneExpiredQueries()
if (expirationQueue.size() - count <= maxQueryHistory) {
break;
}
query.pruneInfo();
query.pruneInfo(EXPIRED);
count++;
}
}
Expand Down Expand Up @@ -409,6 +414,20 @@ public interface TrackedQuery
void fail(Throwable cause);

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
void pruneInfo();
void pruneInfo(PruneLevel level);
}

public enum PruneLevel
{
/**
* Prune info from finished queries which should not be kept around at all after the query
* state machine has transitioned into a finished state
*/
FINISHED,
/**
* Prune info from finished queries which are in the expiry queue and the queue length is
* greater than {@code query.max-history}
*/
EXPIRED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.cost.CostCalculator;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.execution.QueryTracker.PruneLevel;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
Expand Down Expand Up @@ -745,9 +747,14 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneInfo(PruneLevel level)
{
stateMachine.pruneQueryInfo();
if (level == PruneLevel.FINISHED) {
queryPlan.getAndUpdate(plan -> new Plan(plan.getRoot(), plan.getTypes(), StatsAndCosts.empty()));
// drop the reference to the scheduler since execution is finished
queryScheduler.set(null);
}
stateMachine.pruneQueryInfo(level);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ boolean isTerminalState(T state)
return terminalStates.contains(state);
}

public void clearEventListeners()
{
stateChangeListeners.clear();
}

@VisibleForTesting
List<StateChangeListener<T>> getStateChangeListeners()
{
Expand Down
Loading

0 comments on commit 9165744

Please sign in to comment.