Skip to content

Commit

Permalink
Treat low confidence, zero estimations as unknown during joins
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavmuk04 committed Jul 11, 2024
1 parent 5b1cde7 commit 49de2c9
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class SystemSessionProperties
public static final String FAST_INEQUALITY_JOINS = "fast_inequality_joins";
public static final String QUERY_PRIORITY = "query_priority";
public static final String CONFIDENCE_BASED_BROADCAST_ENABLED = "confidence_based_broadcast_enabled";
public static final String TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED = "treat_low_confidence_zero_estimation_unknown_enabled";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
Expand Down Expand Up @@ -429,6 +430,11 @@ public SystemSessionProperties(
"Enable confidence based broadcasting when enabled",
false,
false),
booleanProperty(
TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
"Treat low confidence zero estimations as unknowns during joins when enabled",
false,
false),
booleanProperty(
DISTRIBUTED_INDEX_JOIN,
"Distribute index joins on join keys instead of executing inline",
Expand Down Expand Up @@ -2030,6 +2036,11 @@ public static boolean confidenceBasedBroadcastEnabled(Session session)
return session.getSystemProperty(CONFIDENCE_BASED_BROADCAST_ENABLED, Boolean.class);
}

public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session session)
{
return session.getSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, Boolean.class);
}

public static int getHashPartitionCount(Session session)
{
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.iterative;

import com.facebook.presto.sql.planner.iterative.Rule.Context;
import com.facebook.presto.sql.planner.plan.JoinNode;

import java.util.Optional;

import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.isBelowMaxBroadcastSize;

public class LowConfidenceZeroEstimationUtil
{
private LowConfidenceZeroEstimationUtil() {}

;

public static Optional<JoinNode> treatLowConfidenceZeroEstimationsAsUnknown(boolean probeSideUnknown, boolean buildSideUnknown, JoinNode joinNode, Context context)
{
if (buildSideUnknown && probeSideUnknown) {
return Optional.of(joinNode.withDistributionType(PARTITIONED));
}
else if (buildSideUnknown) {
if (isBelowMaxBroadcastSize(joinNode.flipChildren(), context)) {
return Optional.of(joinNode.flipChildren().withDistributionType(REPLICATED));
}
else {
return Optional.of(joinNode.withDistributionType(PARTITIONED));
}
}
else {
if (isBelowMaxBroadcastSize(joinNode, context)) {
return Optional.of(joinNode.withDistributionType(REPLICATED));
}
else {
return Optional.of(joinNode.withDistributionType(PARTITIONED));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
import static com.facebook.presto.SystemSessionProperties.treatLowConfidenceZeroEstimationAsUnknownEnabled;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.LOW;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.confidenceBasedBroadcast;
import static com.facebook.presto.sql.planner.iterative.LowConfidenceZeroEstimationUtil.treatLowConfidenceZeroEstimationsAsUnknown;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isSmallerThanThreshold;
import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
Expand Down Expand Up @@ -114,6 +117,11 @@ public static boolean isBelowMaxBroadcastSize(JoinNode joinNode, Context context

PlanNode buildSide = joinNode.getRight();
PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide);

if (buildSideStatsEstimate.confidenceLevel() == LOW && buildSideStatsEstimate.getOutputRowCount() == 0) {
return false;
}

double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide);
return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes()
|| (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
Expand All @@ -134,6 +142,15 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
}
}

boolean buildSideUnknown = context.getStatsProvider().getStats(joinNode.getRight()).confidenceLevel() == LOW && context.getStatsProvider().getStats(joinNode.getRight()).getOutputRowCount() == 0;
boolean probeSideUnknown = context.getStatsProvider().getStats(joinNode.getLeft()).confidenceLevel() == LOW && context.getStatsProvider().getStats(joinNode.getLeft()).getOutputRowCount() == 0;
if ((buildSideUnknown || probeSideUnknown) && treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession())) {
Optional<JoinNode> result = treatLowConfidenceZeroEstimationsAsUnknown(probeSideUnknown, buildSideUnknown, joinNode, context);
if (result.isPresent()) {
return result.get();
}
}

if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
// TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.facebook.presto.SystemSessionProperties.CONFIDENCE_BASED_BROADCAST_ENABLED;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE;
import static com.facebook.presto.SystemSessionProperties.TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED;
import static com.facebook.presto.SystemSessionProperties.USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
Expand Down Expand Up @@ -460,6 +461,181 @@ public void testLeftAndRightHighConfidenceLeftSmaller()
values(ImmutableMap.of("A1", 0, "A2", 1))));
}

