From 7d57a3dd2049c919b86a650c8a7caf664c6e64ca Mon Sep 17 00:00:00 2001 From: David Stryker Date: Fri, 7 Oct 2022 20:27:36 -0700 Subject: [PATCH] Ensure MergeWrite columns include rowId KuduTableHandle had a boolean field isDeleteHandle. If the field was not set, rowIds would not be generated. This commit renames the field to more appropriate requiresRowId, and sets it in KuduMetadata.beginMerge. This fixes bugs in delete and update using merge components. --- .../trino/plugin/kudu/KuduClientSession.java | 2 +- .../io/trino/plugin/kudu/KuduMetadata.java | 16 ++++------ .../io/trino/plugin/kudu/KuduTableHandle.java | 30 +++++++++++++------ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java index 70974d298767..89b6235d2b33 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java @@ -167,7 +167,7 @@ public List buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte Optional> desiredColumns = tableHandle.getDesiredColumns(); List columnIndexes; - if (tableHandle.isDeleteHandle()) { + if (tableHandle.isRequiresRowId()) { if (desiredColumns.isPresent()) { columnIndexes = IntStream .range(0, primaryKeyColumnCount) diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java index a8eb69d5ef65..bdfdb444ddd9 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java @@ -406,13 +406,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); } KuduTableHandle handle = (KuduTableHandle) table; - return new KuduTableHandle( - handle.getSchemaTableName(), - handle.getConstraint(), - handle.getDesiredColumns(), - true, - handle.getBucketCount(), - handle.getLimit()); + return handle.withRequiresRowId(true); } @Override @@ -449,7 +443,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT PartitionDesign design = KuduTableProperties.getPartitionDesign(tableMetadata.getProperties()); boolean generateUUID = !design.hasPartitions(); return new KuduMergeTableHandle( - kuduTableHandle, + kuduTableHandle.withRequiresRowId(true), new KuduOutputTableHandle(tableMetadata.getTable(), columnOriginalTypes, columnTypes, generateUUID, table)); } @@ -492,7 +486,7 @@ public Optional> applyFilter(C handle.getTable(clientSession), newDomain, handle.getDesiredColumns(), - handle.isDeleteHandle(), + handle.isRequiresRowId(), handle.getBucketCount(), handle.getLimit()); @@ -550,7 +544,7 @@ public Optional> applyProjecti handle.getTable(clientSession), handle.getConstraint(), Optional.of(desiredColumns.build()), - handle.isDeleteHandle(), + handle.isRequiresRowId(), handle.getBucketCount(), handle.getLimit()); @@ -571,7 +565,7 @@ public Optional> applyLimit(Connect handle.getTable(clientSession), handle.getConstraint(), handle.getDesiredColumns(), - handle.isDeleteHandle(), + handle.isRequiresRowId(), handle.getBucketCount(), OptionalLong.of(limit)); diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java index 3feb5ee3608b..cc63f8448492 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java @@ -36,7 +36,7 @@ public class KuduTableHandle private transient KuduTable table; private final TupleDomain constraint; private final Optional> desiredColumns; - private final boolean isDeleteHandle; + private final boolean requiresRowId; private final OptionalInt bucketCount; private final OptionalLong limit; @@ -45,11 +45,11 @@ public KuduTableHandle( @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("constraint") TupleDomain constraint, @JsonProperty("desiredColumns") Optional> desiredColumns, - @JsonProperty("isDeleteHandle") boolean isDeleteHandle, + @JsonProperty("requiresRowId") boolean requiresRowId, @JsonProperty("bucketCount") OptionalInt bucketCount, @JsonProperty("limit") OptionalLong limit) { - this(schemaTableName, null, constraint, desiredColumns, isDeleteHandle, bucketCount, limit); + this(schemaTableName, null, constraint, desiredColumns, requiresRowId, bucketCount, limit); } public KuduTableHandle( @@ -57,7 +57,7 @@ public KuduTableHandle( KuduTable table, TupleDomain constraint, Optional> desiredColumns, - boolean isDeleteHandle, + boolean requiresRowId, @JsonProperty("bucketCount") OptionalInt bucketCount, @JsonProperty("limit") OptionalLong limit) { @@ -65,11 +65,23 @@ public KuduTableHandle( this.table = table; this.constraint = requireNonNull(constraint, "constraint is null"); this.desiredColumns = requireNonNull(desiredColumns, "desiredColumns is null"); - this.isDeleteHandle = isDeleteHandle; + this.requiresRowId = requiresRowId; this.bucketCount = requireNonNull(bucketCount, "bucketCount is empty"); this.limit = requireNonNull(limit, "limit is null"); } + public KuduTableHandle withRequiresRowId(boolean requiresRowId) + { + return new KuduTableHandle( + schemaTableName, + table, + constraint, + desiredColumns, + requiresRowId, + bucketCount, + limit); + } + public KuduTable getTable(KuduClientSession session) { if (table == null) { @@ -97,9 +109,9 @@ public Optional> getDesiredColumns() } @JsonProperty - public boolean isDeleteHandle() + public boolean isRequiresRowId() { - return isDeleteHandle; + return requiresRowId; } @JsonProperty @@ -117,7 +129,7 @@ public OptionalLong getLimit() @Override public int hashCode() { - return Objects.hash(schemaTableName, constraint, desiredColumns, isDeleteHandle, bucketCount, limit); + return Objects.hash(schemaTableName, constraint, desiredColumns, requiresRowId, bucketCount, limit); } @Override @@ -134,7 +146,7 @@ public boolean equals(Object obj) return Objects.equals(this.schemaTableName, other.schemaTableName) && Objects.equals(this.constraint, other.constraint) && Objects.equals(this.desiredColumns, other.desiredColumns) && - Objects.equals(this.isDeleteHandle, other.isDeleteHandle) && + Objects.equals(this.requiresRowId, other.requiresRowId) && Objects.equals(this.bucketCount, other.bucketCount) && Objects.equals(this.limit, other.limit); }