Core: Fix leak of DeleteFile streams #8132
Conversation
@rdblue @aokolnychyi @RussellSpitzer @flyrain fyi. @flyrain Also let me know if there are other cases you are aware of that this may affect for reading delete files.
IIUC, this is due to missing resource cleanup for
There are more places we need to change. For example, we didn't close the iterable here, and in this place. Besides your PR, one option is to close the iterable explicitly in the reader. Each reader extends the
But I still don't feel comfortable about it. A developer can easily get an iterator from an iterable and forget to close the iterable. I'm also not sure what the best way is for this. Open to suggestions.
I'll have to take a look at those places. Yeah, the problem is that traditionally we can't rely on the reader closing the iterator, much less the iterable. In Spark, for example, we don't return the iterable, and even the iterator does not have any close method: https://github.com/apache/spark/blob/bdeae87067452bb41f4776c4ab444a9d9645fdfc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L110 Hence Iceberg implements auto-closing iterators.
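The auto-closing pattern described above, where the caller never invokes close() and the iterator must release its resource once the last hasNext() returns false, can be sketched roughly as follows. This is an illustrative example, not Iceberg's actual FilterIterator; the class and method names are made up for the sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: an iterator that closes its underlying resource as
// soon as it is exhausted, because callers like Spark only walk it to the
// end and never call close() explicitly.
class AutoCloseIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> delegate;
  private final Closeable resource;
  private boolean closed = false;

  AutoCloseIterator(Iterator<T> delegate, Closeable resource) {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext() {
    boolean hasNext = delegate.hasNext();
    if (!hasNext) {
      close(); // release the resource on the final hasNext()
    }
    return hasNext;
  }

  @Override
  public T next() {
    return delegate.next();
  }

  @Override
  public void close() {
    if (!closed) {
      closed = true;
      try {
        resource.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  boolean isClosed() {
    return closed;
  }

  public static void main(String[] args) {
    boolean[] streamClosed = {false};
    AutoCloseIterator<Integer> it =
        new AutoCloseIterator<>(List.of(1, 2, 3).iterator(), () -> streamClosed[0] = true);
    int sum = 0;
    while (it.hasNext()) { // last hasNext() triggers close()
      sum += it.next();
    }
    System.out.println(sum + " closed=" + streamClosed[0]);
  }
}
```

The key design point is that close() must be idempotent, since it can be reached both from the final hasNext() and from an explicit close() by a well-behaved caller.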
@flyrain I checked; it works today because the end iterator is always FilterIterator: https://github.com/apache/iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L154 which does auto-close (as I mentioned in the description). The only problem is that close() does not close secondary iterators (deleteFileIterator), which happens only in the places I showed.
Force-pushed 97f4448 to 4ea5abe
Thanks @szehon-ho for the change. Yeah, these two places are covered by the change.
- https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java#L96-L96
- https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L340-L340
LGTM overall. Left minor comments.
It's worth noting that the current implementation of the delete filter (classes PositionStreamDeleteFilter and PositionStreamDeleteMarker) cannot guarantee that the iterable is closed properly, despite extending CloseableGroup. This raises questions about the necessity of using an iterable here. Iterable is an interface that allows an object to be the target of a for-each loop. We don't require delete filters to be used in a for-each loop; we only need the method next(). Therefore, it might be more appropriate to use an iterator instead, which will:
- serve our required purpose.
- reduce complexity by removing the iterable.
- prevent the iterable-closing issue.
However, making this change would be a significant modification, so it would be better to handle it in a separate PR for better code management and easier review.
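The hazard discussed above, obtaining an iterator from a closeable iterable and then forgetting to close the iterable, can be sketched as follows. This is a hypothetical minimal model, not Iceberg's CloseableIterable; the class name LeakyIterable is invented for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: resources registered on the iterable are only
// released by iterable.close(). A caller that keeps only
// iterable.iterator() has no handle left through which to close them.
class LeakyIterable<T> implements Iterable<T>, Closeable {
  private final List<T> items;
  private final List<Closeable> resources = new ArrayList<>();
  private boolean closed = false;

  LeakyIterable(List<T> items) {
    this.items = items;
  }

  void addCloseable(Closeable resource) {
    resources.add(resource); // analogous to registering on a closeable group
  }

  @Override
  public Iterator<T> iterator() {
    // The returned iterator knows nothing about `resources`, so walking it
    // to exhaustion releases nothing.
    return items.iterator();
  }

  @Override
  public void close() throws IOException {
    closed = true;
    for (Closeable resource : resources) {
      resource.close();
    }
  }

  boolean isClosed() {
    return closed;
  }
}
```

Under this model, replacing the iterable with an iterator that owns its resources directly (as suggested above) removes the extra object whose close() can be forgotten.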
```java
    this(filePath, deletes, tableSchema, requestedSchema, counter, DEFAULT_SET_FILTER_THRESHOLD);
  }

  protected DeleteFilter(
      String filePath,
      List<DeleteFile> deletes,
      Schema tableSchema,
      Schema requestedSchema,
      DeleteCounter counter,
      long setFilterThreshold) {
    this.setFilterThreshold = setFilterThreshold;
```
Minor: is this change needed? I assume it will make testing easier by allowing a customized threshold to be set; however, there is no such test case yet.
```java
import org.junit.Assert;
import org.junit.Test;

public class TestDeleteFilter extends TableTestBase {
```
I think we can put these tests into the class TestPositionFilter. What we do here is check that both the data file iterator and the delete file iterator are closed; would it be easier to modify a test case like TestPositionFilter::testPositionStreamRowFilter for that purpose?
Force-pushed ed4d678 to b4eac9e
@flyrain thanks! I addressed the review comments; please take another look when you get a chance.
+1 Thanks @szehon-ho for working on this.
Merged, thanks @flyrain for the review!
Problem
We observed an S3 input stream leak and eventual exhaustion in a simple Spark job reading a partition with several delete files, where several tasks on the same worker were applying deletes.
Spark S3 Exception
Spark S3 Warnings
```
WARN S3InputStream: Unclosed input stream created by:
org.apache.iceberg.aws.s3.S3InputStream.<init>(S3InputStream.java:73)
org.apache.iceberg.aws.s3.S3InputFile.newStream(S3InputFile.java:83)
org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:184)
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:774)
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:245)
org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:37)
org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
org.apache.iceberg.util.SortedMerge.iterator(SortedMerge.java:56)
org.apache.iceberg.deletes.Deletes$PositionStreamDeleteIterable.<init>(Deletes.java:214)
org.apache.iceberg.deletes.Deletes$PositionStreamDeleteFilter.<init>(Deletes.java:263)
org.apache.iceberg.deletes.Deletes.streamingFilter(Deletes.java:157)
org.apache.iceberg.data.DeleteFilter.applyPosDeletes(DeleteFilter.java:258)
org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:154)
org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:92)
org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:42)
org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:135)
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
scala.Option.exists(Option.scala:376)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1513)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
```

Description
It manifested in Spark, but it seems to be a generic problem in Core. The application of a DeleteFile to a DataFile has three cases.
The bug concerns only the third case. Here, we do not close the DeleteFile iterator when we close the returned row iterator.
There is some code that adds the DeleteFile iterator to the closeableGroup of the Iterable, but the problem is that we do not return the Iterable to Spark; rather, we return the Iterator (iterable.iterator()).
Fix
Close the DeleteFile iterator when we close the final iterator returned to Spark.
The other part of the problem is that Spark does not explicitly close the row iterator [1]. It just walks to the end of it, and it is the Iterator's responsibility to close itself when it is exhausted on the last hasNext(). See our FilterIterator, for instance, which does this [2].
Hence, this is also implemented in the markDeleted (CDC) version of the delete iterators, which does not use FilterIterator.
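The shape of the fix can be sketched as follows: the row iterator handed back to the engine also owns the secondary delete-file stream, and releases it both on an explicit close() and when the last hasNext() exhausts the rows. This is a simplified illustration under invented names (FilteredRowIterator), not the actual patch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the fix: the returned row iterator owns the
// secondary delete-file resource and closes it on exhaustion or on an
// explicit close(), whichever comes first.
class FilteredRowIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> rows;
  private final Closeable deleteFileStream; // secondary resource to release
  private boolean closed = false;

  FilteredRowIterator(Iterator<T> rows, Closeable deleteFileStream) {
    this.rows = rows;
    this.deleteFileStream = deleteFileStream;
  }

  @Override
  public boolean hasNext() {
    if (!rows.hasNext()) {
      closeQuietly(); // engines that never call close() still release the stream
      return false;
    }
    return true;
  }

  @Override
  public T next() {
    return rows.next();
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closed = true;
      deleteFileStream.close();
    }
  }

  private void closeQuietly() {
    try {
      close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```

In the real patch the iterator would also close the primary data-file iterator; the sketch keeps only the secondary stream to highlight what was previously leaked.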
Refs: