From 39e74ccfbf398edc41770c9240be26a897946b52 Mon Sep 17 00:00:00 2001 From: Feilong Liu Date: Wed, 31 Jul 2024 23:36:45 -0700 Subject: [PATCH] Add session property to enforce timeout for query register in HBO optimizer --- .../optimizer/history-based-optimization.rst | 3 + .../presto/SystemSessionProperties.java | 11 +++ .../presto/sql/analyzer/FeaturesConfig.java | 13 +++ ...tisticsEquivalentPlanMarkingOptimizer.java | 25 +++++- ...oryHistoryBasedPlanStatisticsProvider.java | 11 +++ .../sql/analyzer/TestFeaturesConfig.java | 3 + .../TestHistoryBasedStatsTracking.java | 90 +++++++++++++++++++ 7 files changed, 154 insertions(+), 2 deletions(-) diff --git a/presto-docs/src/main/sphinx/optimizer/history-based-optimization.rst b/presto-docs/src/main/sphinx/optimizer/history-based-optimization.rst index 3b07d8317cd0..eeb5884e55be 100644 --- a/presto-docs/src/main/sphinx/optimizer/history-based-optimization.rst +++ b/presto-docs/src/main/sphinx/optimizer/history-based-optimization.rst @@ -36,6 +36,7 @@ Configuration Property Name Description ``optimizer.track-history-based-plan-statistics`` Recording the statistics of the current query as history statistics so as to be used by future queries. ``False`` ``optimizer.track-history-stats-from-failed-queries`` Track history based plan statistics from complete plan fragments in failed queries. ``True`` ``optimizer.history-based-optimizer-timeout`` Timeout for history based optimizer. ``10 seconds`` +``optimizer.enforce-timeout-for-hbo-query-registration`` Enforce timeout for query registration in HBO optimizer ``False`` ``optimizer.treat-low-confidence-zero-estimation-as-unknown`` Treat ``LOW`` confidence, zero estimations as ``UNKNOWN`` during joins. ``False`` ``optimizer.confidence-based-broadcast`` Broadcast based on the confidence of the statistics that are being used, by broadcasting the side of a joinNode which ``False`` has the highest confidence statistics. If confidence is the same, then the original behavior will be followed. @@ -61,6 +62,8 @@ Session property Name Description ``optimizer.track-history-stats-from-failed-queries`` in the current session. ``history_based_optimizer_timeout_limit`` Overrides the behavior of the configuration property ``optimizer.history-based-optimizer-timeout`` ``optimizer.history-based-optimizer-timeout`` in the current session. +``enforce_history_based_optimizer_register_timeout`` Overrides the behavior of the configuration property ``optimizer.enforce-timeout-for-hbo-query-registration`` + ``optimizer.enforce-timeout-for-hbo-query-registration`` in the current session. ``restrict_history_based_optimization_to_complex_query`` Enable history based optimization only for complex queries, i.e. queries with join and aggregation. ``True`` ``history_input_table_statistics_matching_threshold`` Overrides the behavior of the configuration property ``hbo.history-matching-threshold`` ``hbo.history-matching-threshold`` in the current session. diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 66370508a81f..bdc35dd372ab 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -278,6 +278,7 @@ public final class SystemSessionProperties public static final String HISTORY_BASED_OPTIMIZATION_PLAN_CANONICALIZATION_STRATEGY = "history_based_optimization_plan_canonicalization_strategy"; public static final String ENABLE_VERBOSE_HISTORY_BASED_OPTIMIZER_RUNTIME_STATS = "enable_verbose_history_based_optimizer_runtime_stats"; public static final String LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER = "log_query_plans_used_in_history_based_optimizer"; + public static final String ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT = "enforce_history_based_optimizer_register_timeout"; public static final String MAX_LEAF_NODES_IN_PLAN = "max_leaf_nodes_in_plan"; public static final String LEAF_NODE_LIMIT_ENABLED = "leaf_node_limit_enabled"; public static final String PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID = "push_remote_exchange_through_group_id"; @@ -1594,6 +1595,11 @@ public SystemSessionProperties( "Enable logging of query plans generated and used in history based optimizer", featuresConfig.isLogPlansUsedInHistoryBasedOptimizer(), false), + booleanProperty( + ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, + "Enforce timeout for query registration in HBO optimizer", + featuresConfig.isEnforceTimeoutForHBOQueryRegistration(), + false), new PropertyMetadata<>( MAX_LEAF_NODES_IN_PLAN, "Maximum number of leaf nodes in the logical plan of SQL statement", @@ -3074,6 +3080,11 @@ public static boolean logQueryPlansUsedInHistoryBasedOptimizer(Session session) return session.getSystemProperty(LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER, Boolean.class); } + public static boolean enforceHistoryBasedOptimizerRegistrationTimeout(Session session) + { + return session.getSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, Boolean.class); + } + public static boolean shouldPushRemoteExchangeThroughGroupId(Session session) { return session.getSystemProperty(PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 23843bb5d84f..447cb970c192 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -106,6 +106,7 @@ public class FeaturesConfig private Duration historyBasedOptimizerTimeout = new Duration(10, SECONDS); private String historyBasedOptimizerPlanCanonicalizationStrategies = "IGNORE_SAFE_CONSTANTS"; private boolean logPlansUsedInHistoryBasedOptimizer; + private boolean enforceTimeoutForHBOQueryRegistration; private boolean redistributeWrites = true; private boolean scaleWriters; private DataSize writerMinSize = new DataSize(32, MEGABYTE); @@ -1010,6 +1011,18 @@ public FeaturesConfig setLogPlansUsedInHistoryBasedOptimizer(boolean logPlansUse return this; } + public boolean isEnforceTimeoutForHBOQueryRegistration() + { + return enforceTimeoutForHBOQueryRegistration; + } + + @Config("optimizer.enforce-timeout-for-hbo-query-registration") + public FeaturesConfig setEnforceTimeoutForHBOQueryRegistration(boolean enforceTimeoutForHBOQueryRegistration) + { + this.enforceTimeoutForHBOQueryRegistration = enforceTimeoutForHBOQueryRegistration; + return this; + } + public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy() { return aggregationPartitioningMergingStrategy; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HistoricalStatisticsEquivalentPlanMarkingOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HistoricalStatisticsEquivalentPlanMarkingOptimizer.java index 6e0b51c655ac..729d80f9e386 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HistoricalStatisticsEquivalentPlanMarkingOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HistoricalStatisticsEquivalentPlanMarkingOptimizer.java @@ -38,13 +38,18 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import static com.facebook.presto.SystemSessionProperties.enforceHistoryBasedOptimizerRegistrationTimeout; import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit; import static com.facebook.presto.SystemSessionProperties.getHistoryCanonicalPlanNodeLimit; import static com.facebook.presto.SystemSessionProperties.restrictHistoryBasedOptimizationToComplexQuery; import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled; import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; public class HistoricalStatisticsEquivalentPlanMarkingOptimizer @@ -120,9 +125,25 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider } // Fetch and cache history based statistics of all plan nodes, so no serial network calls happen later. - boolean registerSucceed = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds); + boolean registrationSucceeded = false; + if (enforceHistoryBasedOptimizerRegistrationTimeout(session)) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(() -> statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds)); + try { + registrationSucceeded = future.get(timeoutInMilliseconds, MILLISECONDS); + } + catch (Exception ignored) { + } + finally { + executor.shutdownNow(); + } + } + else { + registrationSucceeded = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds); + } + // Return original plan if timeout or registration not successful - if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registerSucceed) { + if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registrationSucceeded) { logOptimizerFailure(session); return PlanOptimizerResult.optimizerResult(plan, false); } diff --git a/presto-main/src/main/java/com/facebook/presto/testing/InMemoryHistoryBasedPlanStatisticsProvider.java b/presto-main/src/main/java/com/facebook/presto/testing/InMemoryHistoryBasedPlanStatisticsProvider.java index a6df49ffa7f7..e8c5df6e6d97 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/InMemoryHistoryBasedPlanStatisticsProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/InMemoryHistoryBasedPlanStatisticsProvider.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; public class InMemoryHistoryBasedPlanStatisticsProvider @@ -80,6 +81,16 @@ public void waitProcessQueryEvents() } } + public void noProcessQueryEvents() + { + try { + assertFalse(semaphore.tryAcquire(10, TimeUnit.SECONDS)); + } + catch (InterruptedException e) { + throw new AssertionError("Expect no history statistics to be written"); + } + } + // Returns boolean whether stats writing query events were processed public boolean waitProcessQueryEventsIfAvailable() { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index f928b4b798a5..a0378965e270 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -90,6 +90,7 @@ public void testDefaults() .setHistoryBasedOptimizerTimeout(new Duration(10, SECONDS)) .setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS") .setLogPlansUsedInHistoryBasedOptimizer(false) + .setEnforceTimeoutForHBOQueryRegistration(false) .setRedistributeWrites(true) .setScaleWriters(false) .setWriterMinSize(new DataSize(32, MEGABYTE)) @@ -331,6 +332,7 @@ public void testExplicitPropertyMappings() .put("optimizer.history-canonical-plan-node-limit", "2") .put("optimizer.history-based-optimizer-plan-canonicalization-strategies", "IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS") .put("optimizer.log-plans-used-in-history-based-optimizer", "true") + .put("optimizer.enforce-timeout-for-hbo-query-registration", "true") .put("optimizer.history-based-optimizer-timeout", "1s") .put("redistribute-writes", "false") .put("scale-writers", "true") @@ -543,6 +545,7 @@ public void testExplicitPropertyMappings() .setHistoryBasedOptimizerTimeout(new Duration(1, SECONDS)) .setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS") .setLogPlansUsedInHistoryBasedOptimizer(true) + .setEnforceTimeoutForHBOQueryRegistration(true) .setRedistributeWrites(false) .setScaleWriters(true) .setWriterMinSize(new DataSize(42, GIGABYTE)) diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestHistoryBasedStatsTracking.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestHistoryBasedStatsTracking.java index 5a5e6149afb6..6a82d184c53c 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestHistoryBasedStatsTracking.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestHistoryBasedStatsTracking.java @@ -50,6 +50,8 @@ import org.testng.annotations.Test; import static com.facebook.presto.SystemSessionProperties.CONFIDENCE_BASED_BROADCAST_ENABLED; +import static com.facebook.presto.SystemSessionProperties.ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT; +import static com.facebook.presto.SystemSessionProperties.HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT; import static com.facebook.presto.SystemSessionProperties.HISTORY_CANONICAL_PLAN_NODE_LIMIT; import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY; @@ -129,6 +131,88 @@ public void testHistoryBasedStatsCalculator() anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54))); } + @Test + public void testHistoryBasedStatsCalculatorEnforceTimeOut() + { + Session sessionWithDefaultTimeoutLimit = Session.builder(createSession()) + .setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true") + .build(); + Session sessionWithZeroTimeoutLimit = Session.builder(createSession()) + .setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true") + .setSystemProperty(HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT, "0ms") + .build(); + // CBO Statistics + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT * FROM nation where substr(name, 1, 1) = 'A'", + anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN))); + + // Write HBO statistics failed as we set timeout limit to be 0 + executeAndNoHistoryWritten("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithZeroTimeoutLimit); + // No HBO statistics read + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT * FROM nation where substr(name, 1, 1) = 'A'", + anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN))); + + // Write HBO Statistics is successful, as we use the default 10 seconds timeout limit + executeAndTrackHistory("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithDefaultTimeoutLimit); + // Read HBO statistics successfully with default timeout + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT * FROM nation where substr(name, 1, 1) = 'A'", + anyTree(node(FilterNode.class, any()).withOutputRowCount(2).withOutputSize(199))); + // Read HBO statistics fail due to timeout + assertPlan( + sessionWithZeroTimeoutLimit, + "SELECT * FROM nation where substr(name, 1, 1) = 'A'", + anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN))); + + // CBO Statistics + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5))); + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN))); + + // Write HBO statistics failed as we set timeout limit to be 0 + executeAndNoHistoryWritten("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithZeroTimeoutLimit); + // No HBO statistics read + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5))); + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN))); + + // Write HBO Statistics is successful, as we use the default 10 seconds timeout limit + executeAndTrackHistory("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithDefaultTimeoutLimit); + // Read HBO statistics successfully with default timeout + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(5).withOutputSize(90))); + assertPlan( + sessionWithDefaultTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54))); + + // Read HBO statistics fail due to timeout + assertPlan( + sessionWithZeroTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5))); + assertPlan( + sessionWithZeroTimeoutLimit, + "SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", + anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN))); + } + @Test public void testFailedQuery() { @@ -525,6 +609,12 @@ private void executeAndTrackHistory(String sql, Session session) getHistoryProvider().waitProcessQueryEvents(); } + private void executeAndNoHistoryWritten(String sql, Session session) + { + getQueryRunner().execute(session, sql); + getHistoryProvider().noProcessQueryEvents(); + } + private InMemoryHistoryBasedPlanStatisticsProvider getHistoryProvider() { DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();