Commit
add timestamp ntz support
vkorukanti committed Feb 21, 2025
1 parent 63d4376 commit dc03702
Showing 4 changed files with 110 additions and 113 deletions.
@@ -138,33 +138,31 @@ public Transaction build(Engine engine) {
     boolean shouldUpdateProtocol = false;
     Metadata metadata = snapshot.getMetadata();
     Protocol protocol = snapshot.getProtocol();
-    if (tableProperties.isPresent()) {
-      Map<String, String> validatedProperties =
-          TableConfig.validateDeltaProperties(tableProperties.get());
-      Map<String, String> newProperties =
+    Map<String, String> validatedProperties =
+        TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
+    Map<String, String> newProperties =
         metadata.filterOutUnchangedProperties(validatedProperties);
 
-      ColumnMapping.verifyColumnMappingChange(
-          metadata.getConfiguration(), newProperties, isNewTable);
-      if (!newProperties.isEmpty()) {
-        shouldUpdateMetadata = true;
-        metadata = metadata.withNewConfiguration(newProperties);
-      }
 
+    if (!newProperties.isEmpty()) {
+      shouldUpdateMetadata = true;
+      metadata = metadata.withNewConfiguration(newProperties);
+    }
+    ColumnMapping.verifyColumnMappingChange(
+        metadata.getConfiguration(), newProperties, isNewTable);
 
-      Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
-          TableFeatures.autoUpgradeProtocolBasedOnMetadata(metadata, protocol);
-      if (newProtocolAndFeatures.isPresent()) {
-        logger.info(
-            "Automatically enabling table features: {}",
-            newProtocolAndFeatures.get()._2.stream()
-                .map(TableFeature::featureName)
-                .collect(toSet()));
+    Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
+        TableFeatures.autoUpgradeProtocolBasedOnMetadata(metadata, protocol);
+    if (newProtocolAndFeatures.isPresent()) {
+      logger.info(
+          "Automatically enabling table features: {}",
+          newProtocolAndFeatures.get()._2.stream()
+              .map(TableFeature::featureName)
+              .collect(toSet()));
 
-        shouldUpdateProtocol = true;
-        protocol = newProtocolAndFeatures.get()._1;
-        TableFeatures.validateKernelCanWriteToTable(protocol, metadata, table.getPath(engine));
-      }
+      shouldUpdateProtocol = true;
+      protocol = newProtocolAndFeatures.get()._1;
+      TableFeatures.validateKernelCanWriteToTable(protocol, metadata, table.getPath(engine));
     }
 
     return new TransactionImpl(
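The builder change above validates table properties even when none were supplied (`tableProperties.orElse(Collections.emptyMap())`) and runs the protocol auto-upgrade unconditionally, so a schema that contains a TIMESTAMP_NTZ column is now enough to have the `timestampNtz` feature enabled at transaction-build time. A minimal sketch of what that looks like from the Kernel write API, assuming the public `Table`/`TransactionBuilder` methods shown; the table path and connector name are illustrative:

```java
import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import org.apache.hadoop.conf.Configuration;

public class CreateTimestampNtzTable {
  public static void main(String[] args) {
    Engine engine = DefaultEngine.create(new Configuration());

    // Schema with a TIMESTAMP_NTZ column. No table properties are needed:
    // building the transaction auto-upgrades the protocol to include the
    // timestampNtz feature (reader version 3 / writer version 7) instead of
    // rejecting the write.
    StructType schema =
        new StructType()
            .add("id", IntegerType.INTEGER)
            .add("eventTime", TimestampNTZType.TIMESTAMP_NTZ);

    Table table = Table.forPath(engine, "/tmp/ntz-table"); // hypothetical path
    Transaction txn =
        table.createTransactionBuilder(engine, "example-connector", Operation.CREATE_TABLE)
            .withSchema(engine, schema)
            .build(engine);
    // txn.commit(...) with data actions follows as usual.
  }
}
```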
@@ -300,13 +300,6 @@ private static class TimestampNtzTableFeature extends TableFeature.ReaderWriterF
       super("timestampNtz", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7);
     }
 
-    @Override
-    public boolean hasKernelWriteSupport(Metadata metadata) {
-      // Kernel can write as long as there are no timestamp_ntz columns defined
-      // TODO: implement support for writing timestamp_ntz columns
-      return !hasTypeColumn(metadata.getSchema(), TIMESTAMP_NTZ);
-    }
-
     @Override
     public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) {
       return hasTypeColumn(metadata.getSchema(), TIMESTAMP_NTZ);
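Dropping the `hasKernelWriteSupport` override means the feature now reports write support by default; what remains is `metadataRequiresFeatureToBeEnabled`, which auto-enables `timestampNtz` whenever the schema contains a TIMESTAMP_NTZ column anywhere, including inside nested types. A rough approximation of that `hasTypeColumn` check (a sketch, not the actual Kernel implementation) could look like:

```java
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

public final class SchemaScan {
  /** Returns true if any (possibly nested) field in the schema has the given type. */
  static boolean hasTypeColumn(StructType schema, DataType target) {
    for (StructField field : schema.fields()) {
      if (hasType(field.getDataType(), target)) {
        return true;
      }
    }
    return false;
  }

  private static boolean hasType(DataType type, DataType target) {
    if (type.equals(target)) {
      return true;
    } else if (type instanceof StructType) {
      return hasTypeColumn((StructType) type, target);
    } else if (type instanceof ArrayType) {
      return hasType(((ArrayType) type).getElementType(), target);
    } else if (type instanceof MapType) {
      return hasType(((MapType) type).getKeyType(), target)
          || hasType(((MapType) type).getValueType(), target);
    }
    return false;
  }
}
```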
@@ -111,7 +111,8 @@ public static void validatePartitionColumns(StructType schema, List<String> part
               || dataType instanceof StringType
               || dataType instanceof BinaryType
               || dataType instanceof DateType
-              || dataType instanceof TimestampType)) {
+              || dataType instanceof TimestampType
+              || dataType instanceof TimestampNTZType)) {
             throw unsupportedPartitionDataType(partitionCol, dataType);
           }
         });
@@ -329,7 +330,8 @@ protected static void validateSupportedType(DataType dataType) {
         || dataType instanceof StringType
         || dataType instanceof BinaryType
         || dataType instanceof DateType
-        || dataType instanceof TimestampType) {
+        || dataType instanceof TimestampType
+        || dataType instanceof TimestampNTZType) {
       // supported types
       return;
     } else if (dataType instanceof StructType) {
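With `TimestampNTZType` added to both allow-lists above, a TIMESTAMP_NTZ column is now accepted both as a regular data column and as a partition column. A small sketch of declaring such a partition column and supplying its partition value as a literal; the path, connector name, and column names are illustrative, and `withPartitionColumns` is assumed from the public `TransactionBuilder` API:

```java
import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class NtzPartitionedTable {
  // Creates a transaction for a table partitioned by a TIMESTAMP_NTZ column.
  static Transaction createTxn(Engine engine, String path) {
    StructType schema =
        new StructType()
            .add("id", IntegerType.INTEGER)
            .add("eventTime", TimestampNTZType.TIMESTAMP_NTZ); // partition column

    return Table.forPath(engine, path)
        .createTransactionBuilder(engine, "example-connector", Operation.CREATE_TABLE)
        .withSchema(engine, schema)
        .withPartitionColumns(engine, Collections.singletonList("eventTime"))
        .build(engine);
  }

  // Partition values for a write are plain literals; TIMESTAMP_NTZ values are
  // passed as the same long the test below reads back with vector.getLong(rowId).
  static Map<String, Literal> partitionValues(long microsSinceEpoch) {
    Map<String, Literal> values = new HashMap<>();
    values.put("eventTime", Literal.ofTimestampNtz(microsSinceEpoch));
    return values;
  }
}
```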
@@ -654,98 +654,102 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("insert into partitioned table - all supported partition column types data") {
withTempDirAndEngine { (tblPath, engine) =>
val parquetAllTypes = goldenTablePath("parquet-all-types")
val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes))
val partCols = Seq(
"byteType",
"shortType",
"integerType",
"longType",
"floatType",
"doubleType",
"decimal",
"booleanType",
"stringType",
"binaryType",
"dateType",
"timestampType")
val casePreservingPartCols =
casePreservingPartitionColNames(schema, partCols.asJava).asScala.to[Seq]

// get the partition values from the data batch at the given rowId
def getPartitionValues(batch: ColumnarBatch, rowId: Int): Map[String, Literal] = {
casePreservingPartCols.map { partCol =>
val colIndex = schema.indexOf(partCol)
val vector = batch.getColumnVector(colIndex)

val literal = if (vector.isNullAt(rowId)) {
Literal.ofNull(vector.getDataType)
} else {
vector.getDataType match {
case _: ByteType => Literal.ofByte(vector.getByte(rowId))
case _: ShortType => Literal.ofShort(vector.getShort(rowId))
case _: IntegerType => Literal.ofInt(vector.getInt(rowId))
case _: LongType => Literal.ofLong(vector.getLong(rowId))
case _: FloatType => Literal.ofFloat(vector.getFloat(rowId))
case _: DoubleType => Literal.ofDouble(vector.getDouble(rowId))
case dt: DecimalType =>
Literal.ofDecimal(vector.getDecimal(rowId), dt.getPrecision, dt.getScale)
case _: BooleanType => Literal.ofBoolean(vector.getBoolean(rowId))
case _: StringType => Literal.ofString(vector.getString(rowId))
case _: BinaryType => Literal.ofBinary(vector.getBinary(rowId))
case _: DateType => Literal.ofDate(vector.getInt(rowId))
case _: TimestampType => Literal.ofTimestamp(vector.getLong(rowId))
case _ =>
throw new IllegalArgumentException(s"Unsupported type: ${vector.getDataType}")
Seq(true, false).foreach { includeTimestampNtz =>
test(s"insert into partitioned table - all supported partition column types data - " +
s"timesatmp_ntz included = $includeTimestampNtz") {
withTempDirAndEngine { (tblPath, engine) =>
val parquetAllTypes = goldenTablePath("parquet-all-types")
val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes))
val partCols = Seq(
"byteType",
"shortType",
"integerType",
"longType",
"floatType",
"doubleType",
"decimal",
"booleanType",
"stringType",
"binaryType",
"dateType",
"timestampType") ++ (if (includeTimestampNtz) Seq("timestampNtzType") else Seq.empty)
val casePreservingPartCols =
casePreservingPartitionColNames(schema, partCols.asJava).asScala.to[Seq]

// get the partition values from the data batch at the given rowId
def getPartitionValues(batch: ColumnarBatch, rowId: Int): Map[String, Literal] = {
casePreservingPartCols.map { partCol =>
val colIndex = schema.indexOf(partCol)
val vector = batch.getColumnVector(colIndex)

val literal = if (vector.isNullAt(rowId)) {
Literal.ofNull(vector.getDataType)
} else {
vector.getDataType match {
case _: ByteType => Literal.ofByte(vector.getByte(rowId))
case _: ShortType => Literal.ofShort(vector.getShort(rowId))
case _: IntegerType => Literal.ofInt(vector.getInt(rowId))
case _: LongType => Literal.ofLong(vector.getLong(rowId))
case _: FloatType => Literal.ofFloat(vector.getFloat(rowId))
case _: DoubleType => Literal.ofDouble(vector.getDouble(rowId))
case dt: DecimalType =>
Literal.ofDecimal(vector.getDecimal(rowId), dt.getPrecision, dt.getScale)
case _: BooleanType => Literal.ofBoolean(vector.getBoolean(rowId))
case _: StringType => Literal.ofString(vector.getString(rowId))
case _: BinaryType => Literal.ofBinary(vector.getBinary(rowId))
case _: DateType => Literal.ofDate(vector.getInt(rowId))
case _: TimestampType => Literal.ofTimestamp(vector.getLong(rowId))
case _: TimestampNTZType => Literal.ofTimestampNtz(vector.getLong(rowId))
case _ =>
throw new IllegalArgumentException(s"Unsupported type: ${vector.getDataType}")
}
}
}
(partCol, literal)
}.toMap
}

val data = readTableUsingKernel(engine, parquetAllTypes, schema).to[Seq]
(partCol, literal)
}.toMap
}

// From the above table read data, convert each row as a new batch with partition info
// Take the values of the partitionCols from the data and create a new batch with the
// selection vector to just select a single row.
var dataWithPartInfo = Seq.empty[(Map[String, Literal], Seq[FilteredColumnarBatch])]

data.foreach { filteredBatch =>
val batch = filteredBatch.getData
Seq.range(0, batch.getSize).foreach { rowId =>
val partValues = getPartitionValues(batch, rowId)
val filteredBatch = new FilteredColumnarBatch(
batch,
Optional.of(selectSingleElement(batch.getSize, rowId)))
dataWithPartInfo = dataWithPartInfo :+ (partValues, Seq(filteredBatch))
val data = readTableUsingKernel(engine, parquetAllTypes, schema).to[Seq]

// From the above table read data, convert each row as a new batch with partition info
// Take the values of the partitionCols from the data and create a new batch with the
// selection vector to just select a single row.
var dataWithPartInfo = Seq.empty[(Map[String, Literal], Seq[FilteredColumnarBatch])]

data.foreach { filteredBatch =>
val batch = filteredBatch.getData
Seq.range(0, batch.getSize).foreach { rowId =>
val partValues = getPartitionValues(batch, rowId)
val filteredBatch = new FilteredColumnarBatch(
batch,
Optional.of(selectSingleElement(batch.getSize, rowId)))
dataWithPartInfo = dataWithPartInfo :+ (partValues, Seq(filteredBatch))
}
}
}

appendData(engine, tblPath, isNewTable = true, schema, partCols, dataWithPartInfo)
verifyCommitInfo(tblPath, version = 0, casePreservingPartCols, operation = WRITE)
appendData(engine, tblPath, isNewTable = true, schema, partCols, dataWithPartInfo)
verifyCommitInfo(tblPath, version = 0, casePreservingPartCols, operation = WRITE)

var expData = dataWithPartInfo.flatMap(_._2).flatMap(_.toTestRows)
var expData = dataWithPartInfo.flatMap(_._2).flatMap(_.toTestRows)

val checkpointInterval = 2
setCheckpointInterval(tblPath, checkpointInterval) // version 1
val checkpointInterval = 2
setCheckpointInterval(tblPath, checkpointInterval) // version 1

for (i <- 2 until 4) {
// insert until a checkpoint is required
val commitResult = appendData(engine, tblPath, data = dataWithPartInfo)
for (i <- 2 until 4) {
// insert until a checkpoint is required
val commitResult = appendData(engine, tblPath, data = dataWithPartInfo)

expData = expData ++ dataWithPartInfo.flatMap(_._2).flatMap(_.toTestRows)
expData = expData ++ dataWithPartInfo.flatMap(_._2).flatMap(_.toTestRows)

val fileCount = dataFileCount(tblPath)
checkpointIfReady(engine, tblPath, commitResult, expSize = fileCount)
val fileCount = dataFileCount(tblPath)
checkpointIfReady(engine, tblPath, commitResult, expSize = fileCount)

verifyCommitResult(commitResult, expVersion = i, i % checkpointInterval == 0)
verifyCommitInfo(tblPath, version = i, partitionCols = null, operation = WRITE)
verifyWrittenContent(tblPath, schema, expData)
}
verifyCommitResult(commitResult, expVersion = i, i % checkpointInterval == 0)
verifyCommitInfo(tblPath, version = i, partitionCols = null, operation = WRITE)
verifyWrittenContent(tblPath, schema, expData)
}

assertCheckpointExists(tblPath, atVersion = checkpointInterval)
assertCheckpointExists(tblPath, atVersion = checkpointInterval)
}
}
}

@@ -996,7 +1000,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
           Some(new MapType(newKeyType, newValueType, m.isValueContainsNull))
         case _ => None
       }
-      case _: TimestampNTZType => None // ignore
+      // case _: TimestampNTZType => None // ignore
       case s: StructType =>
         val newType = removeUnsupportedTypes(s);
         if (newType.length() > 0) {
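The new `Literal.ofTimestampNtz(vector.getLong(rowId))` case in the test treats the physical value of a TIMESTAMP_NTZ column as a raw long: microseconds since the epoch, with no time-zone adjustment. A sketch of converting a wall-clock value into that long (the concrete date is arbitrary):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class NtzMicros {
  public static void main(String[] args) {
    // TIMESTAMP_NTZ stores the wall-clock value; interpret it as if it were UTC
    // and count microseconds from the epoch.
    LocalDateTime wallClock = LocalDateTime.of(2024, 2, 21, 9, 30, 0);
    long micros =
        ChronoUnit.MICROS.between(Instant.EPOCH, wallClock.toInstant(ZoneOffset.UTC));

    System.out.println(micros); // the value passed to Literal.ofTimestampNtz(micros)
  }
}
```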
