From 421b9a4e18de53671af2935873579873ee8b9fee Mon Sep 17 00:00:00 2001 From: btilbrook-nextfaze Date: Wed, 31 Jan 2018 20:04:50 +1030 Subject: [PATCH] 2.x: Document size-bounded replay emission retention caveat (#5827) (#5828) --- src/main/java/io/reactivex/Flowable.java | 24 +++++++++++++++++++ src/main/java/io/reactivex/Observable.java | 24 +++++++++++++++++++ .../reactivex/processors/ReplayProcessor.java | 4 ++++ .../io/reactivex/subjects/ReplaySubject.java | 3 +++ 4 files changed, 55 insertions(+) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index d4c18160c2..7c861b8d92 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -11234,6 +11234,9 @@ public final Flowable replay(Function, ? extends Publ * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, * replaying {@code bufferSize} notifications. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11270,6 +11273,9 @@ public final Flowable replay(Function, ? extends Publ * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, * replaying no more than {@code bufferSize} items that were emitted within a specified time window. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11309,6 +11315,9 @@ public final Flowable replay(Function, ? extends Publ * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, * replaying no more than {@code bufferSize} items that were emitted within a specified time window. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11357,6 +11366,9 @@ public final Flowable replay(Function, ? extends Publ * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, * replaying a maximum of {@code bufferSize} items. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11512,6 +11524,9 @@ public final Flowable replay(final Function, ? extend * an ordinary Publisher, except that it does not begin emitting items when it is subscribed to, but only * when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11542,6 +11557,9 @@ public final ConnectableFlowable replay(final int bufferSize) { * Publisher resembles an ordinary Publisher, except that it does not begin emitting items when it is * subscribed to, but only when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11576,6 +11594,9 @@ public final ConnectableFlowable replay(int bufferSize, long time, TimeUnit u * Connectable Publisher resembles an ordinary Publisher, except that it does not begin emitting items * when it is subscribed to, but only when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
@@ -11618,6 +11639,9 @@ public final ConnectableFlowable replay(final int bufferSize, final long time * an ordinary Publisher, except that it does not begin emitting items when it is subscribed to, but only * when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Backpressure:
diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index f2255018e8..e4fd63fd8d 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -9498,6 +9498,9 @@ public final Observable replay(Function, ? extends * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, * replaying {@code bufferSize} notifications. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9529,6 +9532,9 @@ public final Observable replay(Function, ? extends * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, * replaying no more than {@code bufferSize} items that were emitted within a specified time window. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9563,6 +9569,9 @@ public final Observable replay(Function, ? extends * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, * replaying no more than {@code bufferSize} items that were emitted within a specified time window. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9606,6 +9615,9 @@ public final Observable replay(Function, ? extends * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, * replaying a maximum of {@code bufferSize} items. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9740,6 +9752,9 @@ public final Observable replay(final Function, ? ex * an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only * when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9765,6 +9780,9 @@ public final ConnectableObservable replay(final int bufferSize) { * ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items when it is * subscribed to, but only when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9794,6 +9812,9 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit * Connectable ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items * when it is subscribed to, but only when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
@@ -9830,6 +9851,9 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only * when its {@code connect} method is called. *

+ * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than + * {@code bufferSize} source emissions. + *

* *

*
Scheduler:
diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index ac6a6c240d..cdfcb0a3af 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -66,6 +66,10 @@ * if an individual item gets delayed due to backpressure. * *

+ * Due to concurrency requirements, a size-bounded {@code ReplayProcessor} may hold strong references to more source + * emissions than specified. + * + *

* Example usage: *

 {@code
 
diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java
index 2cdc5cb04a..fe89540733 100644
--- a/src/main/java/io/reactivex/subjects/ReplaySubject.java
+++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java
@@ -92,6 +92,9 @@
  * {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the retained/cached items
  * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
  * {@link #getValues()} or {@link #getValues(Object[])}.
+ * 

+ * Note that due to concurrency requirements, a size-bounded {@code ReplaySubject} may hold strong references to more + * source emissions than specified. *

*
Scheduler:
*
{@code ReplaySubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and