Skip to content

Commit

Permalink
Treat low confidence, zero estimations as unknown during joins
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavmuk04 committed Jul 12, 2024
1 parent 5b1cde7 commit 3aff711
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class SystemSessionProperties
public static final String FAST_INEQUALITY_JOINS = "fast_inequality_joins";
public static final String QUERY_PRIORITY = "query_priority";
public static final String CONFIDENCE_BASED_BROADCAST_ENABLED = "confidence_based_broadcast_enabled";
// Name of the session property that makes join planning treat low-confidence, zero-row estimates as unknown.
// NOTE(review): the property string omits the "_as_" present in the constant name; confirm this is
// intentional, since renaming a session property later would break existing users.
public static final String TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED = "treat_low_confidence_zero_estimation_unknown_enabled";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
Expand Down Expand Up @@ -429,6 +430,11 @@ public SystemSessionProperties(
"Enable confidence based broadcasting when enabled",
false,
false),
booleanProperty(
TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
"Treat low confidence zero estimations as unknowns during joins when enabled",
false,
false),
booleanProperty(
DISTRIBUTED_INDEX_JOIN,
"Distribute index joins on join keys instead of executing inline",
Expand Down Expand Up @@ -2030,6 +2036,11 @@ public static boolean confidenceBasedBroadcastEnabled(Session session)
return session.getSystemProperty(CONFIDENCE_BASED_BROADCAST_ENABLED, Boolean.class);
}

/**
 * Reads the {@code TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED} system property
 * from the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session session)
{
    Boolean enabled = session.getSystemProperty(
            TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
            Boolean.class);
    return enabled;
}

public static int getHashPartitionCount(Session session)
{
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.iterative;

import com.facebook.presto.sql.planner.iterative.Rule.Context;
import com.facebook.presto.sql.planner.plan.JoinNode;

import java.util.Optional;

import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.isBelowMaxBroadcastSize;
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.mustPartition;

/**
 * Picks a join distribution type for joins where at least one input has been
 * flagged by the caller as "unknown" (a low-confidence, zero-row estimate).
 *
 * <p>Static utility holder; not instantiable.
 */
public final class LowConfidenceZeroEstimationUtil
{
    private LowConfidenceZeroEstimationUtil() {}

    /**
     * Chooses between a partitioned and a replicated (broadcast) join based on which
     * sides of the join are considered unknown.
     *
     * <ul>
     * <li>Both sides unknown: the join is returned as {@code PARTITIONED}.</li>
     * <li>Only the build side unknown: the children are flipped so the probe side becomes
     * the build side; the flipped join is returned as {@code REPLICATED} when it is below
     * the broadcast size limit and is not required to be partitioned, otherwise the
     * original join is returned as {@code PARTITIONED}.</li>
     * <li>Otherwise: the original join is returned as {@code REPLICATED} when it is below
     * the broadcast size limit and is not required to be partitioned, otherwise as
     * {@code PARTITIONED}.</li>
     * </ul>
     *
     * @param probeSideUnknown whether the probe side estimate is treated as unknown
     * @param buildSideUnknown whether the build side estimate is treated as unknown
     * @param joinNode the join whose distribution type is being decided
     * @param context rule context passed through to the broadcast size check
     * @return the join with a distribution type assigned; never empty in the current implementation
     */
    public static Optional<JoinNode> treatLowConfidenceZeroEstimationsAsUnknown(boolean probeSideUnknown, boolean buildSideUnknown, JoinNode joinNode, Context context)
    {
        if (buildSideUnknown && probeSideUnknown) {
            return Optional.of(joinNode.withDistributionType(PARTITIONED));
        }

        if (buildSideUnknown) {
            // Flip once and reuse: the flipped node is both what gets size-checked and what gets returned.
            JoinNode flipped = joinNode.flipChildren();
            // The partition constraint must be evaluated on the node that would actually be replicated,
            // not on the original orientation.
            // NOTE(review): assumes flipChildren() mirrors the join type (e.g. LEFT becomes RIGHT) and that
            // mustPartition depends on the join type -- confirm against JoinNode / DetermineJoinDistributionType.
            if (isBelowMaxBroadcastSize(flipped, context) && !mustPartition(flipped)) {
                return Optional.of(flipped.withDistributionType(REPLICATED));
            }
            return Optional.of(joinNode.withDistributionType(PARTITIONED));
        }

        if (isBelowMaxBroadcastSize(joinNode, context) && !mustPartition(joinNode)) {
            return Optional.of(joinNode.withDistributionType(REPLICATED));
        }
        return Optional.of(joinNode.withDistributionType(PARTITIONED));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
import static com.facebook.presto.SystemSessionProperties.treatLowConfidenceZeroEstimationAsUnknownEnabled;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.LOW;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.confidenceBasedBroadcast;
import static com.facebook.presto.sql.planner.iterative.LowConfidenceZeroEstimationUtil.treatLowConfidenceZeroEstimationsAsUnknown;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isSmallerThanThreshold;
import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
Expand Down Expand Up @@ -114,6 +117,11 @@ public static boolean isBelowMaxBroadcastSize(JoinNode joinNode, Context context

PlanNode buildSide = joinNode.getRight();
PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide);

if (treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession()) && buildSideStatsEstimate.confidenceLevel() == LOW && buildSideStatsEstimate.getOutputRowCount() == 0) {
return false;
}

double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide);
return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes()
|| (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
Expand All @@ -134,6 +142,15 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
}
}

boolean buildSideUnknown = context.getStatsProvider().getStats(joinNode.getRight()).confidenceLevel() == LOW && context.getStatsProvider().getStats(joinNode.getRight()).getOutputRowCount() == 0;
boolean probeSideUnknown = context.getStatsProvider().getStats(joinNode.getLeft()).confidenceLevel() == LOW && context.getStatsProvider().getStats(joinNode.getLeft()).getOutputRowCount() == 0;
if ((buildSideUnknown || probeSideUnknown) && treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession())) {
Optional<JoinNode> result = treatLowConfidenceZeroEstimationsAsUnknown(probeSideUnknown, buildSideUnknown, joinNode, context);
if (result.isPresent()) {
return result.get();
}
}

if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
// TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
Expand Down
Loading

0 comments on commit 3aff711

Please sign in to comment.