Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat Low Confidence, 0 estimations as unknown during joins #23047

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class SystemSessionProperties
public static final String FAST_INEQUALITY_JOINS = "fast_inequality_joins";
public static final String QUERY_PRIORITY = "query_priority";
public static final String CONFIDENCE_BASED_BROADCAST_ENABLED = "confidence_based_broadcast_enabled";
// Session property key for treating low confidence zero estimations as unknowns during joins.
// NOTE(review): the key string omits "as" ("..._estimation_unknown_enabled") while the constant name
// includes it; the string is the externally visible property name, so confirm before renaming either.
public static final String TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED = "treat_low_confidence_zero_estimation_unknown_enabled";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
Expand Down Expand Up @@ -429,6 +430,11 @@ public SystemSessionProperties(
"Enable confidence based broadcasting when enabled",
false,
false),
booleanProperty(
TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
"Treat low confidence zero estimations as unknowns during joins when enabled",
false,
false),
booleanProperty(
DISTRIBUTED_INDEX_JOIN,
"Distribute index joins on join keys instead of executing inline",
Expand Down Expand Up @@ -2030,6 +2036,11 @@ public static boolean confidenceBasedBroadcastEnabled(Session session)
return session.getSystemProperty(CONFIDENCE_BASED_BROADCAST_ENABLED, Boolean.class);
}

/**
 * Reads the {@code TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED} system property
 * from the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session session)
{
    Boolean enabled = session.getSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, Boolean.class);
    return enabled;
}

public static int getHashPartitionCount(Session session)
{
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import java.util.Optional;

import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.isBelowMaxBroadcastSize;
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.mustPartition;

public class ConfidenceBasedBroadcastUtil
{
Expand All @@ -38,4 +41,30 @@ else if (leftConfidence.getConfidenceOrdinal() > rightConfidence.getConfidenceOr

return Optional.empty();
}

/**
 * Picks a join distribution when one or both join inputs carry a low-confidence, zero-row
 * estimate, so that such an estimate is not trusted when deciding what to broadcast.
 *
 * <ul>
 * <li>Neither side flagged: returns {@link Optional#empty()} so the caller can continue with its own logic.</li>
 * <li>Both sides flagged: the join is returned with {@code PARTITIONED} distribution.</li>
 * <li>Only the build side flagged: the children are flipped so the other input becomes the
 * build side; if the flipped join may be broadcast it is returned as {@code REPLICATED}.</li>
 * <li>Only the probe side flagged: the join is kept as is and returned as {@code REPLICATED}
 * if it may be broadcast.</li>
 * <li>Otherwise the original (unflipped) join is returned as {@code PARTITIONED}.</li>
 * </ul>
 *
 * @param probeSideLowConfidenceZero whether the probe side has a low-confidence zero estimate
 * @param buildSideLowConfidenceZero whether the build side has a low-confidence zero estimate
 * @param joinNode the join whose distribution type is being decided
 * @param context rule context passed through to the broadcast-size check
 * @return the join with a distribution type set, or empty when neither side is flagged
 */
public static Optional<JoinNode> treatLowConfidenceZeroEstimationsAsUnknown(boolean probeSideLowConfidenceZero, boolean buildSideLowConfidenceZero, JoinNode joinNode, Rule.Context context)
{
    if (!buildSideLowConfidenceZero && !probeSideLowConfidenceZero) {
        return Optional.empty();
    }

    if (buildSideLowConfidenceZero && probeSideLowConfidenceZero) {
        return Optional.of(joinNode.withDistributionType(PARTITIONED));
    }

    // Exactly one side is flagged. The broadcast candidate is the join arranged so that the
    // flagged input is NOT the build side: flipped when the build side is flagged, unchanged otherwise.
    JoinNode broadcastCandidate = buildSideLowConfidenceZero ? joinNode.flipChildren() : joinNode;

    // Both eligibility checks must be made against the node that will actually receive the
    // REPLICATED distribution. Previously the build-side branch tested mustPartition on the
    // unflipped join while replicating the flipped one.
    // NOTE(review): assumes mustPartition depends on the join type, which flipChildren() changes — confirm.
    if (isBelowMaxBroadcastSize(broadcastCandidate, context) && !mustPartition(broadcastCandidate)) {
        return Optional.of(broadcastCandidate.withDistributionType(REPLICATED));
    }

    return Optional.of(joinNode.withDistributionType(PARTITIONED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
import static com.facebook.presto.SystemSessionProperties.treatLowConfidenceZeroEstimationAsUnknownEnabled;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.LOW;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.confidenceBasedBroadcast;
import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.treatLowConfidenceZeroEstimationsAsUnknown;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isSmallerThanThreshold;
import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
Expand Down Expand Up @@ -114,6 +117,11 @@ public static boolean isBelowMaxBroadcastSize(JoinNode joinNode, Context context

PlanNode buildSide = joinNode.getRight();
PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide);

if (treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession()) && isLowConfidenceZero(buildSide, context)) {
return false;
}

double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide);
return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes()
|| (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
Expand All @@ -134,6 +142,15 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
}
}

boolean buildSideLowConfidenceZero = isLowConfidenceZero(joinNode.getRight(), context);
boolean probeSideLowConfidenceZero = isLowConfidenceZero(joinNode.getLeft(), context);
if ((buildSideLowConfidenceZero || probeSideLowConfidenceZero) && treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession())) {
Optional<JoinNode> result = treatLowConfidenceZeroEstimationsAsUnknown(probeSideLowConfidenceZero, buildSideLowConfidenceZero, joinNode, context);
if (result.isPresent()) {
return result.get();
}
}

if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
// TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
Expand Down Expand Up @@ -295,4 +312,10 @@ private PlanNodeWithCost getJoinNodeWithCost(Context context, JoinNode possibleJ
estimatedSourceDistributedTaskCount);
return new PlanNodeWithCost(cost.toPlanCost(), possibleJoinNode);
}

/**
 * Tells whether the stats estimate for the given plan node has {@code LOW} confidence
 * and an output row count equal to zero.
 *
 * @param planNode the plan node whose stats are looked up through the context's stats provider
 * @param context rule context supplying the stats provider
 * @return {@code true} only when the confidence level is {@code LOW} and the output row count is 0
 */
private static boolean isLowConfidenceZero(PlanNode planNode, Context context)
{
    PlanNodeStatsEstimate estimate = context.getStatsProvider().getStats(planNode);
    if (estimate.confidenceLevel() != LOW) {
        return false;
    }
    return estimate.getOutputRowCount() == 0;
}
}
Loading
Loading