Skip to content

Commit

Permalink
Add UTC-based optimizations in DeltaLakeMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
kasiafi committed Nov 27, 2023
1 parent b60c9b5 commit 3964149
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.testing.LocalQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
Expand Down Expand Up @@ -501,6 +502,34 @@ public QueryAssert isFullyPushedDown()
return this;
}

/**
* Verifies query is fully pushed down and Table Scan is replaced with empty Values.
* Verifies that results are the same as when pushdown is fully disabled.
*/
@CanIgnoreReturnValue
public QueryAssert isReplacedWithEmptyValues()
{
checkState(!(runner instanceof LocalQueryRunner), "isReplacedWithEmptyValues() currently does not work with LocalQueryRunner");

transaction(runner.getTransactionManager(), runner.getMetadata(), runner.getAccessControl())
.execute(session, session -> {
Plan plan = runner.createPlan(session, query);
assertPlan(
session,
runner.getMetadata(),
runner.getFunctionManager(),
noopStatsCalculator(),
plan,
PlanMatchPattern.output(PlanMatchPattern.node(ValuesNode.class).with(ValuesNode.class, valuesNode -> valuesNode.getRowCount() == 0)));
});

if (!skipResultsCorrectnessCheckForPushdown) {
// Compare the results with pushdown disabled, so that explicit matches() call is not needed
hasCorrectResultsRegardlessOfPushdown();
}
return this;
}

/**
* Verifies query is not fully pushed down and that results are the same as when pushdown is fully disabled.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.base.filter.UtcConstraintExtractor;
import io.trino.plugin.base.projection.ApplyProjectionUtil;
import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode;
import io.trino.plugin.deltalake.expression.ParsingException;
Expand Down Expand Up @@ -175,6 +176,7 @@
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
Expand Down Expand Up @@ -2694,31 +2696,56 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle;
SchemaTableName tableName = tableHandle.getSchemaTableName();

Set<DeltaLakeColumnHandle> partitionColumns = ImmutableSet.copyOf(extractPartitionColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager));
Map<ColumnHandle, Domain> constraintDomains = constraint.getSummary().getDomains().orElseThrow(() -> new IllegalArgumentException("constraint summary is NONE"));
checkArgument(constraint.getSummary().getDomains().isPresent(), "constraint summary is NONE");

ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> enforceableDomains = ImmutableMap.builder();
ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> unenforceableDomains = ImmutableMap.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> constraintColumns = ImmutableSet.builder();
// We need additional field to track partition columns used in queries as enforceDomains seem to be not catching
// cases when partition columns is used within complex filter as 'partitionColumn % 2 = 0'
constraint.getPredicateColumns().stream()
.flatMap(Collection::stream)
.map(DeltaLakeColumnHandle.class::cast)
.forEach(constraintColumns::add);
for (Entry<ColumnHandle, Domain> domainEntry : constraintDomains.entrySet()) {
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey();
if (!partitionColumns.contains(column)) {
unenforceableDomains.put(column, domainEntry.getValue());
}
else {
enforceableDomains.put(column, domainEntry.getValue());
UtcConstraintExtractor.ExtractionResult extractionResult = extractTupleDomain(constraint);
TupleDomain<ColumnHandle> predicate = extractionResult.tupleDomain();

if (predicate.isAll() && constraint.getPredicateColumns().isEmpty()) {
return Optional.empty();
}

TupleDomain<DeltaLakeColumnHandle> newEnforcedConstraint;
TupleDomain<DeltaLakeColumnHandle> newUnenforcedConstraint;
Set<DeltaLakeColumnHandle> newConstraintColumns;
if (predicate.isNone()) {
// Engine does not pass none Constraint.summary. It can become none when combined with the expression and connector's domain knowledge.
newEnforcedConstraint = TupleDomain.none();
newUnenforcedConstraint = TupleDomain.all();
newConstraintColumns = constraint.getPredicateColumns().stream()
.flatMap(Collection::stream)
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableSet());
}
else {
Set<DeltaLakeColumnHandle> partitionColumns = ImmutableSet.copyOf(extractPartitionColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager));
Map<ColumnHandle, Domain> constraintDomains = predicate.getDomains().orElseThrow();

ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> enforceableDomains = ImmutableMap.builder();
ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> unenforceableDomains = ImmutableMap.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> constraintColumns = ImmutableSet.builder();
// We need additional field to track partition columns used in queries as enforceDomains seem to be not catching
// cases when partition columns is used within complex filter as 'partitionColumn % 2 = 0'
constraint.getPredicateColumns().stream()
.flatMap(Collection::stream)
.map(DeltaLakeColumnHandle.class::cast)
.forEach(constraintColumns::add);
for (Entry<ColumnHandle, Domain> domainEntry : constraintDomains.entrySet()) {
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey();
if (!partitionColumns.contains(column)) {
unenforceableDomains.put(column, domainEntry.getValue());
}
else {
enforceableDomains.put(column, domainEntry.getValue());
}
constraintColumns.add(column);
}
constraintColumns.add(column);

newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow());
newUnenforcedConstraint = TupleDomain.withColumnDomains(unenforceableDomains.buildOrThrow());
newConstraintColumns = constraintColumns.build();
}

TupleDomain<DeltaLakeColumnHandle> newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow());
TupleDomain<DeltaLakeColumnHandle> newUnenforcedConstraint = TupleDomain.withColumnDomains(unenforceableDomains.buildOrThrow());
DeltaLakeTableHandle newHandle = new DeltaLakeTableHandle(
tableName.getSchemaName(),
tableName.getTableName(),
Expand All @@ -2733,7 +2760,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.getNonPartitionConstraint()
.intersect(newUnenforcedConstraint)
.simplify(domainCompactionThreshold),
Sets.union(tableHandle.getConstraintColumns(), constraintColumns.build()),
Sets.union(tableHandle.getConstraintColumns(), newConstraintColumns),
tableHandle.getWriteType(),
tableHandle.getProjectedColumns(),
tableHandle.getUpdatedColumns(),
Expand All @@ -2753,7 +2780,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
return Optional.of(new ConstraintApplicationResult<>(
newHandle,
newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
constraint.getExpression(),
extractionResult.remainingExpression(),
false));
}

Expand Down
Loading

0 comments on commit 3964149

Please sign in to comment.