Skip to content

Commit

Permalink
Aggressively prune statistics from query info after completion
Browse files Browse the repository at this point in the history
Previously, we only pruned information from old queries after
the expiration queue filled. With the addition of histograms,
query statistics can take two orders of magnitude more memory
than previous statistics. Without pruning statistics, we will
hit the memory limits of the JVM much more quickly.

This change aggressively prunes query plan metadata before query
expiry. This is mostly limited to query plan statistics.
  • Loading branch information
ZacBlanco committed Aug 16, 2024
1 parent 4dab583 commit e3d09ad
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ public void fail(Throwable throwable) {}
public void cancel() {}

@Override
public void pruneInfo() {}
public void pruneQueryExpired() {}

@Override
public void pruneQueryFinished() {}

@Override
public QueryId getQueryId()
Expand Down Expand Up @@ -194,5 +197,5 @@ public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()

@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{ }
{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,15 @@ public void cancel()
}

@Override
public void pruneInfo()
public void pruneQueryExpired()
{
stateMachine.pruneQueryInfo();
stateMachine.pruneQueryInfoExpired();
}

@Override
public void pruneQueryFinished()
{
stateMachine.pruneQueryInfoFinished();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,13 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneQueryExpired()
{
// no-op
}

@Override
public void pruneQueryFinished()
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public DataSize getWrittenIntermediateDataSize()
{
return DataSize.succinctBytes(0);
}

@Override
public long getOutputPositions()
{
Expand Down Expand Up @@ -290,7 +291,13 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneQueryExpired()
{
// no-op
}

@Override
public void pruneQueryFinished()
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.cost.VariableStatsEstimate;
import com.facebook.presto.execution.QueryExecution.QueryOutputInfo;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
Expand All @@ -38,13 +40,17 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.planner.CanonicalPlanWithInfo;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.transaction.TransactionInfo;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -93,6 +99,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
Expand Down Expand Up @@ -1046,28 +1054,175 @@ public QueryInfo updateQueryInfo(Optional<StageInfo> stageInfo)
* Remove large objects from the query info object graph, e.g : plan, stats, stage summaries, failed attempts
* Used when pruning expired queries from the state machine
*/
public void pruneQueryInfo()
public void pruneQueryInfoExpired()
{
Optional<QueryInfo> finalInfo = finalQueryInfo.get();
if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) {
return;
}
QueryInfo queryInfo = finalInfo.get();
QueryInfo prunedQueryInfo;

prunedQueryInfo = pruneExpiredQueryInfo(queryInfo, getMemoryPool());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

/**
 * Shrinks the memory held by a finished query by dropping the largest objects from the
 * query info object graph, e.g. extraneous stats, costs, and histograms.
 * <p>
 * Nothing happens unless a final query info has been recorded and that info has an output
 * stage. Otherwise the session stats/cost maps are cleared, histograms are stripped from
 * the inputs and from the plan stats and costs, the listeners registered on
 * {@code finalQueryInfo} are cleared, and a pruned copy of the final query info is
 * published via {@code compareAndSet} against the value read at the start of this method.
 */
public void pruneQueryInfoFinished()
{
    Optional<QueryInfo> currentFinalInfo = finalQueryInfo.get();
    boolean hasOutputStage = currentFinalInfo.isPresent() && currentFinalInfo.get().getOutputStage().isPresent();
    if (!hasOutputStage) {
        return;
    }

    // the session has no further use for these once the query has finished
    session.getPlanNodeStatsMap().clear();
    session.getPlanNodeCostMap().clear();

    // inputs carry table statistics, so strip their histograms as well
    inputs.getAndUpdate(QueryStateMachine::pruneInputHistograms);

    // Listener arguments keep plan nodes and statistics reachable. finalQueryInfo has
    // already been set, so it should be in a terminal state and the listeners can be
    // cleared safely.
    finalQueryInfo.clearEventListeners();

    // the stats reference may still be unset, in which case it is left as null
    planStatsAndCosts.getAndUpdate(stats -> stats == null ? null : pruneHistogramsFromStatsAndCosts(stats));

    QueryInfo prunedQueryInfo = pruneFinishedQueryInfo(currentFinalInfo.get(), inputs.get());
    finalQueryInfo.compareAndSet(currentFinalInfo, Optional.of(prunedQueryInfo));
}

/**
 * Builds a copy of {@code queryInfo} for a finished query with the bulky statistics removed.
 * <p>
 * Compared to the source object, the copy differs only in these constructor arguments:
 * the output stage is passed through {@code pruneStatsFromStageInfo}, the inputs are replaced
 * by {@code prunedInputs}, the plan stats and costs are passed through
 * {@code pruneHistogramsFromStatsAndCosts}, and two collection arguments near the end are
 * replaced by empty collections. Every other argument is read straight from {@code queryInfo}.
 *
 * @param queryInfo the final query info to copy
 * @param prunedInputs the inputs to store in the copy; the caller in this file passes the
 * inputs after they have been run through {@code pruneInputHistograms}
 * @return a new {@code QueryInfo}; {@code queryInfo} itself is not modified here
 */
private static QueryInfo pruneFinishedQueryInfo(QueryInfo queryInfo, Set<Input> prunedInputs)
{
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
queryInfo.getMemoryPool(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
queryInfo.getQuery(),
queryInfo.getExpandedQuery(),
queryInfo.getPreparedQuery(),
queryInfo.getQueryStats(),
queryInfo.getSetCatalog(),
queryInfo.getSetSchema(),
queryInfo.getSetSessionProperties(),
queryInfo.getResetSessionProperties(),
queryInfo.getSetRoles(),
queryInfo.getAddedPreparedStatements(),
queryInfo.getDeallocatedPreparedStatements(),
queryInfo.getStartedTransactionId(),
queryInfo.isClearTransactionId(),
queryInfo.getUpdateType(),
// stage tree is kept, with histograms stripped from each stage's plan stats
queryInfo.getOutputStage().map(QueryStateMachine::pruneStatsFromStageInfo),
queryInfo.getFailureInfo(),
queryInfo.getErrorCode(),
queryInfo.getWarnings(),
// caller-supplied replacement for queryInfo.getInputs()
prunedInputs,
queryInfo.getOutput(),
queryInfo.isFinalQueryInfo(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getFailedTasks(),
queryInfo.getRuntimeOptimizedStages(),
queryInfo.getAddedSessionFunctions(),
queryInfo.getRemovedSessionFunctions(),
// plan-level stats and costs are kept, minus histograms
pruneHistogramsFromStatsAndCosts(queryInfo.getPlanStatsAndCosts()),
queryInfo.getOptimizerInformation(),
queryInfo.getCteInformationList(),
queryInfo.getScalarFunctions(),
queryInfo.getAggregateFunctions(),
queryInfo.getWindowFunctions(),
// NOTE(review): the next two arguments are dropped entirely (replaced by empty collections);
// the QueryInfo constructor is not visible here, so which fields they populate should be
// confirmed against its declaration
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
}

/**
 * Returns a copy of {@code inputs} in which every column statistic has its histogram
 * replaced by {@code Optional.empty()}.
 * <p>
 * Each {@code Input} is rebuilt from its own getters; only the table statistics (when
 * present) are rewritten. Inputs without statistics are copied as they are.
 *
 * @param inputs the inputs whose statistics should be stripped of histograms
 * @return an immutable set of rebuilt inputs
 */
private static Set<Input> pruneInputHistograms(Set<Input> inputs)
{
    return inputs.stream()
            .map(original -> {
                // ImmutableMap.copyOf materializes the lazy transformValues view
                Optional<TableStatistics> statsWithoutHistograms = original.getStatistics()
                        .map(tableStatistics -> TableStatistics.buildFrom(tableStatistics)
                                .setColumnStatistics(ImmutableMap.copyOf(Maps.transformValues(
                                        tableStatistics.getColumnStatistics(),
                                        columnStatistics -> ColumnStatistics.buildFrom(columnStatistics)
                                                .setHistogram(Optional.empty())
                                                .build())))
                                .build());
                return new Input(
                        original.getConnectorId(),
                        original.getSchema(),
                        original.getTable(),
                        original.getConnectorInfo(),
                        original.getColumns(),
                        statsWithoutHistograms,
                        original.getSerializedCommitOutput());
            })
            .collect(toImmutableSet());
}

/**
 * Returns a new {@code StatsAndCosts} whose per-plan-node estimates no longer carry
 * histograms.
 * <p>
 * For every plan node, the estimate is rebuilt from the original and each of its variable
 * statistics is re-added with the histogram set to {@code Optional.empty()}. The costs are
 * passed through from {@code statsAndCosts} unchanged.
 *
 * @param statsAndCosts the stats and costs to prune
 * @return a new instance with the rewritten stats and the original costs
 */
protected static StatsAndCosts pruneHistogramsFromStatsAndCosts(StatsAndCosts statsAndCosts)
{
    Map<PlanNodeId, PlanNodeStatsEstimate> statsWithoutHistograms = statsAndCosts.getStats().entrySet().stream()
            .collect(toImmutableMap(
                    Map.Entry::getKey,
                    nodeEntry -> {
                        PlanNodeStatsEstimate estimate = nodeEntry.getValue();
                        // ImmutableMap.copyOf materializes the lazy transformValues view
                        return PlanNodeStatsEstimate.buildFrom(estimate)
                                .addVariableStatistics(ImmutableMap.copyOf(Maps.transformValues(
                                        estimate.getVariableStatistics(),
                                        variableEstimate -> VariableStatsEstimate.buildFrom(variableEstimate)
                                                .setHistogram(Optional.empty())
                                                .build())))
                                .build();
                    }));

    return new StatsAndCosts(statsWithoutHistograms, statsAndCosts.getCosts());
}

/**
 * Rebuilds {@code stage} and, recursively, each of its sub-stages with histograms pruned from
 * the stats and costs attached to the stage's plan fragment.
 * <p>
 * Unlike the pruning applied to expired queries, nothing structural is removed here: the plan
 * fragment, the latest and previous attempt execution infos, and the sub-stages are all kept.
 *
 * @param stage the stage to copy
 * @return a new {@code StageInfo} tree; {@code stage} itself is not modified here
 */
private static StageInfo pruneStatsFromStageInfo(StageInfo stage)
{
return new StageInfo(
stage.getStageId(),
stage.getSelf(),
stage.getPlan().map(plan -> new PlanFragment(
plan.getId(),
plan.getRoot(),
plan.getVariables(),
plan.getPartitioning(),
plan.getTableScanSchedulingOrder(),
plan.getPartitioningScheme(),
plan.getStageExecutionDescriptor(),
plan.isOutputTableWriterFragment(),
plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts),
plan.getJsonRepresentation())), // Keep the plan; only histograms are dropped from its stats
stage.getLatestAttemptExecutionInfo(),
stage.getPreviousAttemptsExecutionInfos(), // Keep previous attempts unchanged
stage.getSubStages().stream()
.map(QueryStateMachine::pruneStatsFromStageInfo)
.collect(toImmutableList()), // Keep the substages, pruned recursively
stage.isRuntimeOptimized());
}

private static QueryInfo pruneExpiredQueryInfo(QueryInfo queryInfo, VersionedMemoryPoolId pool)
{
Optional<StageInfo> prunedOutputStage = queryInfo.getOutputStage().map(outputStage -> new StageInfo(
outputStage.getStageId(),
outputStage.getSelf(),
Optional.empty(), // Remove the plan
pruneStageExecutionInfo(outputStage.getLatestAttemptExecutionInfo()),
ImmutableList.of(), // Remove failed attempts
ImmutableList.of(),
outputStage.isRuntimeOptimized())); // Remove the substages
ImmutableList.of(), // Remove the substages
outputStage.isRuntimeOptimized()));

QueryInfo prunedQueryInfo = new QueryInfo(
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
getMemoryPool().getId(),
pool.getId(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
Expand Down Expand Up @@ -1107,7 +1262,6 @@ public void pruneQueryInfo()
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

private static StageExecutionInfo pruneStageExecutionInfo(StageExecutionInfo info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ public boolean addQuery(T execution)
public void expireQuery(QueryId queryId)
{
tryGetQuery(queryId)
.ifPresent(expirationQueue::add);
.ifPresent(query -> {
query.pruneQueryFinished();
expirationQueue.add(query);
});
}

public long getRunningTaskCount()
Expand Down Expand Up @@ -264,8 +267,8 @@ public int getTaskCount()
}

/**
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
*/
@VisibleForTesting
void enforceTaskLimits()
Expand Down Expand Up @@ -316,7 +319,7 @@ private void pruneExpiredQueries()
if (expirationQueue.size() - count <= maxQueryHistory) {
break;
}
query.pruneInfo();
query.pruneQueryExpired();
count++;
}
}
Expand Down Expand Up @@ -409,6 +412,17 @@ public interface TrackedQuery
void fail(Throwable cause);

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
void pruneInfo();

/**
* Prune info from finished queries which are in the expiry queue and the queue length is
* greater than {@code query.max-history}
*/
void pruneQueryExpired();

/**
* Prune info from finished queries which should not be kept around at all after the query
* state machine has transitioned into a finished state
*/
void pruneQueryFinished();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static com.facebook.presto.common.RuntimeMetricName.GET_CANONICAL_INFO_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.LOGICAL_PLANNER_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.OPTIMIZER_TIME_NANOS;
import static com.facebook.presto.execution.QueryStateMachine.pruneHistogramsFromStatsAndCosts;
import static com.facebook.presto.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.execution.buffer.OutputBuffers.createSpoolingOutputBuffers;
Expand Down Expand Up @@ -745,9 +746,21 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneQueryExpired()
{
stateMachine.pruneQueryInfo();
stateMachine.pruneQueryInfoExpired();
}

/**
 * Releases memory held by this execution once the query has finished.
 * <p>
 * Replaces the stored plan with a copy whose stats and costs have had their histograms
 * pruned, drops the scheduler reference, and then asks the state machine to prune its
 * finished query info.
 */
@Override
public void pruneQueryFinished()
{
    // queryPlan may still hold null when this runs (presumably when the query ended before
    // a plan was stored -- confirm against where queryPlan is set). Without the guard the
    // update function would throw a NullPointerException and the remaining pruning below
    // would be skipped.
    queryPlan.getAndUpdate(plan -> plan == null ? null : new Plan(
            plan.getRoot(),
            plan.getTypes(),
            pruneHistogramsFromStatsAndCosts(plan.getStatsAndCosts())));
    // drop the reference to the scheduler since execution is finished
    queryScheduler.set(null);
    stateMachine.pruneQueryInfoFinished();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ boolean isTerminalState(T state)
return terminalStates.contains(state);
}

/**
 * Removes every listener currently held in {@code stateChangeListeners}.
 * <p>
 * NOTE(review): the list is cleared here without any visible synchronization. If other
 * accesses to {@code stateChangeListeners} in this class are guarded by a lock, this call
 * should probably take the same lock -- confirm against the rest of the class.
 */
public void clearEventListeners()
{
stateChangeListeners.clear();
}

@VisibleForTesting
List<StateChangeListener<T>> getStateChangeListeners()
{
Expand Down
Loading

0 comments on commit e3d09ad

Please sign in to comment.