Skip to content

Commit

Permalink
Core: Ignore split offsets array when split offset is past file length
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Oct 27, 2023
1 parent aa891ac commit 0ff2819
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 15 deletions.
24 changes: 15 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,21 +460,27 @@ public ByteBuffer keyMetadata() {

@Override
public List<Long> splitOffsets() {
if (splitOffsets == null || splitOffsets.length == 0) {
return null;
if (hasWellDefinedOffsets()) {
return ArrayUtil.toUnmodifiableLongList(splitOffsets);
}

// If the last split offset is past the file size this means the split offsets are corrupted and
// should not be used
if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
return null;
return null;
}

long[] splitOffsetArray() {
if (hasWellDefinedOffsets()) {
return splitOffsets;
}

return ArrayUtil.toUnmodifiableLongList(splitOffsets);
return null;
}

long[] splitOffsetArray() {
return splitOffsets;
private boolean hasWellDefinedOffsets() {
// If the last split offset is past the file size this means the split offsets are corrupted and
// should not be used
return splitOffsets != null
&& splitOffsets.length != 0
&& splitOffsets[splitOffsets.length - 1] < fileSizeInBytes;
}

@Override
Expand Down
54 changes: 48 additions & 6 deletions core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -216,6 +217,34 @@ public void testSplitPlanningWithOffsets() {
"We should get one task per row group", 32, Iterables.size(scan.planTasks()));
}

@Test
public void testSplitPlanningWithCorruptedOffsets() throws IOException {
List<DataFile> files16Mb = newFilesWithInvalidOffset(16, 16 * 1024 * 1024, 2);
appendFiles(files16Mb);

// There will be 4 tasks per file, instead of 2 tasks per file, since the offsets are invalid
// and will be ignored.
TableScan scan =
table
.newScan()
.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(0))
.option(TableProperties.SPLIT_SIZE, String.valueOf(4L * 1024 * 1024));

Assert.assertEquals(
"We should get four tasks per file since the offsets will be ignored",
64,
Iterables.size(scan.planTasks()));
try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
CombinedScanTask task = tasks.iterator().next().asCombinedScanTask();
Collection<FileScanTask> files = task.files();
for (FileScanTask fileScanTask : files) {
DataFile dataFile = fileScanTask.file();
Assert.assertNull(
"Split offsets should be null since they are invalid", dataFile.splitOffsets());
}
}
}

@Test
public void testSplitPlanningWithOffsetsUnableToSplit() {
List<DataFile> files16Mb = newFiles(16, 16 * 1024 * 1024, 2);
Expand Down Expand Up @@ -291,27 +320,36 @@ private void appendFiles(Iterable<DataFile> files) {
}

private List<DataFile> newFiles(int numFiles, long sizeInBytes) {
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, 1);
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, 1, false);
}

private List<DataFile> newFiles(int numFiles, long sizeInBytes, int numOffset) {
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, numOffset);
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, numOffset, false);
}

private List<DataFile> newFilesWithInvalidOffset(int numFiles, long sizeInBytes, int numOffset) {
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, numOffset, true);
}

private List<DataFile> newFiles(int numFiles, long sizeInBytes, FileFormat fileFormat) {
return newFiles(numFiles, sizeInBytes, fileFormat, 1);
return newFiles(numFiles, sizeInBytes, fileFormat, 1, false);
}

private List<DataFile> newFiles(
int numFiles, long sizeInBytes, FileFormat fileFormat, int numOffset) {
int numFiles,
long sizeInBytes,
FileFormat fileFormat,
int numOffset,
boolean produceInvalidOffset) {
List<DataFile> files = Lists.newArrayList();
for (int fileNum = 0; fileNum < numFiles; fileNum++) {
files.add(newFile(sizeInBytes, fileFormat, numOffset));
files.add(newFile(sizeInBytes, fileFormat, numOffset, produceInvalidOffset));
}
return files;
}

private DataFile newFile(long sizeInBytes, FileFormat fileFormat, int numOffsets) {
private DataFile newFile(
long sizeInBytes, FileFormat fileFormat, int numOffsets, boolean produceInvalidOffset) {
String fileName = UUID.randomUUID().toString();
Builder builder =
DataFiles.builder(PartitionSpec.unpartitioned())
Expand All @@ -326,6 +364,10 @@ private DataFile newFile(long sizeInBytes, FileFormat fileFormat, int numOffsets
.map(i -> i * stepSize)
.boxed()
.collect(Collectors.toList());
if (produceInvalidOffset) {
offsets.add(sizeInBytes + 1);
}

builder.withSplitOffsets(offsets);
}

Expand Down

0 comments on commit 0ff2819

Please sign in to comment.