Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Remove unused code for streaming position deletes #11175

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,20 @@ acceptedBreaks:
- code: "java.class.removed"
old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus"
justification: "Removing deprecated code"
- code: "java.method.removed"
old: "method <T> org.apache.iceberg.io.CloseableIterable<T> org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable<T>,\
\ java.util.function.Function<T, java.lang.Long>, org.apache.iceberg.io.CloseableIterable<java.lang.Long>)"
justification: "Remove unused method"
- code: "java.method.removed"
old: "method <T> org.apache.iceberg.io.CloseableIterable<T> org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable<T>,\
\ java.util.function.Function<T, java.lang.Long>, org.apache.iceberg.io.CloseableIterable<java.lang.Long>,\
\ org.apache.iceberg.deletes.DeleteCounter)"
justification: "Remove unused method"
- code: "java.method.removed"
old: "method <T> org.apache.iceberg.io.CloseableIterable<T> org.apache.iceberg.deletes.Deletes::streamingMarker(org.apache.iceberg.io.CloseableIterable<T>,\
\ java.util.function.Function<T, java.lang.Long>, org.apache.iceberg.io.CloseableIterable<java.lang.Long>,\
\ java.util.function.Consumer<T>)"
justification: "Remove unused method"
- code: "java.method.removed"
old: "method java.lang.String org.apache.iceberg.FileScanTaskParser::toJson(org.apache.iceberg.FileScanTask)"
justification: "Removing deprecated code"
Expand Down
172 changes: 0 additions & 172 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FilterIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -44,13 +41,9 @@
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Deletes {

private static final Logger LOG = LoggerFactory.getLogger(Deletes.class);

private static final Schema POSITION_DELETE_SCHEMA =
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

Expand Down Expand Up @@ -192,29 +185,6 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
}
}

public static <T> CloseableIterable<T> streamingFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter());
}

public static <T> CloseableIterable<T> streamingFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
DeleteCounter counter) {
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes, counter);
}

public static <T> CloseableIterable<T> streamingMarker(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
Consumer<T> markDeleted) {
return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted);
}

wypoon marked this conversation as resolved.
Show resolved Hide resolved
public static CloseableIterable<Long> deletePositions(
CharSequence dataLocation, CloseableIterable<StructLike> deleteFile) {
return deletePositions(dataLocation, ImmutableList.of(deleteFile));
Expand Down Expand Up @@ -248,148 +218,6 @@ protected boolean shouldKeep(T row) {
}
}

private abstract static class PositionStreamDeleteIterable<T> extends CloseableGroup
implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> rowToPosition;
private long nextDeletePos;

PositionStreamDeleteIterable(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
this.rows = rows;
this.rowToPosition = rowToPosition;
this.deletePosIterator = deletePositions.iterator();
}

@Override
public CloseableIterator<T> iterator() {
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
nextDeletePos = deletePosIterator.next();
iter = applyDelete(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
}

addCloseable(iter);
addCloseable(deletePosIterator);

return iter;
}

boolean isDeleted(T row) {
long currentPos = rowToPosition.apply(row);
if (currentPos < nextDeletePos) {
return false;
}

// consume delete positions until the next is past the current position
boolean isDeleted = currentPos == nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
if (!isDeleted && currentPos == nextDeletePos) {
// if any delete position matches the current position
isDeleted = true;
}
}

return isDeleted;
}

protected abstract CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions);
}

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
private final DeleteCounter counter;

PositionStreamDeleteFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions,
DeleteCounter counter) {
super(rows, rowToPosition, deletePositions);
this.counter = counter;
}

@Override
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
return new FilterIterator<T>(items) {
@Override
protected boolean shouldKeep(T item) {
boolean deleted = isDeleted(item);
if (deleted) {
counter.increment();
}

return !deleted;
}

@Override
public void close() {
try {
deletePositions.close();
} catch (IOException e) {
LOG.warn("Error closing delete file", e);
}
super.close();
}
};
}
}

private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteIterable<T> {
private final Consumer<T> markDeleted;

PositionStreamDeleteMarker(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions,
Consumer<T> markDeleted) {
super(rows, rowToPosition, deletePositions);
this.markDeleted = markDeleted;
}

@Override
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {

return new CloseableIterator<T>() {
@Override
public void close() {
try {
deletePositions.close();
} catch (IOException e) {
LOG.warn("Error closing delete file", e);
}
try {
items.close();
} catch (IOException e) {
LOG.warn("Error closing data file", e);
}
}

@Override
public boolean hasNext() {
return items.hasNext();
}

@Override
public T next() {
T row = items.next();
if (isDeleted(row)) {
markDeleted.accept(row);
}
return row;
}
};
}
}

private static class DataFileFilter<T extends StructLike> extends Filter<T> {
private final CharSequence dataLocation;

Expand Down
Loading