From f6bb9173b13424d77e7ad8439b5ef9627e530cb2 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Sat, 28 Oct 2023 12:33:22 -0700
Subject: [PATCH] Core: Ignore split offsets array when split offset is past
 file length (#8925) (#8938)

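Split offsets recorded in a data file's metadata can be corrupted, in which
case the last (largest) offset points past the end of the file. Rather than
feed such offsets into scan planning, BaseFile now reports them as absent,
and planning falls back to fixed-size splits. Below is a minimal,
self-contained sketch of the rule and its effect on task counts;
SplitOffsetSketch, wellDefined, and fallbackTaskCount are illustrative names
for this note, not Iceberg APIs:

    // Illustrative sketch only, not Iceberg code.
    class SplitOffsetSketch {
      // Mirrors the new BaseFile#hasWellDefinedOffsets rule: offsets are
      // usable only when present, non-empty, and the last offset still
      // lands inside the file.
      static boolean wellDefined(long[] offsets, long fileSizeInBytes) {
        return offsets != null
            && offsets.length != 0
            && offsets[offsets.length - 1] < fileSizeInBytes;
      }

      // When offsets are rejected, planning falls back to fixed-size
      // splits: a file of length L with target split size S yields
      // ceil(L / S) tasks.
      static long fallbackTaskCount(long fileSizeInBytes, long splitSize) {
        return (fileSizeInBytes + splitSize - 1) / splitSize;
      }

      public static void main(String[] args) {
        long[] corrupted = {2L, 12L}; // last offset is past a 10-byte file
        System.out.println(wellDefined(corrupted, 10)); // false -> ignored
        System.out.println(fallbackTaskCount(10, 1)); // 10, as the new test asserts
      }
    }
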
---
 .../java/org/apache/iceberg/BaseFile.java     | 24 ++++++----
 .../iceberg/util/TestTableScanUtil.java       | 46 +++++++++++++++++++
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 31e91457ad40..daa209d0b401 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -460,21 +460,27 @@ public ByteBuffer keyMetadata() {
 
   @Override
   public List<Long> splitOffsets() {
-    if (splitOffsets == null || splitOffsets.length == 0) {
-      return null;
+    if (hasWellDefinedOffsets()) {
+      return ArrayUtil.toUnmodifiableLongList(splitOffsets);
     }
 
-    // If the last split offset is past the file size this means the split offsets are corrupted and
-    // should not be used
-    if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
-      return null;
+    return null;
+  }
+
+  long[] splitOffsetArray() {
+    if (hasWellDefinedOffsets()) {
+      return splitOffsets;
     }
 
-    return ArrayUtil.toUnmodifiableLongList(splitOffsets);
+    return null;
   }
 
-  long[] splitOffsetArray() {
-    return splitOffsets;
+  private boolean hasWellDefinedOffsets() {
+    // If the last split offset is past the file size this means the split offsets are corrupted and
+    // should not be used
+    return splitOffsets != null
+        && splitOffsets.length != 0
+        && splitOffsets[splitOffsets.length - 1] < fileSizeInBytes;
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index 0dff941616ab..cfe3cb625e4f 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -25,19 +25,26 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.BaseFileScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MergeableScanTask;
 import org.apache.iceberg.MockFileScanTask;
 import org.apache.iceberg.PartitionScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SplittableScanTask;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -126,6 +133,45 @@ public void testTaskGroupPlanning() {
     assertThat(taskGroups).as("Must have 3 task groups").hasSize(3);
   }
 
+  @Test
+  public void testTaskGroupPlanningCorruptedOffset() {
+    DataFile dataFile =
+        DataFiles.builder(TableTestBase.SPEC)
+            .withPath("/path/to/data-a.parquet")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(1)
+            .withSplitOffsets(
+                ImmutableList.of(2L, 12L)) // the last offset is beyond the end of the file
+            .build();
+
+    ResidualEvaluator residualEvaluator =
+        ResidualEvaluator.of(TableTestBase.SPEC, Expressions.equal("id", 1), false);
+
+    BaseFileScanTask baseFileScanTask =
+        new BaseFileScanTask(
+            dataFile,
+            null,
+            SchemaParser.toJson(TableTestBase.SCHEMA),
+            PartitionSpecParser.toJson(TableTestBase.SPEC),
+            residualEvaluator);
+
+    List<FileScanTask> baseFileScanTasks = ImmutableList.of(baseFileScanTask);
+
+    int taskCount = 0;
+    for (ScanTaskGroup<FileScanTask> task :
+        TableScanUtil.planTaskGroups(CloseableIterable.withNoopClose(baseFileScanTasks), 1, 1, 0)) {
+      for (FileScanTask fileScanTask : task.tasks()) {
+        DataFile taskDataFile = fileScanTask.file();
+        Assertions.assertThat(taskDataFile.splitOffsets()).isNull();
+        taskCount++;
+      }
+    }
+
+    // 10 tasks since the split offsets are ignored and there are 1 byte splits for a 10 byte file
+    Assertions.assertThat(taskCount).isEqualTo(10);
+  }
+
   @Test
   public void testTaskMerging() {
     List<ScanTask> tasks =