@Test
public void testJoinWithRightSideLowConfidenceZeroStatisticsLeftSideHighBroadcast()
{
int aRows = 50;
int bRows = 0;
assertDetermineJoinDistributionType()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
.overrideStats("valuesA", PlanNodeStatsEstimate.builder()
.setConfidence(HIGH)
.setOutputRowCount(aRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.overrideStats("valuesB", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(bRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.on(p ->
p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
Optional.empty()))
.matches(join(
INNER,
ImmutableList.of(equiJoinClause("B1", "A1")),
Optional.empty(),
Optional.of(REPLICATED),
values(ImmutableMap.of("B1", 0)),
values(ImmutableMap.of("A1", 0))));
}

@Test
public void testJoinWithLeftSideLowConfidenceZeroStatisticsRightSideHighBroadcast()
{
int aRows = 0;
int bRows = 50;
assertDetermineJoinDistributionType()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
.overrideStats("valuesA", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(aRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.overrideStats("valuesB", PlanNodeStatsEstimate.builder()
.setConfidence(HIGH)
.setOutputRowCount(bRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.on(p ->
p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
Optional.empty()))
.matches(join(
INNER,
ImmutableList.of(equiJoinClause("A1", "B1")),
Optional.empty(),
Optional.of(REPLICATED),
values(ImmutableMap.of("A1", 0)),
values(ImmutableMap.of("B1", 0))));
}

@Test
public void testJoinWithLeftSideLowConfidenceZeroStatisticsRightSidePartition()
{
int aRows = 0;
int bRows = 50_000;
assertDetermineJoinDistributionType()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
.overrideStats("valuesA", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(aRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.overrideStats("valuesB", PlanNodeStatsEstimate.builder()
.setConfidence(HIGH)
.setOutputRowCount(bRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.on(p ->
p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
Optional.empty()))
.matches(join(
INNER,
ImmutableList.of(equiJoinClause("A1", "B1")),
Optional.empty(),
Optional.of(PARTITIONED),
values(ImmutableMap.of("A1", 0)),
values(ImmutableMap.of("B1", 0))));
}

@Test
public void testJoinWithRightSideLowConfidenceZeroStatisticsLeftSidePartition()
{
int aRows = 50_000;
int bRows = 0;
assertDetermineJoinDistributionType()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
.overrideStats("valuesA", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(aRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.overrideStats("valuesB", PlanNodeStatsEstimate.builder()
.setConfidence(HIGH)
.setOutputRowCount(bRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.on(p ->
p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
Optional.empty()))
.matches(join(
INNER,
ImmutableList.of(equiJoinClause("A1", "B1")),
Optional.empty(),
Optional.of(REPLICATED),
values(ImmutableMap.of("A1", 0)),
values(ImmutableMap.of("B1", 0))));
}

@Test
public void testJoinWithBothSideLowConfidenceZeroStatisticsPartition()
{
int aRows = 0;
int bRows = 0;
assertDetermineJoinDistributionType()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
.overrideStats("valuesA", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(aRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.overrideStats("valuesB", PlanNodeStatsEstimate.builder()
.setConfidence(LOW)
.setOutputRowCount(bRows)
.addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
.build())
.on(p ->
p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
Optional.empty()))
.matches(join(
INNER,
ImmutableList.of(equiJoinClause("A1", "B1")),
Optional.empty(),
Optional.of(PARTITIONED),
values(ImmutableMap.of("A1", 0)),
values(ImmutableMap.of("B1", 0))));
}

@Test
public void testFlipAndReplicateWhenOneTableMuchSmaller()
{
Expand Down

0 comments on commit 49de2c9

Please sign in to comment.