Skip to content

Commit

Permalink
misc. hacks to get the remove actions and allow all arbitary properties
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Dec 4, 2024
1 parent 7b3a859 commit cef3625
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 2 deletions.
19 changes: 19 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.delta.kernel.internal.DataWriteContextImpl;
import io.delta.kernel.internal.IcebergCompatV2Utils;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.actions.SingleAction;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -220,4 +221,22 @@ static CloseableIterator<Row> generateAppendActions(
return SingleAction.createAddFileSingleAction(addFileRow);
});
}

static CloseableIterator<Row> generateRemoveActions(
Engine engine,
Row transactionState,
CloseableIterator<DataFileStatus> fileStatusIter,
DataWriteContext dataWriteContext) {
URI tableRoot = new Path(getTablePath(transactionState)).toUri();
return fileStatusIter.map(
dataFileStatus -> {
Row removeFileRow =
RemoveFile.convertDataFileStatus(
tableRoot,
dataFileStatus,
((DataWriteContextImpl) dataWriteContext).getPartitionValues(),
true /* dataChange */);
return SingleAction.createRemoveFileSingleAction(removeFileRow);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,8 @@ public static Map<String, String> validateProperties(Map<String, String> configu
} else {
throw DeltaErrors.cannotModifyTableProperty(kv.getKey());
}
} else {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
}
// allow unknown properties to be set
}
return validatedConfigurations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,20 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatus;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

/** Metadata about {@code remove} action in the Delta Log. */
public class RemoveFile {
Expand All @@ -36,4 +49,32 @@ public class RemoveFile {
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
// There are more fields which are added when row-id tracking is enabled. When Kernel
// starts supporting row-ids, we should add those fields here.

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, FULL_SCHEMA.length())
.boxed()
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

/**
* Utility to generate `RemoveFile` row from the given {@link DataFileStatus} and partition
* values.
*/
public static Row convertDataFileStatus(
URI tableRoot,
DataFileStatus dataFileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
Path filePath = new Path(dataFileStatus.getPath());
Map<Integer, Object> valueMap = new HashMap<>();
valueMap.put(
COL_NAME_TO_ORDINAL.get("path"), relativizePath(filePath, tableRoot).toUri().toString());
valueMap.put(
COL_NAME_TO_ORDINAL.get("partitionValues"), serializePartitionMap(partitionValues));
valueMap.put(
COL_NAME_TO_ORDINAL.get("deletionTimestamp"), dataFileStatus.getModificationTime());
valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);

// any fields not present in the valueMap are considered null
return new GenericRow(FULL_SCHEMA, valueMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ class TableChangesSuite extends AnyFunSuite with TestUtils {
2L -> (start + 40 * minuteInMilliseconds)
)


val iter = Table.forPath(defaultEngine, tempDir.getCanonicalPath)
.asInstanceOf[TableImpl]
.getChanges(defaultEngine, 0, 2, Set(DeltaAction.ADD).asJava);

while (iter.hasNext) {
val batch = iter.next()
val addsVector = batch.getColumnVector(2)

val addPathVector = addsVector.getChild(0)
for (i <- 0 until addsVector.getSize) {
if (!addsVector.isNullAt(i)) {
val path = addPathVector.getString(i)
}
}

val removesVector = batch.getColumnVector(3)
for (i <- 0 until removesVector.getSize) {
if (!removesVector.isNullAt(i)) {
val removePath = removesVector.getChild(0).getString(i)
}
}


}

// Check the timestamps are returned correctly
Table.forPath(defaultEngine, tempDir.getCanonicalPath)
.asInstanceOf[TableImpl]
Expand Down

0 comments on commit cef3625

Please sign in to comment.