Skip to content

Commit

Permalink
Make CDF compatible with column mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
homar authored and ebyhr committed Aug 18, 2023
1 parent 139ac03 commit 00272ae
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -806,10 +806,6 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
Optional<Long> checkpointInterval = getCheckpointInterval(tableMetadata.getProperties());
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(tableMetadata.getProperties());
ColumnMappingMode columnMappingMode = DeltaLakeTableProperties.getColumnMappingMode(tableMetadata.getProperties());
if (changeDataFeedEnabled.orElse(false) && columnMappingMode != NONE) {
// TODO https://github.com/trinodb/trino/issues/16967 Add support for CDF with column mapping mode
throw new TrinoException(NOT_SUPPORTED, "Creating tables with %s and %s is unsupported".formatted(CHANGE_DATA_FEED_ENABLED_PROPERTY, COLUMN_MAPPING_MODE_PROPERTY));
}
AtomicInteger fieldId = new AtomicInteger();

try {
Expand Down Expand Up @@ -943,12 +939,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
external = false;
}

Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(tableMetadata.getProperties());
ColumnMappingMode columnMappingMode = DeltaLakeTableProperties.getColumnMappingMode(tableMetadata.getProperties());
if (changeDataFeedEnabled.orElse(false) && columnMappingMode != NONE) {
// TODO https://github.com/trinodb/trino/issues/16967 Add support for CDF with column mapping mode
throw new TrinoException(NOT_SUPPORTED, "Creating tables with %s and %s is unsupported".formatted(CHANGE_DATA_FEED_ENABLED_PROPERTY, COLUMN_MAPPING_MODE_PROPERTY));
}
AtomicInteger fieldId = new AtomicInteger();

Location finalLocation = Location.of(location);
Expand Down Expand Up @@ -1809,11 +1800,6 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
}
checkWriteAllowed(session, handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry()) && columnMappingMode != NONE) {
// TODO https://github.com/trinodb/trino/issues/16967 Support CDF for tables with 'id' and 'name' column mapping
throw new TrinoException(NOT_SUPPORTED, "Unsupported column mapping mode for tables with change data feed enabled: " + columnMappingMode);
}
checkWriteSupported(session, handle);

List<DeltaLakeColumnHandle> inputColumns = getColumns(handle.getMetadataEntry()).stream()
Expand Down Expand Up @@ -1877,19 +1863,20 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg

long writeTimestamp = Instant.now().toEpochMilli();

ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
List<String> partitionColumns = getPartitionColumns(
handle.getMetadataEntry().getOriginalPartitionColumns(),
mergeHandle.getInsertTableHandle().getInputColumns(),
columnMappingMode);

if (!cdcFiles.isEmpty()) {
appendCdcFilesInfos(transactionLogWriter, cdcFiles, handle.getMetadataEntry().getOriginalPartitionColumns());
appendCdcFilesInfos(transactionLogWriter, cdcFiles, partitionColumns);
}

for (String file : oldFiles) {
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(file, writeTimestamp, true));
}

ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
List<String> partitionColumns = getPartitionColumns(
handle.getMetadataEntry().getOriginalPartitionColumns(),
mergeHandle.getInsertTableHandle().getInputColumns(),
columnMappingMode);
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);

transactionLogWriter.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
Expand Down Expand Up @@ -137,12 +138,19 @@ public ConnectorPageSource createPageSource(
.collect(toImmutableList());

Map<String, Optional<String>> partitionKeys = split.getPartitionKeys();

ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
Optional<List<String>> partitionValues = Optional.empty();
if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) {
partitionValues = Optional.of(new ArrayList<>());
for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), typeManager)) {
Optional<String> value = partitionKeys.get(column.getName());
Optional<String> value = switch (columnMappingMode) {
case NONE:
yield partitionKeys.get(column.getName());
case ID, NAME:
yield partitionKeys.get(column.getPhysicalName());
default:
throw new IllegalStateException("Unknown column mapping mode");
};
if (value != null) {
partitionValues.get().add(value.orElse(null));
}
Expand Down Expand Up @@ -183,7 +191,6 @@ public ConnectorPageSource createPageSource(
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
.withUseColumnIndex(isParquetUseColumnIndex(session));

ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
Map<Integer, String> parquetFieldIdToName = columnMappingMode == ColumnMappingMode.ID ? loadParquetIdAndNameMapping(inputFile, options) : ImmutableMap.of();

ImmutableSet.Builder<String> missingColumnNames = ImmutableSet.builder();
Expand Down
Loading

0 comments on commit 00272ae

Please sign in to comment.