Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Jul 18, 2023
1 parent c7853e4 commit e970c1d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -134,6 +135,14 @@ public void testTimestampPartition() throws Exception {
testDanglingDelete();
}

@Test
public void testTimestampNtz() throws Exception {
  // Partition the table by a timezone-less timestamp column, then run the shared
  // dangling-delete scenario against it.
  createTable("timestamp_ntz");
  LocalDateTime base = Timestamp.valueOf("2023-01-01 15:30:00").toLocalDateTime();
  // Each inserted row gets a timestamp offset by a whole number of days from the base.
  insertData(dayOffset -> base.plusDays(dayOffset));
  testDanglingDelete();
}

@Test
public void testBytePartition() throws Exception {
createTable("byte");
Expand Down Expand Up @@ -200,7 +209,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

List<DataFile> dataFiles = dataFiles(table);
assertThat(numDataFiles).isEqualTo(dataFiles.size());
assertThat(dataFiles).hasSize(numDataFiles);

SparkActions.get(spark)
.rewriteDataFiles(table)
Expand All @@ -210,7 +219,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(numDataFiles * DELETE_FILES_PER_PARTITION).isEqualTo(deleteFiles.size());
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

List<Object[]> expectedRecords = records(tableName);

Expand All @@ -221,7 +230,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles.size()).as("Remaining dangling deletes").isEqualTo(0);
assertThat(newDeleteFiles).as("Remaining dangling deletes").isEmpty();
checkResult(result, deleteFiles, Lists.newArrayList(), numDataFiles);

List<Object[]> actualRecords = records(tableName);
Expand Down Expand Up @@ -369,9 +378,7 @@ private void checkResult(
.isEqualTo(size(rewrittenDeletes));
assertThat(result.addedBytesCount()).as("New Delete byte count").isEqualTo(size(newDeletes));

assertThat(result.rewriteResults().size())
.as("Rewritten group count")
.isEqualTo(expectedGroups);
assertThat(result.rewriteResults()).as("Rewritten group count").hasSize(expectedGroups);
assertThat(
result.rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

List<DataFile> dataFiles = dataFiles(table);
assertThat(numDataFiles).isEqualTo(dataFiles.size());
assertThat(dataFiles).hasSize(numDataFiles);

SparkActions.get(spark)
.rewriteDataFiles(table)
Expand All @@ -219,7 +219,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(numDataFiles * DELETE_FILES_PER_PARTITION).isEqualTo(deleteFiles.size());
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

List<Object[]> expectedRecords = records(tableName);

Expand All @@ -230,7 +230,7 @@ private void testDanglingDelete(int numDataFiles) throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles.size()).as("Remaining dangling deletes").isEqualTo(0);
assertThat(newDeleteFiles).as("Remaining dangling deletes").isEmpty();
checkResult(result, deleteFiles, Lists.newArrayList(), numDataFiles);

List<Object[]> actualRecords = records(tableName);
Expand Down Expand Up @@ -378,9 +378,7 @@ private void checkResult(
.isEqualTo(size(rewrittenDeletes));
assertThat(result.addedBytesCount()).as("New Delete byte count").isEqualTo(size(newDeletes));

assertThat(result.rewriteResults().size())
.as("Rewritten group count")
.isEqualTo(expectedGroups);
assertThat(result.rewriteResults()).as("Rewritten group count").hasSize(expectedGroups);
assertThat(
result.rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
Expand Down

0 comments on commit e970c1d

Please sign in to comment.