Skip to content

Commit

Permalink
Core: Remove unused code for streaming position deletes (#11175)
Browse files Browse the repository at this point in the history
  • Loading branch information
wypoon committed Sep 26, 2024
1 parent 1e5dcb1 commit b1d38b3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 406 deletions.
171 changes: 19 additions & 152 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FilterIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -44,13 +41,9 @@
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Deletes {

private static final Logger LOG = LoggerFactory.getLogger(Deletes.class);

private static final Schema POSITION_DELETE_SCHEMA =
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

Expand Down Expand Up @@ -192,27 +185,43 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
}
}

/**
* @deprecated since 1.7.0, will be removed in 1.8.0.
*/
@Deprecated
public static <T> CloseableIterable<T> streamingFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter());
}

/**
* @deprecated since 1.7.0, will be removed in 1.8.0.
*/
@Deprecated
public static <T> CloseableIterable<T> streamingFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
DeleteCounter counter) {
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes, counter);
PositionDeleteIndex positionIndex = toPositionIndex(posDeletes);
Predicate<T> isDeleted = row -> positionIndex.isDeleted(rowToPosition.apply(row));
return filterDeleted(rows, isDeleted, counter);
}

/**
* @deprecated since 1.7.0, will be removed in 1.8.0.
*/
@Deprecated
public static <T> CloseableIterable<T> streamingMarker(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
Consumer<T> markDeleted) {
return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted);
Consumer<T> markRowDeleted) {
PositionDeleteIndex positionIndex = toPositionIndex(posDeletes);
Predicate<T> isDeleted = row -> positionIndex.isDeleted(rowToPosition.apply(row));
return markDeleted(rows, isDeleted, markRowDeleted);
}

public static CloseableIterable<Long> deletePositions(
Expand Down Expand Up @@ -248,148 +257,6 @@ protected boolean shouldKeep(T row) {
}
}

private abstract static class PositionStreamDeleteIterable<T> extends CloseableGroup
implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> rowToPosition;
private long nextDeletePos;

PositionStreamDeleteIterable(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
this.rows = rows;
this.rowToPosition = rowToPosition;
this.deletePosIterator = deletePositions.iterator();
}

@Override
public CloseableIterator<T> iterator() {
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
nextDeletePos = deletePosIterator.next();
iter = applyDelete(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
}

addCloseable(iter);
addCloseable(deletePosIterator);

return iter;
}

boolean isDeleted(T row) {
long currentPos = rowToPosition.apply(row);
if (currentPos < nextDeletePos) {
return false;
}

// consume delete positions until the next is past the current position
boolean isDeleted = currentPos == nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
if (!isDeleted && currentPos == nextDeletePos) {
// if any delete position matches the current position
isDeleted = true;
}
}

return isDeleted;
}

protected abstract CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions);
}

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
private final DeleteCounter counter;

PositionStreamDeleteFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions,
DeleteCounter counter) {
super(rows, rowToPosition, deletePositions);
this.counter = counter;
}

@Override
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
return new FilterIterator<T>(items) {
@Override
protected boolean shouldKeep(T item) {
boolean deleted = isDeleted(item);
if (deleted) {
counter.increment();
}

return !deleted;
}

@Override
public void close() {
try {
deletePositions.close();
} catch (IOException e) {
LOG.warn("Error closing delete file", e);
}
super.close();
}
};
}
}

private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteIterable<T> {
private final Consumer<T> markDeleted;

PositionStreamDeleteMarker(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions,
Consumer<T> markDeleted) {
super(rows, rowToPosition, deletePositions);
this.markDeleted = markDeleted;
}

@Override
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {

return new CloseableIterator<T>() {
@Override
public void close() {
try {
deletePositions.close();
} catch (IOException e) {
LOG.warn("Error closing delete file", e);
}
try {
items.close();
} catch (IOException e) {
LOG.warn("Error closing data file", e);
}
}

@Override
public boolean hasNext() {
return items.hasNext();
}

@Override
public T next() {
T row = items.next();
if (isDeleted(row)) {
markDeleted.accept(row);
}
return row;
}
};
}
}

private static class DataFileFilter<T extends StructLike> extends Filter<T> {
private final CharSequence dataLocation;

Expand Down
Loading

0 comments on commit b1d38b3

Please sign in to comment.