Skip to content

Commit

Permalink
Ensure MergeWrite columns include rowId
Browse files Browse the repository at this point in the history
KuduTableHandle had a boolean field isDeleteHandle.  If the
field was not set, rowIds would not be generated.  This
commit renames the field to more appropriate requiresRowId,
and sets it in KuduMetadata.beginMerge.  This fixes bugs
in delete and update using merge components.
  • Loading branch information
djsstarburst authored and electrum committed Oct 18, 2022
1 parent 313e9ec commit 7d57a3d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public List<KuduSplit> buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
Optional<List<ColumnHandle>> desiredColumns = tableHandle.getDesiredColumns();

List<Integer> columnIndexes;
if (tableHandle.isDeleteHandle()) {
if (tableHandle.isRequiresRowId()) {
if (desiredColumns.isPresent()) {
columnIndexes = IntStream
.range(0, primaryKeyColumnCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
KuduTableHandle handle = (KuduTableHandle) table;
return new KuduTableHandle(
handle.getSchemaTableName(),
handle.getConstraint(),
handle.getDesiredColumns(),
true,
handle.getBucketCount(),
handle.getLimit());
return handle.withRequiresRowId(true);
}

@Override
Expand Down Expand Up @@ -449,7 +443,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
PartitionDesign design = KuduTableProperties.getPartitionDesign(tableMetadata.getProperties());
boolean generateUUID = !design.hasPartitions();
return new KuduMergeTableHandle(
kuduTableHandle,
kuduTableHandle.withRequiresRowId(true),
new KuduOutputTableHandle(tableMetadata.getTable(), columnOriginalTypes, columnTypes, generateUUID, table));
}

Expand Down Expand Up @@ -492,7 +486,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle.getTable(clientSession),
newDomain,
handle.getDesiredColumns(),
handle.isDeleteHandle(),
handle.isRequiresRowId(),
handle.getBucketCount(),
handle.getLimit());

Expand Down Expand Up @@ -550,7 +544,7 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
handle.getTable(clientSession),
handle.getConstraint(),
Optional.of(desiredColumns.build()),
handle.isDeleteHandle(),
handle.isRequiresRowId(),
handle.getBucketCount(),
handle.getLimit());

Expand All @@ -571,7 +565,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
handle.getTable(clientSession),
handle.getConstraint(),
handle.getDesiredColumns(),
handle.isDeleteHandle(),
handle.isRequiresRowId(),
handle.getBucketCount(),
OptionalLong.of(limit));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class KuduTableHandle
private transient KuduTable table;
private final TupleDomain<ColumnHandle> constraint;
private final Optional<List<ColumnHandle>> desiredColumns;
private final boolean isDeleteHandle;
private final boolean requiresRowId;
private final OptionalInt bucketCount;
private final OptionalLong limit;

Expand All @@ -45,31 +45,43 @@ public KuduTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("desiredColumns") Optional<List<ColumnHandle>> desiredColumns,
@JsonProperty("isDeleteHandle") boolean isDeleteHandle,
@JsonProperty("requiresRowId") boolean requiresRowId,
@JsonProperty("bucketCount") OptionalInt bucketCount,
@JsonProperty("limit") OptionalLong limit)
{
this(schemaTableName, null, constraint, desiredColumns, isDeleteHandle, bucketCount, limit);
this(schemaTableName, null, constraint, desiredColumns, requiresRowId, bucketCount, limit);
}

public KuduTableHandle(
SchemaTableName schemaTableName,
KuduTable table,
TupleDomain<ColumnHandle> constraint,
Optional<List<ColumnHandle>> desiredColumns,
boolean isDeleteHandle,
boolean requiresRowId,
@JsonProperty("bucketCount") OptionalInt bucketCount,
@JsonProperty("limit") OptionalLong limit)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.table = table;
this.constraint = requireNonNull(constraint, "constraint is null");
this.desiredColumns = requireNonNull(desiredColumns, "desiredColumns is null");
this.isDeleteHandle = isDeleteHandle;
this.requiresRowId = requiresRowId;
this.bucketCount = requireNonNull(bucketCount, "bucketCount is empty");
this.limit = requireNonNull(limit, "limit is null");
}

public KuduTableHandle withRequiresRowId(boolean requiresRowId)
{
return new KuduTableHandle(
schemaTableName,
table,
constraint,
desiredColumns,
requiresRowId,
bucketCount,
limit);
}

public KuduTable getTable(KuduClientSession session)
{
if (table == null) {
Expand Down Expand Up @@ -97,9 +109,9 @@ public Optional<List<ColumnHandle>> getDesiredColumns()
}

@JsonProperty
public boolean isDeleteHandle()
public boolean isRequiresRowId()
{
return isDeleteHandle;
return requiresRowId;
}

@JsonProperty
Expand All @@ -117,7 +129,7 @@ public OptionalLong getLimit()
@Override
public int hashCode()
{
return Objects.hash(schemaTableName, constraint, desiredColumns, isDeleteHandle, bucketCount, limit);
return Objects.hash(schemaTableName, constraint, desiredColumns, requiresRowId, bucketCount, limit);
}

@Override
Expand All @@ -134,7 +146,7 @@ public boolean equals(Object obj)
return Objects.equals(this.schemaTableName, other.schemaTableName) &&
Objects.equals(this.constraint, other.constraint) &&
Objects.equals(this.desiredColumns, other.desiredColumns) &&
Objects.equals(this.isDeleteHandle, other.isDeleteHandle) &&
Objects.equals(this.requiresRowId, other.requiresRowId) &&
Objects.equals(this.bucketCount, other.bucketCount) &&
Objects.equals(this.limit, other.limit);
}
Expand Down

0 comments on commit 7d57a3d

Please sign in to comment.