Skip to content

Commit

Permalink
Add session property to enforce timeout for query register in HBO opt…
Browse files Browse the repository at this point in the history
…imizer
  • Loading branch information
feilong-liu committed Aug 7, 2024
1 parent f8f4a83 commit 39e74cc
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Configuration Property Name Description
``optimizer.track-history-based-plan-statistics`` Recording the statistics of the current query as history statistics so as to be used by future queries. ``False``
``optimizer.track-history-stats-from-failed-queries`` Track history based plan statistics from complete plan fragments in failed queries. ``True``
``optimizer.history-based-optimizer-timeout`` Timeout for history based optimizer. ``10 seconds``
``optimizer.enforce-timeout-for-hbo-query-registration`` Enforce timeout for query registration in HBO optimizer ``False``
``optimizer.treat-low-confidence-zero-estimation-as-unknown`` Treat ``LOW`` confidence, zero estimations as ``UNKNOWN`` during joins. ``False``
``optimizer.confidence-based-broadcast`` Broadcast based on the confidence of the statistics that are being used, by broadcasting the side of a joinNode which ``False``
has the highest confidence statistics. If confidence is the same, then the original behavior will be followed.
Expand All @@ -61,6 +62,8 @@ Session property Name Description
``optimizer.track-history-stats-from-failed-queries`` in the current session.
``history_based_optimizer_timeout_limit`` Overrides the behavior of the configuration property ``optimizer.history-based-optimizer-timeout``
``optimizer.history-based-optimizer-timeout`` in the current session.
``enforce_history_based_optimizer_register_timeout`` Overrides the behavior of the configuration property ``optimizer.enforce-timeout-for-hbo-query-registration``
``optimizer.enforce-timeout-for-hbo-query-registration`` in the current session.
``restrict_history_based_optimization_to_complex_query`` Enable history based optimization only for complex queries, i.e. queries with join and aggregation. ``True``
``history_input_table_statistics_matching_threshold`` Overrides the behavior of the configuration property ``hbo.history-matching-threshold``
``hbo.history-matching-threshold`` in the current session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public final class SystemSessionProperties
public static final String HISTORY_BASED_OPTIMIZATION_PLAN_CANONICALIZATION_STRATEGY = "history_based_optimization_plan_canonicalization_strategy";
public static final String ENABLE_VERBOSE_HISTORY_BASED_OPTIMIZER_RUNTIME_STATS = "enable_verbose_history_based_optimizer_runtime_stats";
public static final String LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER = "log_query_plans_used_in_history_based_optimizer";
public static final String ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT = "enforce_history_based_optimizer_register_timeout";
public static final String MAX_LEAF_NODES_IN_PLAN = "max_leaf_nodes_in_plan";
public static final String LEAF_NODE_LIMIT_ENABLED = "leaf_node_limit_enabled";
public static final String PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID = "push_remote_exchange_through_group_id";
Expand Down Expand Up @@ -1594,6 +1595,11 @@ public SystemSessionProperties(
"Enable logging of query plans generated and used in history based optimizer",
featuresConfig.isLogPlansUsedInHistoryBasedOptimizer(),
false),
booleanProperty(
ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT,
"Enforce timeout for query registration in HBO optimizer",
featuresConfig.isEnforceTimeoutForHBOQueryRegistration(),
false),
new PropertyMetadata<>(
MAX_LEAF_NODES_IN_PLAN,
"Maximum number of leaf nodes in the logical plan of SQL statement",
Expand Down Expand Up @@ -3074,6 +3080,11 @@ public static boolean logQueryPlansUsedInHistoryBasedOptimizer(Session session)
return session.getSystemProperty(LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER, Boolean.class);
}

public static boolean enforceHistoryBasedOptimizerRegistrationTimeout(Session session)
{
return session.getSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, Boolean.class);
}

public static boolean shouldPushRemoteExchangeThroughGroupId(Session session)
{
return session.getSystemProperty(PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class FeaturesConfig
private Duration historyBasedOptimizerTimeout = new Duration(10, SECONDS);
private String historyBasedOptimizerPlanCanonicalizationStrategies = "IGNORE_SAFE_CONSTANTS";
private boolean logPlansUsedInHistoryBasedOptimizer;
private boolean enforceTimeoutForHBOQueryRegistration;
private boolean redistributeWrites = true;
private boolean scaleWriters;
private DataSize writerMinSize = new DataSize(32, MEGABYTE);
Expand Down Expand Up @@ -1010,6 +1011,18 @@ public FeaturesConfig setLogPlansUsedInHistoryBasedOptimizer(boolean logPlansUse
return this;
}

public boolean isEnforceTimeoutForHBOQueryRegistration()
{
return enforceTimeoutForHBOQueryRegistration;
}

@Config("optimizer.enforce-timeout-for-hbo-query-registration")
public FeaturesConfig setEnforceTimeoutForHBOQueryRegistration(boolean enforceTimeoutForHBOQueryRegistration)
{
this.enforceTimeoutForHBOQueryRegistration = enforceTimeoutForHBOQueryRegistration;
return this;
}

public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy()
{
return aggregationPartitioningMergingStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.facebook.presto.SystemSessionProperties.enforceHistoryBasedOptimizerRegistrationTimeout;
import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit;
import static com.facebook.presto.SystemSessionProperties.getHistoryCanonicalPlanNodeLimit;
import static com.facebook.presto.SystemSessionProperties.restrictHistoryBasedOptimizationToComplexQuery;
import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class HistoricalStatisticsEquivalentPlanMarkingOptimizer
Expand Down Expand Up @@ -120,9 +125,25 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
}

// Fetch and cache history based statistics of all plan nodes, so no serial network calls happen later.
boolean registerSucceed = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds);
boolean registrationSucceeded = false;
if (enforceHistoryBasedOptimizerRegistrationTimeout(session)) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = executor.submit(() -> statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds));
try {
registrationSucceeded = future.get(timeoutInMilliseconds, MILLISECONDS);
}
catch (Exception ignored) {
}
finally {
executor.shutdownNow();
}
}
else {
registrationSucceeded = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds);
}

