Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down predicate involving timestamp_tz in Delta Lake #19874

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.testing.LocalQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
Expand Down Expand Up @@ -501,6 +502,34 @@ public QueryAssert isFullyPushedDown()
return this;
}

/**
* Verifies query is fully pushed down and Table Scan is replaced with empty Values.
* Verifies that results are the same as when pushdown is fully disabled.
* <p>
* The plan is matched against {@code PlanMatchPattern.output(...)} wrapping a {@code ValuesNode}
* pattern that additionally requires {@code getRowCount() == 0}.
* The results comparison is skipped when {@code skipResultsCorrectnessCheckForPushdown} is set.
* The {@code checkState} precondition fails when the runner is a {@code LocalQueryRunner}.
*
* @return this {@code QueryAssert}
*/
@CanIgnoreReturnValue
public QueryAssert isReplacedWithEmptyValues()
{
checkState(!(runner instanceof LocalQueryRunner), "isReplacedWithEmptyValues() currently does not work with LocalQueryRunner");
kasiafi marked this conversation as resolved.
Show resolved Hide resolved

// Create the plan inside a transaction and assert on its shape
transaction(runner.getTransactionManager(), runner.getMetadata(), runner.getAccessControl())
.execute(session, session -> {
Plan plan = runner.createPlan(session, query);
assertPlan(
session,
runner.getMetadata(),
runner.getFunctionManager(),
noopStatsCalculator(),
plan,
// The ValuesNode must have no rows
PlanMatchPattern.output(PlanMatchPattern.node(ValuesNode.class).with(ValuesNode.class, valuesNode -> valuesNode.getRowCount() == 0)));
});

if (!skipResultsCorrectnessCheckForPushdown) {
// Compare the results with pushdown disabled, so that explicit matches() call is not needed
hasCorrectResultsRegardlessOfPushdown();
}
return this;
}

