Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggressively prune query info after completion #23257

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ public void fail(Throwable throwable) {}
public void cancel() {}

@Override
public void pruneInfo() {}
public void pruneExpiredQueryInfo() {}

@Override
public void pruneFinishedQueryInfo() {}

@Override
public QueryId getQueryId()
Expand Down Expand Up @@ -194,5 +197,5 @@ public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()

@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{ }
{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,15 @@ public void cancel()
}

@Override
public void pruneInfo()
public void pruneExpiredQueryInfo()
{
stateMachine.pruneQueryInfo();
stateMachine.pruneQueryInfoExpired();
}

@Override
public void pruneFinishedQueryInfo()
{
stateMachine.pruneQueryInfoFinished();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,13 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneExpiredQueryInfo()
{
// no-op
}

@Override
public void pruneFinishedQueryInfo()
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public DataSize getWrittenIntermediateDataSize()
{
return DataSize.succinctBytes(0);
}

@Override
public long getOutputPositions()
{
Expand Down Expand Up @@ -290,7 +291,13 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneExpiredQueryInfo()
{
// no-op
}

@Override
public void pruneFinishedQueryInfo()
{
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.cost.VariableStatsEstimate;
import com.facebook.presto.execution.QueryExecution.QueryOutputInfo;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
Expand All @@ -38,13 +40,17 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.planner.CanonicalPlanWithInfo;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.transaction.TransactionInfo;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -93,6 +99,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
Expand Down Expand Up @@ -1046,28 +1054,175 @@ public QueryInfo updateQueryInfo(Optional<StageInfo> stageInfo)
* Remove large objects from the query info object graph, e.g : plan, stats, stage summaries, failed attempts
* Used when pruning expired queries from the state machine
*/
public void pruneQueryInfo()
public void pruneQueryInfoExpired()
{
Optional<QueryInfo> finalInfo = finalQueryInfo.get();
if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) {
return;
}
QueryInfo queryInfo = finalInfo.get();
QueryInfo prunedQueryInfo;

prunedQueryInfo = pruneExpiredQueryInfo(queryInfo, getMemoryPool());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

/**
* Remove the largest objects from the query info object graph, e.g : extraneous stats, costs,
* and histograms to reduce memory utilization
*/
public void pruneQueryInfoFinished()
{
Optional<QueryInfo> finalInfo = finalQueryInfo.get();
if (!finalInfo.isPresent() || !finalInfo.get().getOutputStage().isPresent()) {
return;
}

QueryInfo queryInfo = finalInfo.get();
QueryInfo prunedQueryInfo;

// no longer needed in the session after query finishes
session.getPlanNodeStatsMap().clear();
session.getPlanNodeCostMap().clear();
// inputs contain some statistics which should be cleared
inputs.getAndUpdate(QueryStateMachine::pruneInputHistograms);
// query listeners maintain state in their arguments which holds
// onto plan nodes and statistics. Since finalQueryInfo was
// already set it should be in a terminal state and be safe to
// clear the listeners.
finalQueryInfo.clearEventListeners();
planStatsAndCosts.getAndUpdate(stats -> Optional.ofNullable(stats)
.map(QueryStateMachine::pruneHistogramsFromStatsAndCosts)
.orElse(null));
prunedQueryInfo = pruneFinishedQueryInfo(queryInfo, inputs.get());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

private static QueryInfo pruneFinishedQueryInfo(QueryInfo queryInfo, Set<Input> prunedInputs)
{
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
queryInfo.getMemoryPool(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
queryInfo.getQuery(),
queryInfo.getExpandedQuery(),
queryInfo.getPreparedQuery(),
queryInfo.getQueryStats(),
queryInfo.getSetCatalog(),
queryInfo.getSetSchema(),
queryInfo.getSetSessionProperties(),
queryInfo.getResetSessionProperties(),
queryInfo.getSetRoles(),
queryInfo.getAddedPreparedStatements(),
queryInfo.getDeallocatedPreparedStatements(),
queryInfo.getStartedTransactionId(),
queryInfo.isClearTransactionId(),
queryInfo.getUpdateType(),
queryInfo.getOutputStage().map(QueryStateMachine::pruneStatsFromStageInfo),
queryInfo.getFailureInfo(),
queryInfo.getErrorCode(),
queryInfo.getWarnings(),
prunedInputs,
queryInfo.getOutput(),
queryInfo.isFinalQueryInfo(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getFailedTasks(),
queryInfo.getRuntimeOptimizedStages(),
queryInfo.getAddedSessionFunctions(),
queryInfo.getRemovedSessionFunctions(),
pruneHistogramsFromStatsAndCosts(queryInfo.getPlanStatsAndCosts()),
queryInfo.getOptimizerInformation(),
queryInfo.getCteInformationList(),
queryInfo.getScalarFunctions(),
queryInfo.getAggregateFunctions(),
queryInfo.getWindowFunctions(),
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
}

private static Set<Input> pruneInputHistograms(Set<Input> inputs)
{
return inputs.stream().map(input -> new Input(input.getConnectorId(),
input.getSchema(),
input.getTable(),
input.getConnectorInfo(),
input.getColumns(),
input.getStatistics().map(tableStats -> TableStatistics.buildFrom(tableStats)
.setColumnStatistics(ImmutableMap.copyOf(
Maps.transformValues(tableStats.getColumnStatistics(),
columnStats -> ColumnStatistics.buildFrom(columnStats)
.setHistogram(Optional.empty())
.build())))
.build()),
input.getSerializedCommitOutput()))
.collect(toImmutableSet());
}

protected static StatsAndCosts pruneHistogramsFromStatsAndCosts(StatsAndCosts statsAndCosts)
{
Map<PlanNodeId, PlanNodeStatsEstimate> newStats = statsAndCosts.getStats()
.entrySet()
.stream()
.collect(toImmutableMap(entry -> entry.getKey(),
entry -> PlanNodeStatsEstimate.buildFrom(entry.getValue())
.addVariableStatistics(ImmutableMap.copyOf(
Maps.transformValues(
entry.getValue().getVariableStatistics(),
variableStats -> VariableStatsEstimate.buildFrom(variableStats)
.setHistogram(Optional.empty())
.build())))
.build()));

return new StatsAndCosts(newStats,
statsAndCosts.getCosts());
}

private static StageInfo pruneStatsFromStageInfo(StageInfo stage)
{
return new StageInfo(
stage.getStageId(),
stage.getSelf(),
stage.getPlan().map(plan -> new PlanFragment(
plan.getId(),
plan.getRoot(),
plan.getVariables(),
plan.getPartitioning(),
plan.getTableScanSchedulingOrder(),
plan.getPartitioningScheme(),
plan.getStageExecutionDescriptor(),
plan.isOutputTableWriterFragment(),
plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts),
plan.getJsonRepresentation())), // Remove the plan
stage.getLatestAttemptExecutionInfo(),
stage.getPreviousAttemptsExecutionInfos(), // Remove failed attempts
stage.getSubStages().stream()
.map(QueryStateMachine::pruneStatsFromStageInfo)
.collect(toImmutableList()), // Remove the substages
stage.isRuntimeOptimized());
}

private static QueryInfo pruneExpiredQueryInfo(QueryInfo queryInfo, VersionedMemoryPoolId pool)
{
Optional<StageInfo> prunedOutputStage = queryInfo.getOutputStage().map(outputStage -> new StageInfo(
outputStage.getStageId(),
outputStage.getSelf(),
Optional.empty(), // Remove the plan
pruneStageExecutionInfo(outputStage.getLatestAttemptExecutionInfo()),
ImmutableList.of(), // Remove failed attempts
ImmutableList.of(),
outputStage.isRuntimeOptimized())); // Remove the substages
ImmutableList.of(), // Remove the substages
outputStage.isRuntimeOptimized()));

QueryInfo prunedQueryInfo = new QueryInfo(
return new QueryInfo(
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
getMemoryPool().getId(),
pool.getId(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
Expand Down Expand Up @@ -1107,7 +1262,6 @@ public void pruneQueryInfo()
ImmutableList.of(),
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

private static StageExecutionInfo pruneStageExecutionInfo(StageExecutionInfo info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ public boolean addQuery(T execution)
public void expireQuery(QueryId queryId)
{
tryGetQuery(queryId)
.ifPresent(expirationQueue::add);
.ifPresent(query -> {
query.pruneFinishedQueryInfo();
expirationQueue.add(query);
});
}

public long getRunningTaskCount()
Expand Down Expand Up @@ -264,8 +267,8 @@ public int getTaskCount()
}

/**
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
* When cluster reaches max tasks limit and also a single query
* exceeds a threshold, kill this query
*/
@VisibleForTesting
void enforceTaskLimits()
Expand Down Expand Up @@ -316,7 +319,7 @@ private void pruneExpiredQueries()
if (expirationQueue.size() - count <= maxQueryHistory) {
break;
}
query.pruneInfo();
query.pruneExpiredQueryInfo();
count++;
}
}
Expand Down Expand Up @@ -409,6 +412,17 @@ public interface TrackedQuery
void fail(Throwable cause);

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
void pruneInfo();

/**
* Prune info from finished queries which are in the expiry queue and the queue length is
* greater than {@code query.max-history}
*/
void pruneExpiredQueryInfo();

/**
* Prune info from finished queries which should not be kept around at all after the query
* state machine has transitioned into a finished state
*/
void pruneFinishedQueryInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static com.facebook.presto.common.RuntimeMetricName.GET_CANONICAL_INFO_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.LOGICAL_PLANNER_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.OPTIMIZER_TIME_NANOS;
import static com.facebook.presto.execution.QueryStateMachine.pruneHistogramsFromStatsAndCosts;
import static com.facebook.presto.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.execution.buffer.OutputBuffers.createSpoolingOutputBuffers;
Expand Down Expand Up @@ -745,9 +746,21 @@ public void recordHeartbeat()
}

@Override
public void pruneInfo()
public void pruneExpiredQueryInfo()
{
stateMachine.pruneQueryInfo();
stateMachine.pruneQueryInfoExpired();
}

@Override
public void pruneFinishedQueryInfo()
{
queryPlan.getAndUpdate(plan -> new Plan(
plan.getRoot(),
plan.getTypes(),
pruneHistogramsFromStatsAndCosts(plan.getStatsAndCosts())));
// drop the reference to the scheduler since execution is finished
queryScheduler.set(null);
stateMachine.pruneQueryInfoFinished();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ boolean isTerminalState(T state)
return terminalStates.contains(state);
}

public void clearEventListeners()
{
stateChangeListeners.clear();
}

@VisibleForTesting
List<StateChangeListener<T>> getStateChangeListeners()
{
Expand Down
Loading
Loading