// Return original plan if timeout or registration not successful
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registerSucceed) {
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registrationSucceeded) {
logOptimizerFailure(session);
return PlanOptimizerResult.optimizerResult(plan, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class InMemoryHistoryBasedPlanStatisticsProvider
Expand Down Expand Up @@ -80,6 +81,16 @@ public void waitProcessQueryEvents()
}
}

public void noProcessQueryEvents()
{
try {
assertFalse(semaphore.tryAcquire(10, TimeUnit.SECONDS));
}
catch (InterruptedException e) {
throw new AssertionError("Expect no history statistics to be written");
}
}

// Returns boolean whether stats writing query events were processed
public boolean waitProcessQueryEventsIfAvailable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void testDefaults()
.setHistoryBasedOptimizerTimeout(new Duration(10, SECONDS))
.setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS")
.setLogPlansUsedInHistoryBasedOptimizer(false)
.setEnforceTimeoutForHBOQueryRegistration(false)
.setRedistributeWrites(true)
.setScaleWriters(false)
.setWriterMinSize(new DataSize(32, MEGABYTE))
Expand Down Expand Up @@ -331,6 +332,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.history-canonical-plan-node-limit", "2")
.put("optimizer.history-based-optimizer-plan-canonicalization-strategies", "IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS")
.put("optimizer.log-plans-used-in-history-based-optimizer", "true")
.put("optimizer.enforce-timeout-for-hbo-query-registration", "true")
.put("optimizer.history-based-optimizer-timeout", "1s")
.put("redistribute-writes", "false")
.put("scale-writers", "true")
Expand Down Expand Up @@ -543,6 +545,7 @@ public void testExplicitPropertyMappings()
.setHistoryBasedOptimizerTimeout(new Duration(1, SECONDS))
.setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS")
.setLogPlansUsedInHistoryBasedOptimizer(true)
.setEnforceTimeoutForHBOQueryRegistration(true)
.setRedistributeWrites(false)
.setScaleWriters(true)
.setWriterMinSize(new DataSize(42, GIGABYTE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.CONFIDENCE_BASED_BROADCAST_ENABLED;
import static com.facebook.presto.SystemSessionProperties.ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT;
import static com.facebook.presto.SystemSessionProperties.HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT;
import static com.facebook.presto.SystemSessionProperties.HISTORY_CANONICAL_PLAN_NODE_LIMIT;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
Expand Down Expand Up @@ -129,6 +131,88 @@ public void testHistoryBasedStatsCalculator()
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54)));
}

@Test
public void testHistoryBasedStatsCalculatorEnforceTimeOut()
{
Session sessionWithDefaultTimeoutLimit = Session.builder(createSession())
.setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true")
.build();
Session sessionWithZeroTimeoutLimit = Session.builder(createSession())
.setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true")
.setSystemProperty(HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT, "0ms")
.build();
// CBO Statistics
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// Write HBO statistics failed as we set timeout limit to be 0
executeAndNoHistoryWritten("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithZeroTimeoutLimit);
// No HBO statistics read
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// Write HBO Statistics is successful, as we use the default 10 seconds timeout limit
executeAndTrackHistory("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithDefaultTimeoutLimit);
// Read HBO statistics successfully with default timeout
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(2).withOutputSize(199)));
// Read HBO statistics fail due to timeout
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// CBO Statistics
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));

// Write HBO statistics failed as we set timeout limit to be 0
executeAndNoHistoryWritten("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithZeroTimeoutLimit);
// No HBO statistics read
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));

// Write HBO Statistics is successful, as we use the default 10 seconds timeout limit
executeAndTrackHistory("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithDefaultTimeoutLimit);
// Read HBO statistics successfully with default timeout
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(5).withOutputSize(90)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54)));

// Read HBO statistics fail due to timeout
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));
}

@Test
public void testFailedQuery()
{
Expand Down Expand Up @@ -525,6 +609,12 @@ private void executeAndTrackHistory(String sql, Session session)
getHistoryProvider().waitProcessQueryEvents();
}

private void executeAndNoHistoryWritten(String sql, Session session)
{
getQueryRunner().execute(session, sql);
getHistoryProvider().noProcessQueryEvents();
}

private InMemoryHistoryBasedPlanStatisticsProvider getHistoryProvider()
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
Expand Down

0 comments on commit 39e74cc

Please sign in to comment.