/**
* Verifies query is not fully pushed down and that results are the same as when pushdown is fully disabled.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;
package io.trino.plugin.base.filter;
kasiafi marked this conversation as resolved.
Show resolved Hide resolved

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -52,27 +52,35 @@
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.NOT_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.MAX_SHORT_PRECISION;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static java.lang.Math.toIntExact;
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public final class ConstraintExtractor
/**
* Some expressions involving the TIMESTAMP WITH TIME ZONE type can be optimized when the time zone is known.
* This is not possible in the engine, but it is possible in a connector that follows a convention
* regarding time zones. In some connectors, such as the Delta Lake connector and the Iceberg connector,
* all values of the TIMESTAMP WITH TIME ZONE type are represented using the UTC time zone.
*/
public final class UtcConstraintExtractor
{
private ConstraintExtractor() {}
private UtcConstraintExtractor() {}

public static ExtractionResult extractTupleDomain(Constraint constraint)
{
TupleDomain<IcebergColumnHandle> result = constraint.getSummary()
.transformKeys(IcebergColumnHandle.class::cast);
TupleDomain<ColumnHandle> result = constraint.getSummary();
ImmutableList.Builder<ConnectorExpression> remainingExpressions = ImmutableList.builder();
for (ConnectorExpression conjunct : extractConjuncts(constraint.getExpression())) {
Optional<TupleDomain<IcebergColumnHandle>> converted = toTupleDomain(conjunct, constraint.getAssignments());
Optional<TupleDomain<ColumnHandle>> converted = toTupleDomain(conjunct, constraint.getAssignments());
if (converted.isEmpty()) {
remainingExpressions.add(conjunct);
}
Expand All @@ -86,15 +94,15 @@ public static ExtractionResult extractTupleDomain(Constraint constraint)
return new ExtractionResult(result, and(remainingExpressions.build()));
}

/**
* Attempts to convert a single expression (a conjunct of the constraint expression) to a tuple domain.
* Only {@code Call} expressions are handled, by delegating to the {@code Call} overload;
* any other kind of expression yields {@code Optional.empty()}.
*/
private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(ConnectorExpression expression, Map<String, ColumnHandle> assignments)
private static Optional<TupleDomain<ColumnHandle>> toTupleDomain(ConnectorExpression expression, Map<String, ColumnHandle> assignments)
{
if (expression instanceof Call call) {
return toTupleDomain(call, assignments);
}
// Not a call: nothing to extract
return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(Call call, Map<String, ColumnHandle> assignments)
private static Optional<TupleDomain<ColumnHandle>> toTupleDomain(Call call, Map<String, ColumnHandle> assignments)
{
if (call.getArguments().size() == 2) {
ConnectorExpression firstArgument = call.getArguments().get(0);
Expand Down Expand Up @@ -145,7 +153,7 @@ private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(Call cal
return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> unwrapCastInComparison(
private static Optional<TupleDomain<ColumnHandle>> unwrapCastInComparison(
// upon invocation, we don't know if this really is a comparison
FunctionName functionName,
ConnectorExpression castSource,
Expand All @@ -164,13 +172,10 @@ private static Optional<TupleDomain<IcebergColumnHandle>> unwrapCastInComparison
return Optional.empty();
}

IcebergColumnHandle column = resolve(sourceVariable, assignments);
if (column.getType() instanceof TimestampWithTimeZoneType sourceType) {
// Iceberg supports only timestamp(6) with time zone
checkArgument(sourceType.getPrecision() == 6, "Unexpected type: %s", column.getType());

ColumnHandle column = resolve(sourceVariable, assignments);
if (sourceVariable.getType() instanceof TimestampWithTimeZoneType columnType) {
if (constant.getType() == DateType.DATE) {
return unwrapTimestampTzToDateCast(column, functionName, (long) constant.getValue())
return unwrapTimestampTzToDateCast(column, columnType, functionName, (long) constant.getValue())
.map(domain -> TupleDomain.withColumnDomains(ImmutableMap.of(column, domain)));
}
// TODO support timestamp constant
Expand All @@ -179,38 +184,50 @@ private static Optional<TupleDomain<IcebergColumnHandle>> unwrapCastInComparison
return Optional.empty();
}

/**
* Builds the domain of a timestamp with time zone column for a comparison of the column, cast to date,
* against a date constant.
* <p>
* The {@code date} value is multiplied by {@code MILLISECONDS_PER_DAY} to obtain epoch milliseconds,
* i.e. it is treated as a number of days. Two bounds are computed with {@code UTC_KEY}: the start of
* that day and the start of the following day. They are passed to {@code createDomain}, which decides
* the resulting domain based on {@code functionName}.
*
* @return the result of {@code createDomain} for the given function name and day bounds
*/
private static Optional<Domain> unwrapTimestampTzToDateCast(IcebergColumnHandle column, FunctionName functionName, long date)
private static Optional<Domain> unwrapTimestampTzToDateCast(ColumnHandle column, Type columnType, FunctionName functionName, long date)
{
Type type = column.getType();
checkArgument(type.equals(TIMESTAMP_TZ_MICROS), "Column of unexpected type %s: %s", type, column);

// Verify no overflow. Date values must be in integer range.
verify(date <= Integer.MAX_VALUE, "Date value out of range: %s", date);

// In Iceberg, timestamp with time zone values are all in UTC

LongTimestampWithTimeZone startOfDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction(date * MILLISECONDS_PER_DAY, 0, UTC_KEY);
LongTimestampWithTimeZone startOfNextDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction((date + 1) * MILLISECONDS_PER_DAY, 0, UTC_KEY);
// The representation of the bounds depends on the precision of the column type:
// a packed long up to MAX_SHORT_PRECISION, a LongTimestampWithTimeZone above it
Object startOfDate;
Object startOfNextDate;
int precision = ((TimestampWithTimeZoneType) columnType).getPrecision();
if (precision <= MAX_SHORT_PRECISION) {
startOfDate = packDateTimeWithZone(date * MILLISECONDS_PER_DAY, UTC_KEY);
startOfNextDate = packDateTimeWithZone((date + 1) * MILLISECONDS_PER_DAY, UTC_KEY);
}
else {
startOfDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction(date * MILLISECONDS_PER_DAY, 0, UTC_KEY);
startOfNextDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction((date + 1) * MILLISECONDS_PER_DAY, 0, UTC_KEY);
}

return createDomain(functionName, type, startOfDate, startOfNextDate);
return createDomain(functionName, columnType, startOfDate, startOfNextDate);
}

/**
* Builds the domain of a timestamp with time zone value for a comparison of its year against a constant.
* <p>
* The constant's value is read as a {@code Long} and narrowed with {@code toIntExact}, which throws
* {@code ArithmeticException} if it does not fit in an {@code int}. The period runs from
* January 1st, 00:00:00 UTC of that year to the same instant one year later. Both bounds are
* passed to {@code createDomain}, which decides the resulting domain based on {@code functionName}.
*
* @return the result of {@code createDomain} for the given function name and year bounds
*/
private static Optional<Domain> unwrapYearInTimestampTzComparison(FunctionName functionName, Type type, Constant constant)
{
checkArgument(constant.getValue() != null, "Unexpected constant: %s", constant);
checkArgument(type.equals(TIMESTAMP_TZ_MICROS), "Unexpected type: %s", type);

int year = toIntExact((Long) constant.getValue());
ZonedDateTime periodStart = ZonedDateTime.of(year, 1, 1, 0, 0, 0, 0, UTC);
ZonedDateTime periodEnd = periodStart.plusYears(1);

LongTimestampWithTimeZone start = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodStart.toEpochSecond(), 0, UTC_KEY);
LongTimestampWithTimeZone end = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodEnd.toEpochSecond(), 0, UTC_KEY);
// The representation of the bounds depends on the precision of the type:
// a packed long (built from epoch millis) up to MAX_SHORT_PRECISION, a LongTimestampWithTimeZone above it
Object start;
Object end;
int precision = ((TimestampWithTimeZoneType) type).getPrecision();
if (precision <= MAX_SHORT_PRECISION) {
start = packDateTimeWithZone(periodStart.toEpochSecond() * MILLISECONDS_PER_SECOND, UTC_KEY);
end = packDateTimeWithZone(periodEnd.toEpochSecond() * MILLISECONDS_PER_SECOND, UTC_KEY);
}
else {
start = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodStart.toEpochSecond(), 0, UTC_KEY);
end = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodEnd.toEpochSecond(), 0, UTC_KEY);
}

return createDomain(functionName, type, start, end);
}

private static Optional<Domain> createDomain(FunctionName functionName, Type type, LongTimestampWithTimeZone startOfDate, LongTimestampWithTimeZone startOfNextDate)
private static Optional<Domain> createDomain(FunctionName functionName, Type type, Object startOfDate, Object startOfNextDate)
{
if (functionName.equals(EQUAL_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.range(type, startOfDate, true, startOfNextDate, false)), false));
Expand All @@ -237,7 +254,7 @@ private static Optional<Domain> createDomain(FunctionName functionName, Type typ
return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> unwrapDateTruncInComparison(
private static Optional<TupleDomain<ColumnHandle>> unwrapDateTruncInComparison(
// upon invocation, we don't know if this really is a comparison
FunctionName functionName,
Constant unit,
Expand All @@ -261,10 +278,8 @@ private static Optional<TupleDomain<IcebergColumnHandle>> unwrapDateTruncInCompa
return Optional.empty();
}

IcebergColumnHandle column = resolve(sourceVariable, assignments);
if (column.getType() instanceof TimestampWithTimeZoneType type) {
// Iceberg supports only timestamp(6) with time zone
checkArgument(type.getPrecision() == 6, "Unexpected type: %s", column.getType());
ColumnHandle column = resolve(sourceVariable, assignments);
if (sourceVariable.getType() instanceof TimestampWithTimeZoneType type) {
verify(constant.getType().equals(type), "This method should not be invoked when type mismatch (i.e. surely not a comparison)");

return unwrapDateTruncInComparison(((Slice) unit.getValue()).toStringUtf8(), functionName, constant)
Expand All @@ -278,12 +293,23 @@ private static Optional<Domain> unwrapDateTruncInComparison(String unit, Functio
{
Type type = constant.getType();
checkArgument(constant.getValue() != null, "Unexpected constant: %s", constant);
checkArgument(type.equals(TIMESTAMP_TZ_MICROS), "Unexpected type: %s", type);

// Normalized to UTC because for comparisons the zone is irrelevant
ZonedDateTime dateTime = Instant.ofEpochMilli(((LongTimestampWithTimeZone) constant.getValue()).getEpochMillis())
.plusNanos(LongMath.divide(((LongTimestampWithTimeZone) constant.getValue()).getPicosOfMilli(), PICOSECONDS_PER_NANOSECOND, UNNECESSARY))
.atZone(UTC);
ZonedDateTime dateTime;
int precision = ((TimestampWithTimeZoneType) type).getPrecision();
if (precision <= MAX_SHORT_PRECISION) {
// Normalized to UTC because for comparisons the zone is irrelevant
dateTime = Instant.ofEpochMilli(unpackMillisUtc((long) constant.getValue()))
.atZone(UTC);
}
else {
if (precision > 9) {
return Optional.empty();
}
// Normalized to UTC because for comparisons the zone is irrelevant
dateTime = Instant.ofEpochMilli(((LongTimestampWithTimeZone) constant.getValue()).getEpochMillis())
.plusNanos(LongMath.divide(((LongTimestampWithTimeZone) constant.getValue()).getPicosOfMilli(), PICOSECONDS_PER_NANOSECOND, UNNECESSARY))
.atZone(UTC);
}

ZonedDateTime periodStart;
ZonedDateTime nextPeriodStart;
Expand All @@ -310,8 +336,16 @@ private static Optional<Domain> unwrapDateTruncInComparison(String unit, Functio
}
boolean constantAtPeriodStart = dateTime.equals(periodStart);

LongTimestampWithTimeZone start = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodStart.toEpochSecond(), 0, UTC_KEY);
LongTimestampWithTimeZone end = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(nextPeriodStart.toEpochSecond(), 0, UTC_KEY);
Object start;
Object end;
if (precision <= MAX_SHORT_PRECISION) {
start = packDateTimeWithZone(periodStart.toEpochSecond() * MILLISECONDS_PER_SECOND, UTC_KEY);
end = packDateTimeWithZone(nextPeriodStart.toEpochSecond() * MILLISECONDS_PER_SECOND, UTC_KEY);
}
else {
start = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(periodStart.toEpochSecond(), 0, UTC_KEY);
end = LongTimestampWithTimeZone.fromEpochSecondsAndFraction(nextPeriodStart.toEpochSecond(), 0, UTC_KEY);
}

if (functionName.equals(EQUAL_OPERATOR_FUNCTION_NAME)) {
if (!constantAtPeriodStart) {
Expand Down Expand Up @@ -352,7 +386,7 @@ private static Optional<Domain> unwrapDateTruncInComparison(String unit, Functio
return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> unwrapYearInTimestampTzComparison(
private static Optional<TupleDomain<ColumnHandle>> unwrapYearInTimestampTzComparison(
// upon invocation, we don't know if this really is a comparison
FunctionName functionName,
ConnectorExpression yearSource,
Expand All @@ -371,26 +405,23 @@ private static Optional<TupleDomain<IcebergColumnHandle>> unwrapYearInTimestampT
return Optional.empty();
}

IcebergColumnHandle column = resolve(sourceVariable, assignments);
if (column.getType() instanceof TimestampWithTimeZoneType type) {
// Iceberg supports only timestamp(6) with time zone
checkArgument(type.getPrecision() == 6, "Unexpected type: %s", column.getType());

ColumnHandle column = resolve(sourceVariable, assignments);
if (sourceVariable.getType() instanceof TimestampWithTimeZoneType type) {
return unwrapYearInTimestampTzComparison(functionName, type, constant)
.map(domain -> TupleDomain.withColumnDomains(ImmutableMap.of(column, domain)));
}

return Optional.empty();
}

/**
* Looks up the column handle assigned to the given variable, using the variable's name as the key.
* The {@code checkArgument} precondition fails with "No assignment for ..." when the map has no
* entry for that name.
*/
private static IcebergColumnHandle resolve(Variable variable, Map<String, ColumnHandle> assignments)
private static ColumnHandle resolve(Variable variable, Map<String, ColumnHandle> assignments)
{
ColumnHandle columnHandle = assignments.get(variable.getName());
checkArgument(columnHandle != null, "No assignment for %s", variable);
return (IcebergColumnHandle) columnHandle;
return columnHandle;
}

public record ExtractionResult(TupleDomain<IcebergColumnHandle> tupleDomain, ConnectorExpression remainingExpression)
public record ExtractionResult(TupleDomain<ColumnHandle> tupleDomain, ConnectorExpression remainingExpression)
{
public ExtractionResult
{
Expand Down
Loading