From d9ec10673070f650a616a09bc8e4b6c0cef0ad59 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 20 Sep 2024 11:38:12 -0700 Subject: [PATCH 1/3] Remove unused code for streaming position deletes. --- .palantir/revapi.yml | 14 + .../org/apache/iceberg/deletes/Deletes.java | 172 ------------ .../iceberg/deletes/TestPositionFilter.java | 254 ------------------ 3 files changed, 14 insertions(+), 426 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 9b8017f0beec..4416eec37681 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1091,6 +1091,20 @@ acceptedBreaks: - code: "java.class.removed" old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus" justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable,\ + \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable)" + justification: "Remove unused method" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable,\ + \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable,\ + \ org.apache.iceberg.deletes.DeleteCounter)" + justification: "Remove unused method" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingMarker(org.apache.iceberg.io.CloseableIterable,\ + \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable,\ + \ java.util.function.Consumer)" + justification: "Remove unused method" - code: "java.method.removed" old: "method java.lang.String org.apache.iceberg.FileScanTaskParser::toJson(org.apache.iceberg.FileScanTask)" justification: "Removing deprecated code" diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index cef57cd16726..77e6f8020298 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -29,10 +29,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.FilterIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -44,13 +41,9 @@ import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.ThreadPools; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class Deletes { - private static final Logger LOG = LoggerFactory.getLogger(Deletes.class); - private static final Schema POSITION_DELETE_SCHEMA = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); @@ -192,29 +185,6 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable posDel } } - public static CloseableIterable streamingFilter( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable posDeletes) { - return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter()); - } - - public static CloseableIterable streamingFilter( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable posDeletes, - DeleteCounter counter) { - return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes, counter); - } - - public static CloseableIterable streamingMarker( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable posDeletes, - Consumer markDeleted) { - return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted); - } - public static CloseableIterable deletePositions( CharSequence dataLocation, CloseableIterable deleteFile) { return deletePositions(dataLocation, ImmutableList.of(deleteFile)); @@ -248,148 +218,6 @@ protected boolean shouldKeep(T row) { } } - private abstract static class PositionStreamDeleteIterable extends CloseableGroup - implements CloseableIterable { - private final CloseableIterable rows; - private final CloseableIterator deletePosIterator; - private final Function rowToPosition; - private long nextDeletePos; - - PositionStreamDeleteIterable( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable deletePositions) { - this.rows = rows; - this.rowToPosition = rowToPosition; - this.deletePosIterator = deletePositions.iterator(); - } - - @Override - public CloseableIterator iterator() { - CloseableIterator iter; - if (deletePosIterator.hasNext()) { - nextDeletePos = deletePosIterator.next(); - iter = applyDelete(rows.iterator(), deletePosIterator); - } else { - iter = rows.iterator(); - } - - addCloseable(iter); - addCloseable(deletePosIterator); - - return iter; - } - - boolean isDeleted(T row) { - long currentPos = rowToPosition.apply(row); - if (currentPos < nextDeletePos) { - return false; - } - - // consume delete positions until the next is past the current position - boolean isDeleted = currentPos == nextDeletePos; - while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) { - this.nextDeletePos = deletePosIterator.next(); - if (!isDeleted && currentPos == nextDeletePos) { - // if any delete position matches the current position - isDeleted = true; - } - } - - return isDeleted; - } - - protected abstract CloseableIterator applyDelete( - CloseableIterator items, CloseableIterator deletePositions); - } - - private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { - private final DeleteCounter counter; - - PositionStreamDeleteFilter( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable deletePositions, - DeleteCounter counter) { - super(rows, rowToPosition, deletePositions); - this.counter = counter; - } - - @Override - protected CloseableIterator applyDelete( - CloseableIterator items, CloseableIterator deletePositions) { - return new FilterIterator(items) { - @Override - protected boolean shouldKeep(T item) { - boolean deleted = isDeleted(item); - if (deleted) { - counter.increment(); - } - - return !deleted; - } - - @Override - public void close() { - try { - deletePositions.close(); - } catch (IOException e) { - LOG.warn("Error closing delete file", e); - } - super.close(); - } - }; - } - } - - private static class PositionStreamDeleteMarker extends PositionStreamDeleteIterable { - private final Consumer markDeleted; - - PositionStreamDeleteMarker( - CloseableIterable rows, - Function rowToPosition, - CloseableIterable deletePositions, - Consumer markDeleted) { - super(rows, rowToPosition, deletePositions); - this.markDeleted = markDeleted; - } - - @Override - protected CloseableIterator applyDelete( - CloseableIterator items, CloseableIterator deletePositions) { - - return new CloseableIterator() { - @Override - public void close() { - try { - deletePositions.close(); - } catch (IOException e) { - LOG.warn("Error closing delete file", e); - } - try { - items.close(); - } catch (IOException e) { - LOG.warn("Error closing data file", e); - } - } - - @Override - public boolean hasNext() { - return items.hasNext(); - } - - @Override - public T next() { - T row = items.next(); - if (isDeleted(row)) { - markDeleted.accept(row); - } - return row; - } - }; - } - } - private static class DataFileFilter extends Filter { private final CharSequence dataLocation; diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index 8e35c8c9fc99..2a629b2dc2b3 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -20,20 +20,16 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.avro.util.Utf8; import org.apache.iceberg.StructLike; import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -110,155 +106,6 @@ public void testPositionMerging() { .containsExactly(0L, 3L, 3L, 9L, 16L, 19L, 19L, 22L, 22L, 56L, 63L, 70L, 91L); } - @Test - public void testPositionStreamRowFilter() { - CloseableIterable rows = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of(0L, "a"), - Row.of(1L, "b"), - Row.of(2L, "c"), - Row.of(3L, "d"), - Row.of(4L, "e"), - Row.of(5L, "f"), - Row.of(6L, "g"), - Row.of(7L, "h"), - Row.of(8L, "i"), - Row.of(9L, "j"))); - - CloseableIterable deletes = - CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - - CloseableIterable actual = - Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); - - assertThat(Iterables.transform(actual, row -> row.get(0, Long.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L)); - } - - @Test - public void testPositionStreamRowDeleteMarker() { - CloseableIterable rows = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of(0L, "a", false), - Row.of(1L, "b", false), - Row.of(2L, "c", false), - Row.of(3L, "d", false), - Row.of(4L, "e", false), - Row.of(5L, "f", false), - Row.of(6L, "g", false), - Row.of(7L, "h", false), - Row.of(8L, "i", false), - Row.of(9L, "j", false))); - - CloseableIterable deletes = - CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - - CloseableIterable actual = - Deletes.streamingMarker( - rows, - row -> row.get(0, Long.class), /* row to position */ - deletes, - row -> row.set(2, true) /* delete marker */); - - assertThat(Iterables.transform(actual, row -> row.get(2, Boolean.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf( - Lists.newArrayList(true, false, false, true, true, false, false, true, false, true)); - } - - @Test - public void testPositionStreamRowFilterWithDuplicates() { - CloseableIterable rows = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of(0L, "a"), - Row.of(1L, "b"), - Row.of(2L, "c"), - Row.of(3L, "d"), - Row.of(4L, "e"), - Row.of(5L, "f"), - Row.of(6L, "g"), - Row.of(7L, "h"), - Row.of(8L, "i"), - Row.of(9L, "j"))); - - CloseableIterable deletes = - CloseableIterable.withNoopClose(Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L)); - - CloseableIterable actual = - Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); - - assertThat(Iterables.transform(actual, row -> row.get(0, Long.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L)); - } - - @Test - public void testPositionStreamRowFilterWithRowGaps() { - // test the case where row position is greater than the delete position - CloseableIterable rows = - CloseableIterable.withNoopClose( - Lists.newArrayList(Row.of(2L, "c"), Row.of(3L, "d"), Row.of(5L, "f"), Row.of(6L, "g"))); - - CloseableIterable deletes = - CloseableIterable.withNoopClose(Lists.newArrayList(0L, 2L, 3L, 4L, 7L, 9L)); - - CloseableIterable actual = - Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); - - assertThat(Iterables.transform(actual, row -> row.get(0, Long.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(5L, 6L)); - } - - @Test - public void testCombinedPositionStreamRowFilter() { - CloseableIterable positionDeletes1 = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of("file_a.avro", 0L), - Row.of("file_a.avro", 3L), - Row.of("file_a.avro", 9L), - Row.of("file_b.avro", 5L), - Row.of("file_b.avro", 6L))); - - CloseableIterable positionDeletes2 = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of("file_a.avro", 3L), - Row.of("file_a.avro", 4L), - Row.of("file_a.avro", 7L), - Row.of("file_b.avro", 2L))); - - CloseableIterable rows = - CloseableIterable.withNoopClose( - Lists.newArrayList( - Row.of(0L, "a"), - Row.of(1L, "b"), - Row.of(2L, "c"), - Row.of(3L, "d"), - Row.of(4L, "e"), - Row.of(5L, "f"), - Row.of(6L, "g"), - Row.of(7L, "h"), - Row.of(8L, "i"), - Row.of(9L, "j"))); - - CloseableIterable actual = - Deletes.streamingFilter( - rows, - row -> row.get(0, Long.class), - Deletes.deletePositions( - "file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); - - assertThat(Iterables.transform(actual, row -> row.get(0, Long.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L)); - } - @Test public void testPositionSetRowFilter() { CloseableIterable rows = @@ -341,105 +188,4 @@ public void testCombinedPositionSetRowFilter(ExecutorService executorService) { .as("Filter should produce expected rows") .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L)); } - - @Test - public void testClosePositionStreamRowDeleteMarker() { - List deletes = Lists.newArrayList(1L, 2L); - - List records = - Lists.newArrayList( - Row.of(29, "a", 1L), Row.of(43, "b", 2L), Row.of(61, "c", 3L), Row.of(89, "d", 4L)); - - CheckingClosableIterable data = new CheckingClosableIterable<>(records); - CheckingClosableIterable deletePositions = new CheckingClosableIterable<>(deletes); - - CloseableIterable posDeletesIterable = - Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); - - // end iterator is always wrapped with FilterIterator - CloseableIterable eqDeletesIterable = - Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter()); - List result = Lists.newArrayList(eqDeletesIterable.iterator()); - - // as first two records deleted, expect only last two records - assertThat(Iterables.transform(result, row -> row.get(2, Long.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(3L, 4L)); - - assertThat(data.isClosed).isTrue(); - assertThat(deletePositions.isClosed).isTrue(); - } - - @Test - public void testDeleteMarkerFileClosed() { - - List deletes = Lists.newArrayList(1L, 2L); - - List records = - Lists.newArrayList( - Row.of(29, "a", 1L, false), - Row.of(43, "b", 2L, false), - Row.of(61, "c", 3L, false), - Row.of(89, "d", 4L, false)); - - CheckingClosableIterable data = new CheckingClosableIterable<>(records); - CheckingClosableIterable deletePositions = new CheckingClosableIterable<>(deletes); - - CloseableIterable resultIterable = - Deletes.streamingMarker( - data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true)); - - // end iterator is always wrapped with FilterIterator - CloseableIterable eqDeletesIterable = - Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter()); - List result = Lists.newArrayList(eqDeletesIterable.iterator()); - - // as first two records deleted, expect only those two records marked - assertThat(Iterables.transform(result, row -> row.get(3, Boolean.class))) - .as("Filter should produce expected rows") - .containsExactlyElementsOf(Lists.newArrayList(true, true, false, false)); - - assertThat(data.isClosed).isTrue(); - assertThat(deletePositions.isClosed).isTrue(); - } - - private static class CheckingClosableIterable implements CloseableIterable { - AtomicBoolean isClosed = new AtomicBoolean(false); - final Iterable iterable; - - CheckingClosableIterable(Iterable iterable) { - this.iterable = iterable; - } - - public boolean isClosed() { - return isClosed.get(); - } - - @Override - public void close() throws IOException { - isClosed.set(true); - } - - @Override - public CloseableIterator iterator() { - Iterator it = iterable.iterator(); - return new CloseableIterator() { - - @Override - public boolean hasNext() { - return it.hasNext(); - } - - @Override - public E next() { - return it.next(); - } - - @Override - public void close() { - isClosed.set(true); - } - }; - } - } } From 8d7382d66022c4183034bb27e689f5ecbb8d0805 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 20 Sep 2024 15:41:03 -0700 Subject: [PATCH 2/3] Restore removed methods but deprecate them. Implement them the same way that the non-streaming versions are implemented so we can remove the classes that are no longer needed. --- .palantir/revapi.yml | 14 ------- .../org/apache/iceberg/deletes/Deletes.java | 39 +++++++++++++++++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4416eec37681..9b8017f0beec 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1091,20 +1091,6 @@ acceptedBreaks: - code: "java.class.removed" old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus" justification: "Removing deprecated code" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable,\ - \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable)" - justification: "Remove unused method" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingFilter(org.apache.iceberg.io.CloseableIterable,\ - \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable,\ - \ org.apache.iceberg.deletes.DeleteCounter)" - justification: "Remove unused method" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.CloseableIterable org.apache.iceberg.deletes.Deletes::streamingMarker(org.apache.iceberg.io.CloseableIterable,\ - \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable,\ - \ java.util.function.Consumer)" - justification: "Remove unused method" - code: "java.method.removed" old: "method java.lang.String org.apache.iceberg.FileScanTaskParser::toJson(org.apache.iceberg.FileScanTask)" justification: "Removing deprecated code" diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 77e6f8020298..5b7054a029e6 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -185,6 +185,45 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable posDel } } + /** + * @deprecated since 1.7.0, will be removed in future. + */ + @Deprecated + public static CloseableIterable streamingFilter( + CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes) { + return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter()); + } + + /** + * @deprecated since 1.7.0, will be removed in future. + */ + @Deprecated + public static CloseableIterable streamingFilter( + CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes, + DeleteCounter counter) { + PositionDeleteIndex positionIndex = toPositionIndex(posDeletes); + Predicate isDeleted = row -> positionIndex.isDeleted(rowToPosition.apply(row)); + return filterDeleted(rows, isDeleted, counter); + } + + /** + * @deprecated since 1.7.0, will be removed in future. + */ + @Deprecated + public static CloseableIterable streamingMarker( + CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes, + Consumer markRowDeleted) { + PositionDeleteIndex positionIndex = toPositionIndex(posDeletes); + Predicate isDeleted = row -> positionIndex.isDeleted(rowToPosition.apply(row)); + return markDeleted(rows, isDeleted, markRowDeleted); + } + public static CloseableIterable deletePositions( CharSequence dataLocation, CloseableIterable deleteFile) { return deletePositions(dataLocation, ImmutableList.of(deleteFile)); From 4e45c7d3dacf721c56843c1dcebdb4f48a2f38f0 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 25 Sep 2024 07:27:54 -0700 Subject: [PATCH 3/3] Update deprecation comment. --- core/src/main/java/org/apache/iceberg/deletes/Deletes.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 5b7054a029e6..a72e01613040 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -186,7 +186,7 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable posDel } /** - * @deprecated since 1.7.0, will be removed in future. + * @deprecated since 1.7.0, will be removed in 1.8.0. */ @Deprecated public static CloseableIterable streamingFilter( @@ -197,7 +197,7 @@ public static CloseableIterable streamingFilter( } /** - * @deprecated since 1.7.0, will be removed in future. + * @deprecated since 1.7.0, will be removed in 1.8.0. */ @Deprecated public static CloseableIterable streamingFilter( @@ -211,7 +211,7 @@ public static CloseableIterable streamingFilter( } /** - * @deprecated since 1.7.0, will be removed in future. + * @deprecated since 1.7.0, will be removed in 1.8.0. */ @Deprecated public static CloseableIterable streamingMarker(