Skip to content

Commit

Permalink
Record source information for HBO stats
Browse files Browse the repository at this point in the history
Record the type of workers (CPP, JAVA) and query ID of the queries
which produce these stats.
  • Loading branch information
feilong-liu committed Mar 26, 2024
1 parent 59bd301 commit 8b8c9e4
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;

import java.util.ArrayList;
Expand All @@ -29,26 +30,26 @@ public class HistoricalPlanStatisticsUtil
private HistoricalPlanStatisticsUtil() {}

/**
* Returns predicted plan statistics depending on historical runs
* Returns historical plan statistics entry containing predicted plan statistics depending on historical runs
*/
public static PlanStatistics getPredictedPlanStatistics(
public static Optional<HistoricalPlanStatisticsEntry> getSelectedHistoricalPlanStatisticsEntry(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics,
double historyMatchingThreshold)
{
List<HistoricalPlanStatisticsEntry> lastRunsStatistics = historicalPlanStatistics.getLastRunsStatistics();
if (lastRunsStatistics.isEmpty()) {
return PlanStatistics.empty();
return Optional.empty();
}

Optional<Integer> similarStatsIndex = getSimilarStatsIndex(historicalPlanStatistics, inputTableStatistics, historyMatchingThreshold);

if (similarStatsIndex.isPresent()) {
return lastRunsStatistics.get(similarStatsIndex.get()).getPlanStatistics();
return Optional.of(lastRunsStatistics.get(similarStatsIndex.get()));
}

// TODO: Use linear regression to predict stats if we have only 1 table.
return PlanStatistics.empty();
return Optional.empty();
}

/**
Expand All @@ -58,7 +59,8 @@ public static HistoricalPlanStatistics updatePlanStatistics(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics,
PlanStatistics current,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo)
{
List<HistoricalPlanStatisticsEntry> lastRunsStatistics = historicalPlanStatistics.getLastRunsStatistics();

Expand All @@ -69,7 +71,7 @@ public static HistoricalPlanStatistics updatePlanStatistics(
newLastRunsStatistics.remove(similarStatsIndex.get().intValue());
}

newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics));
newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics, historicalPlanStatisticsEntryInfo));
int maxLastRuns = inputTableStatistics.isEmpty() ? 1 : config.getMaxLastRunsHistory();
if (newLastRunsStatistics.size() > maxLastRuns) {
newLastRunsStatistics.remove(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;
Expand All @@ -41,7 +42,7 @@
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getPredictedPlanStatistics;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getSelectedHistoricalPlanStatisticsEntry;
import static com.facebook.presto.cost.HistoryBasedPlanStatisticsManager.historyBasedPlanCanonicalizationStrategyList;
import static com.facebook.presto.sql.planner.iterative.Plans.resolveGroupReferences;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -194,11 +195,14 @@ private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session,
if (allHashes.containsKey(strategy) && entry.getKey().getHash().isPresent() && allHashes.get(strategy).equals(entry.getKey())) {
Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(plan, session, true);
if (inputTableStatistics.isPresent()) {
PlanStatistics predictedPlanStatistics = getPredictedPlanStatistics(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
if (predictedPlanStatistics.getConfidence() > 0) {
return delegateStats.combineStats(
predictedPlanStatistics,
new HistoryBasedSourceInfo(entry.getKey().getHash(), inputTableStatistics));
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
if (historicalPlanStatisticsEntry.isPresent()) {
PlanStatistics predictedPlanStatistics = historicalPlanStatisticsEntry.get().getPlanStatistics();
if (predictedPlanStatistics.getConfidence() > 0) {
return delegateStats.combineStats(
predictedPlanStatistics,
new HistoryBasedSourceInfo(entry.getKey().getHash(), inputTableStatistics, Optional.of(historicalPlanStatisticsEntry.get().getHistoricalPlanStatisticsEntryInfo())));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.statistics.EmptyPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.CachingPlanCanonicalInfoProvider;
import com.facebook.presto.sql.planner.PlanCanonicalInfoProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -39,16 +41,21 @@ public class HistoryBasedPlanStatisticsManager

private HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider = EmptyPlanStatisticsProvider.getInstance();
private boolean statisticsProviderAdded;
private final boolean isNativeExecution;
private final String serverVersion;

@Inject
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config)
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config,
FeaturesConfig featuresConfig, NodeVersion nodeVersion)
{
requireNonNull(objectMapper, "objectMapper is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.historyBasedStatisticsCacheManager = new HistoryBasedStatisticsCacheManager();
ObjectMapper newObjectMapper = objectMapper.copy().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
this.planCanonicalInfoProvider = new CachingPlanCanonicalInfoProvider(historyBasedStatisticsCacheManager, newObjectMapper, metadata);
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = featuresConfig.isNativeExecutionEnabled();
this.serverVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}

public void addHistoryBasedPlanStatisticsProviderFactory(HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider)
Expand All @@ -67,7 +74,7 @@ public HistoryBasedPlanStatisticsCalculator getHistoryBasedPlanStatisticsCalcula

public HistoryBasedPlanStatisticsTracker getHistoryBasedPlanStatisticsTracker()
{
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config);
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config, isNativeExecution, serverVersion);
}

public PlanCanonicalInfoProvider getPlanCanonicalInfoProvider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
Expand Down Expand Up @@ -78,17 +79,23 @@ public class HistoryBasedPlanStatisticsTracker
private final HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager;
private final SessionPropertyManager sessionPropertyManager;
private final HistoryBasedOptimizationConfig config;
private final boolean isNativeExecution;
private final String serverVersion;

public HistoryBasedPlanStatisticsTracker(
Supplier<HistoryBasedPlanStatisticsProvider> historyBasedPlanStatisticsProvider,
HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager,
SessionPropertyManager sessionPropertyManager,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
boolean isNativeExecution,
String serverVersion)
{
this.historyBasedPlanStatisticsProvider = requireNonNull(historyBasedPlanStatisticsProvider, "historyBasedPlanStatisticsProvider is null");
this.historyBasedStatisticsCacheManager = requireNonNull(historyBasedStatisticsCacheManager, "historyBasedStatisticsCacheManager is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = isNativeExecution;
this.serverVersion = serverVersion;
}

public void updateStatistics(QueryExecution queryExecution)
Expand Down Expand Up @@ -139,6 +146,9 @@ else if (trackStatsForFailedQueries) {
return ImmutableMap.of();
}

HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo = new HistoricalPlanStatisticsEntryInfo(
isNativeExecution ? HistoricalPlanStatisticsEntryInfo.WorkerType.CPP : HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, queryInfo.getQueryId(), serverVersion);

Map<PlanNodeId, PlanNodeStats> planNodeStatsMap = aggregateStageStats(allStages);
Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> planStatisticsMap = new HashMap<>();
Map<CanonicalPlan, PlanNodeCanonicalInfo> canonicalInfoMap = new HashMap<>();
Expand Down Expand Up @@ -216,7 +226,7 @@ else if (trackStatsForFailedQueries) {
PlanStatisticsWithSourceInfo planStatsWithSourceInfo = new PlanStatisticsWithSourceInfo(
planNode.getId(),
newPlanNodeStats,
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics)));
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics), Optional.of(historicalPlanStatisticsEntryInfo)));
planStatisticsMap.put(planNodeWithHash, planStatsWithSourceInfo);

if (isAggregation(planNode, AggregationNode.Step.FINAL) && ((AggregationNode) planNode).getAggregationId().isPresent() && trackPartialAggregationHistory(session)) {
Expand Down Expand Up @@ -317,7 +327,8 @@ public void updateStatistics(QueryInfo queryInfo)
Map<PlanNodeWithHash, HistoricalPlanStatistics> newPlanStatistics = planStatistics.entrySet().stream()
.filter(entry -> entry.getKey().getHash().isPresent() &&
entry.getValue().getSourceInfo() instanceof HistoryBasedSourceInfo &&
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getInputTableStatistics().isPresent())
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getInputTableStatistics().isPresent() &&
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getHistoricalPlanStatisticsEntryInfo().isPresent())
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> {
Expand All @@ -328,7 +339,8 @@ public void updateStatistics(QueryInfo queryInfo)
historicalPlanStatistics,
historyBasedSourceInfo.getInputTableStatistics().get(),
entry.getValue().getPlanStatistics(),
config);
config,
historyBasedSourceInfo.getHistoricalPlanStatisticsEntryInfo().get());
}));

if (!newPlanStatistics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
this.statsNormalizer = new StatsNormalizer();
this.scalarStatsCalculator = new ScalarStatsCalculator(metadata);
this.filterStatsCalculator = new FilterStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer);
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig());
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig(), featuresConfig, new NodeVersion("1"));
this.fragmentStatsProvider = new FragmentStatsProvider();
this.statsCalculator = createNewStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer, filterStatsCalculator, historyBasedPlanStatisticsManager, fragmentStatsProvider);
this.taskCountEstimator = new TaskCountEstimator(() -> nodeCountForStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
import com.facebook.presto.spi.statistics.PlanStatistics;
Expand All @@ -23,7 +26,9 @@
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getSelectedHistoricalPlanStatisticsEntry;
import static org.testng.Assert.assertEquals;

public class TestHistoricalPlanStatistics
Expand Down Expand Up @@ -97,16 +102,18 @@ private static HistoricalPlanStatistics updatePlanStatistics(
historicalPlanStatistics,
inputTableStatistics,
current,
new HistoryBasedOptimizationConfig());
new HistoryBasedOptimizationConfig(),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0"), "test"));
}

private static PlanStatistics getPredictedPlanStatistics(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics)
{
return HistoricalPlanStatisticsUtil.getPredictedPlanStatistics(
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry(
historicalPlanStatistics,
inputTableStatistics,
0.1);
return historicalPlanStatisticsEntry.isPresent() ? historicalPlanStatisticsEntry.get().getPlanStatistics() : PlanStatistics.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package com.facebook.presto.cost;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
Expand Down Expand Up @@ -126,7 +128,8 @@ public Map<PlanNodeWithHash, HistoricalPlanStatistics> getStats(List<PlanNodeWit
if (node.getTable().toString().contains("orders")) {
return new HistoricalPlanStatistics(ImmutableList.of(new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(100), Estimate.of(1000), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())))));
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0"), "test"))));
}
}
return HistoricalPlanStatistics.empty();
Expand Down
Loading

0 comments on commit 8b8c9e4

Please sign in to comment.