Spark 3.3, 3.4: Migrate TestRewritePositionDeleteFiles to assertJ #8070

Merged: 3 commits, merged Jul 19, 2023
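The change is a mechanical migration of `TestRewritePositionDeleteFiles` from JUnit 4's `Assert.assertEquals` to AssertJ's `assertThat` chains, applied identically to the Spark 3.3 and 3.4 copies of the test; it also drops the unused `<T>` type parameter from several private helpers. As a rough illustration of the mapping used throughout the diff (a minimal standalone sketch with made-up values, not code from this PR):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.List;

public class AssertJMigrationSketch {
  public static void main(String[] args) {
    List<String> deleteFiles = List.of("a-deletes.parquet", "b-deletes.parquet");
    List<String> danglingDeletes = Collections.emptyList();

    // JUnit 4 style (before): optional message first, then expected, then actual:
    //   Assert.assertEquals(2, deleteFiles.size());
    //   Assert.assertEquals("Should have removed all dangling delete files",
    //       0, danglingDeletes.size());

    // AssertJ style (after): actual value first, description attached via as(),
    // intent-revealing matchers instead of expected/actual integer comparisons.
    assertThat(deleteFiles).hasSize(2);
    assertThat(danglingDeletes).as("Remaining dangling deletes").isEmpty();
    assertThat(deleteFiles.size() % 2).as("File count remainder").isEqualTo(0);
  }
}
```

AssertJ reads left to right from the actual value, so matchers like `hasSize` and `isEmpty` replace bare size comparisons, and a failure message names the value being checked rather than reporting an anonymous expected/actual pair.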
Spark 3.3: TestRewritePositionDeleteFiles.java
@@ -22,6 +22,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;

import java.io.Closeable;
import java.io.IOException;
@@ -68,7 +69,6 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -192,15 +192,15 @@ public void testNullTransform() throws Exception {
testDanglingDelete(2);
}

-private <T> void testDanglingDelete() throws Exception {
+private void testDanglingDelete() throws Exception {
testDanglingDelete(NUM_DATA_FILES);
}

-private <T> void testDanglingDelete(int numDataFiles) throws Exception {
+private void testDanglingDelete(int numDataFiles) throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

List<DataFile> dataFiles = dataFiles(table);
-Assert.assertEquals(numDataFiles, dataFiles.size());
+assertThat(dataFiles).hasSize(numDataFiles);

SparkActions.get(spark)
.rewriteDataFiles(table)
@@ -210,7 +210,7 @@ private <T> void testDanglingDelete(int numDataFiles) throws Exception {
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
-Assert.assertEquals(numDataFiles * DELETE_FILES_PER_PARTITION, deleteFiles.size());
+assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

List<Object[]> expectedRecords = records(tableName);

@@ -221,7 +221,7 @@ private <T> void testDanglingDelete(int numDataFiles) throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = deleteFiles(table);
Assert.assertEquals("Should have removed all dangling delete files", 0, newDeleteFiles.size());
assertThat(newDeleteFiles).as("Remaining dangling deletes").isEmpty();
checkResult(result, deleteFiles, Lists.newArrayList(), numDataFiles);

List<Object[]> actualRecords = records(tableName);
@@ -241,12 +241,11 @@ private void createTable(String partitionType, String partitionCol) {
tableName, partitionType, partitionCol);
}

-private <T> void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
+private void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
insertData(partitionValueFunction, NUM_DATA_FILES);
}

-private <T> void insertData(Function<Integer, ?> partitionValue, int numDataFiles)
-    throws Exception {
+private void insertData(Function<Integer, ?> partitionValue, int numDataFiles) throws Exception {
for (int i = 0; i < numDataFiles; i++) {
Dataset<Row> df =
spark
@@ -278,11 +277,9 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I
List<DataFile> partitionFiles = filesByPartitionEntry.getValue();

int deletesForPartition = partitionFiles.size() * DELETE_FILE_SIZE;
-Assert.assertEquals(
-    "Number of delete files per partition should be "
-        + "evenly divisible by requested deletes per data file times number of data files in this partition",
-    0,
-    deletesForPartition % DELETE_FILE_SIZE);
+assertThat(deletesForPartition % DELETE_FILE_SIZE)
+    .as("Number of delete files per partition modulo number of data files in this partition")
+    .isEqualTo(0);
int deleteFileSize = deletesForPartition / DELETE_FILES_PER_PARTITION;

int counter = 0;
@@ -361,48 +358,41 @@ private void checkResult(
List<DeleteFile> rewrittenDeletes,
List<DeleteFile> newDeletes,
int expectedGroups) {
-Assert.assertEquals(
-    "Expected rewritten delete file count does not match",
-    rewrittenDeletes.size(),
-    result.rewrittenDeleteFilesCount());
-Assert.assertEquals(
-    "Expected new delete file count does not match",
-    newDeletes.size(),
-    result.addedDeleteFilesCount());
-Assert.assertEquals(
-    "Expected rewritten delete byte count does not match",
-    size(rewrittenDeletes),
-    result.rewrittenBytesCount());
-Assert.assertEquals(
-    "Expected new delete byte count does not match",
-    size(newDeletes),
-    result.addedBytesCount());
-
-Assert.assertEquals(
-    "Expected rewrite group count does not match",
-    expectedGroups,
-    result.rewriteResults().size());
-Assert.assertEquals(
-    "Expected rewritten delete file count in all groups to match",
-    rewrittenDeletes.size(),
-    result.rewriteResults().stream()
-        .mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected added delete file count in all groups to match",
-    newDeletes.size(),
-    result.rewriteResults().stream()
-        .mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected rewritten delete bytes in all groups to match",
-    size(rewrittenDeletes),
-    result.rewriteResults().stream()
-        .mapToLong(FileGroupRewriteResult::rewrittenBytesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected added delete bytes in all groups to match",
-    size(newDeletes),
-    result.rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum());
+assertThat(result.rewrittenDeleteFilesCount())
+    .as("Rewritten delete files")
+    .isEqualTo(rewrittenDeletes.size());
+assertThat(result.addedDeleteFilesCount())
+    .as("Added delete files")
+    .isEqualTo(newDeletes.size());
+assertThat(result.rewrittenBytesCount())
+    .as("Rewritten delete bytes")
+    .isEqualTo(size(rewrittenDeletes));
+assertThat(result.addedBytesCount()).as("New Delete byte count").isEqualTo(size(newDeletes));
+
+assertThat(result.rewriteResults()).as("Rewritten group count").hasSize(expectedGroups);
+assertThat(
+        result.rewriteResults().stream()
+            .mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
+            .sum())
+    .as("Rewritten delete file count in all groups")
+    .isEqualTo(rewrittenDeletes.size());
+assertThat(
+        result.rewriteResults().stream()
+            .mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
+            .sum())
+    .as("Added delete file count in all groups")
+    .isEqualTo(newDeletes.size());
+assertThat(
+        result.rewriteResults().stream()
+            .mapToLong(FileGroupRewriteResult::rewrittenBytesCount)
+            .sum())
+    .as("Rewritten delete bytes in all groups")
+    .isEqualTo(size(rewrittenDeletes));
+assertThat(
+        result.rewriteResults().stream()
+            .mapToLong(FileGroupRewriteResult::addedBytesCount)
+            .sum())
+    .as("Added delete bytes in all groups")
+    .isEqualTo(size(newDeletes));
}
}
Spark 3.4: TestRewritePositionDeleteFiles.java
@@ -22,6 +22,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;

import java.io.Closeable;
import java.io.IOException;
@@ -69,7 +70,6 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -201,15 +201,15 @@ public void testNullTransform() throws Exception {
testDanglingDelete(2);
}

-private <T> void testDanglingDelete() throws Exception {
+private void testDanglingDelete() throws Exception {
testDanglingDelete(NUM_DATA_FILES);
}

-private <T> void testDanglingDelete(int numDataFiles) throws Exception {
+private void testDanglingDelete(int numDataFiles) throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

List<DataFile> dataFiles = dataFiles(table);
-Assert.assertEquals(numDataFiles, dataFiles.size());
+assertThat(dataFiles).hasSize(numDataFiles);

SparkActions.get(spark)
.rewriteDataFiles(table)
@@ -219,7 +219,7 @@ private <T> void testDanglingDelete(int numDataFiles) throws Exception {
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
-Assert.assertEquals(numDataFiles * DELETE_FILES_PER_PARTITION, deleteFiles.size());
+assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

List<Object[]> expectedRecords = records(tableName);

@@ -230,7 +230,7 @@ private <T> void testDanglingDelete(int numDataFiles) throws Exception {
.execute();

List<DeleteFile> newDeleteFiles = deleteFiles(table);
Assert.assertEquals("Should have removed all dangling delete files", 0, newDeleteFiles.size());
assertThat(newDeleteFiles).as("Remaining dangling deletes").isEmpty();
checkResult(result, deleteFiles, Lists.newArrayList(), numDataFiles);

List<Object[]> actualRecords = records(tableName);
@@ -250,12 +250,11 @@ private void createTable(String partitionType, String partitionCol) {
tableName, partitionType, partitionCol);
}

-private <T> void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
+private void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
insertData(partitionValueFunction, NUM_DATA_FILES);
}

-private <T> void insertData(Function<Integer, ?> partitionValue, int numDataFiles)
-    throws Exception {
+private void insertData(Function<Integer, ?> partitionValue, int numDataFiles) throws Exception {
for (int i = 0; i < numDataFiles; i++) {
Dataset<Row> df =
spark
@@ -287,11 +286,9 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I
List<DataFile> partitionFiles = filesByPartitionEntry.getValue();

int deletesForPartition = partitionFiles.size() * DELETE_FILE_SIZE;
-Assert.assertEquals(
-    "Number of delete files per partition should be "
-        + "evenly divisible by requested deletes per data file times number of data files in this partition",
-    0,
-    deletesForPartition % DELETE_FILE_SIZE);
+assertThat(deletesForPartition % DELETE_FILE_SIZE)
+    .as("Number of delete files per partition modulo number of data files in this partition")
+    .isEqualTo(0);
int deleteFileSize = deletesForPartition / DELETE_FILES_PER_PARTITION;

int counter = 0;
@@ -370,48 +367,41 @@ private void checkResult(
List<DeleteFile> rewrittenDeletes,
List<DeleteFile> newDeletes,
int expectedGroups) {
-Assert.assertEquals(
-    "Expected rewritten delete file count does not match",
-    rewrittenDeletes.size(),
-    result.rewrittenDeleteFilesCount());
-Assert.assertEquals(
-    "Expected new delete file count does not match",
-    newDeletes.size(),
-    result.addedDeleteFilesCount());
-Assert.assertEquals(
-    "Expected rewritten delete byte count does not match",
-    size(rewrittenDeletes),
-    result.rewrittenBytesCount());
-Assert.assertEquals(
-    "Expected new delete byte count does not match",
-    size(newDeletes),
-    result.addedBytesCount());
-
-Assert.assertEquals(
-    "Expected rewrite group count does not match",
-    expectedGroups,
-    result.rewriteResults().size());
-Assert.assertEquals(
-    "Expected rewritten delete file count in all groups to match",
-    rewrittenDeletes.size(),
-    result.rewriteResults().stream()
-        .mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected added delete file count in all groups to match",
-    newDeletes.size(),
-    result.rewriteResults().stream()
-        .mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected rewritten delete bytes in all groups to match",
-    size(rewrittenDeletes),
-    result.rewriteResults().stream()
-        .mapToLong(FileGroupRewriteResult::rewrittenBytesCount)
-        .sum());
-Assert.assertEquals(
-    "Expected added delete bytes in all groups to match",
-    size(newDeletes),
-    result.rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum());
+assertThat(result.rewrittenDeleteFilesCount())
+    .as("Rewritten delete files")
+    .isEqualTo(rewrittenDeletes.size());
+assertThat(result.addedDeleteFilesCount())
+    .as("Added delete files")
+    .isEqualTo(newDeletes.size());
+assertThat(result.rewrittenBytesCount())
+    .as("Rewritten delete bytes")
+    .isEqualTo(size(rewrittenDeletes));
+assertThat(result.addedBytesCount()).as("New Delete byte count").isEqualTo(size(newDeletes));
+
+assertThat(result.rewriteResults()).as("Rewritten group count").hasSize(expectedGroups);
+assertThat(
+        result.rewriteResults().stream()
+            .mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
+            .sum())
+    .as("Rewritten delete file count in all groups")
+    .isEqualTo(rewrittenDeletes.size());
+assertThat(
+        result.rewriteResults().stream()
+            .mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
+            .sum())
+    .as("Added delete file count in all groups")
+    .isEqualTo(newDeletes.size());
+assertThat(
+        result.rewriteResults().stream()
+            .mapToLong(FileGroupRewriteResult::rewrittenBytesCount)
+            .sum())
+    .as("Rewritten delete bytes in all groups")
+    .isEqualTo(size(rewrittenDeletes));
+assertThat(
+        result.rewriteResults().stream()
+            .mapToLong(FileGroupRewriteResult::addedBytesCount)
+            .sum())
+    .as("Added delete bytes in all groups")
+    .isEqualTo(size(newDeletes));
}
}