From 6231f7f93bdeab2448a1da80785c7590867a3990 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 23 Apr 2014 11:23:35 +0200 Subject: [PATCH] OperatorBuffer --- rxjava-core/src/main/java/rx/Observable.java | 29 +- .../java/rx/operators/OperationBuffer.java | 566 ------------------ .../OperatorBufferWithSingleObservable.java | 190 ++++++ .../rx/operators/OperatorBufferWithSize.java | 141 +++++ .../OperatorBufferWithStartEndObservable.java | 215 +++++++ .../rx/operators/OperatorBufferWithTime.java | 302 ++++++++++ .../rx/operators/OperationBufferTest.java | 296 ++++++++- 7 files changed, 1143 insertions(+), 596 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationBuffer.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorBufferWithSingleObservable.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorBufferWithSize.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorBufferWithStartEndObservable.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7fea0a1fb1..9598e5cdef 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -48,7 +48,6 @@ import rx.observers.SafeSubscriber; import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; -import rx.operators.OperationBuffer; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDebounce; @@ -93,6 +92,10 @@ import rx.operators.OperatorAmb; import rx.operators.OperatorAny; import rx.operators.OperatorAsObservable; +import rx.operators.OperatorBufferWithSingleObservable; +import rx.operators.OperatorBufferWithSize; +import rx.operators.OperatorBufferWithStartEndObservable; +import rx.operators.OperatorBufferWithTime; import rx.operators.OperatorCache; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; @@ -2973,7 +2976,7 @@ public final Observable asObservable() { * @see RxJava Wiki: buffer() */ public final Observable> buffer(Func0> bufferClosingSelector) { - return create(OperationBuffer.buffer(this, bufferClosingSelector)); + return lift(new OperatorBufferWithSingleObservable(bufferClosingSelector, 16)); } /** @@ -2990,7 +2993,7 @@ public final Observable> buffer(Func0RxJava Wiki: buffer() */ public final Observable> buffer(int count) { - return create(OperationBuffer.buffer(this, count)); + return lift(new OperatorBufferWithSize(count, count)); } /** @@ -3011,7 +3014,7 @@ public final Observable> buffer(int count) { * @see RxJava Wiki: buffer() */ public final Observable> buffer(int count, int skip) { - return create(OperationBuffer.buffer(this, count, skip)); + return lift(new OperatorBufferWithSize(count, skip)); } /** @@ -3034,7 +3037,7 @@ public final Observable> buffer(int count, int skip) { * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, long timeshift, TimeUnit unit) { - return create(OperationBuffer.buffer(this, timespan, timeshift, unit)); + return lift(new OperatorBufferWithTime(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation())); } /** @@ -3058,7 +3061,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { - return create(OperationBuffer.buffer(this, timespan, timeshift, unit, scheduler)); + return lift(new OperatorBufferWithTime(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler)); } /** @@ -3079,7 +3082,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit) { - return create(OperationBuffer.buffer(this, timespan, unit)); + return lift(new OperatorBufferWithTime(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation())); } /** @@ -3104,7 +3107,7 @@ public final Observable> buffer(long timespan, TimeUnit unit) { * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, int count) { - return create(OperationBuffer.buffer(this, timespan, unit, count)); + return lift(new OperatorBufferWithTime(timespan, timespan, unit, count, Schedulers.computation())); } /** @@ -3132,7 +3135,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) { - return create(OperationBuffer.buffer(this, timespan, unit, count, scheduler)); + return lift(new OperatorBufferWithTime(timespan, timespan, unit, count, scheduler)); } /** @@ -3156,7 +3159,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count, * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) { - return create(OperationBuffer.buffer(this, timespan, unit, scheduler)); + return lift(new OperatorBufferWithTime(timespan, timespan, unit, Integer.MAX_VALUE, scheduler)); } /** @@ -3176,7 +3179,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler * @see RxJava Wiki: buffer() */ public final Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { - return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector)); + return lift(new OperatorBufferWithStartEndObservable(bufferOpenings, bufferClosingSelector)); } /** @@ -3198,7 +3201,7 @@ public final Observable> buffer(ObservableRxJava Wiki: buffer() */ public final Observable> buffer(Observable boundary) { - return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary)); + return lift(new OperatorBufferWithSingleObservable(boundary, 16)); } /** @@ -3222,7 +3225,7 @@ public final Observable> buffer(Observable boundary) { * @see #buffer(rx.Observable, int) */ public final Observable> buffer(Observable boundary, int initialCapacity) { - return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity)); + return lift(new OperatorBufferWithSingleObservable(boundary, initialCapacity)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java deleted file mode 100644 index 9ce6774d60..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ /dev/null @@ -1,566 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.schedulers.Schedulers; -import rx.subscriptions.CompositeSubscription; - -public final class OperationBuffer extends ChunkedOperation { - - private static Func0> bufferMaker() { - return new Func0>() { - @Override - public Buffer call() { - return new Buffer(); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer until the - * {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then - * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using - * the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source - * {@link Observable} completes or produces an error, the current buffer is emitted, and the event is - * propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param bufferClosingSelector - * a {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine - * when a buffer is emitted and replaced by simply producing an object. - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { - return new OnSubscribeFunc>() { - - @Override - public Subscription onSubscribe(Observer> observer) { - NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(buffers, bufferClosingSelector); - return new CompositeSubscription( - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(buffers, observer, creator))); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in the currently active chunks. - * Initially there are no chunks active. - *

- * Chunks can be created by pushing a {@link rx.util.TOpening} value to the "bufferOpenings" - * {@link Observable}. This creates a new buffer which will then start recording values which are produced - * by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an - * {@link Observable} which can produce values. When it does so it will close this (and only this) newly - * created buffer. When the source {@link Observable} completes or produces an error, all chunks are - * emitted, and the event is propagated to all subscribed {@link Observer}s. - *

- * Note that when using this operation multiple overlapping chunks could be active at any - * one point. - *

- * - * @param source - * the {@link Observable} which produces values - * @param bufferOpenings - * an {@link Observable} which when it produces a {@link rx.util.TOpening} value will create a - * new buffer which instantly starts recording the "source" {@link Observable} - * @param bufferClosingSelector - * a {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine - * when a buffer is emitted and replaced by simply producing an object. - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - OverlappingChunks> buffers = new OverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector); - return new CompositeSubscription( - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(buffers, observer, creator))); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer until the buffer contains - * a specified number of elements. The buffer is then emitted, and a new buffer is created to replace it. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param count - * the number of elements a buffer should have before being emitted and replaced - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(Observable source, int count) { - return buffer(source, count, count); - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in all active chunks until the buffer - * contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain - * amount of values have been received. When the source {@link Observable} completes or produces an error, - * the currently active chunks are emitted, and the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation can produce non-connected, connected non-overlapping, or overlapping - * chunks depending on the input parameters. - *

- * - * @param source - * the {@link Observable} which produces values - * @param count - * the number of elements a buffer should have before being emitted - * @param skip - * the interval with which chunks have to be created. Note that when {@code skip == count} that - * this is the same as calling {@link OperationBuffer#buffer(Observable, int)}. If - * {@code skip < count}, this buffer operation will produce overlapping chunks and if - * {@code skip > count} non-overlapping chunks will be created and some values will not be pushed - * into a buffer at all! - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final int count, final int skip) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - Chunks> chunks = new SizeBasedChunks>(observer, OperationBuffer. bufferMaker(), count); - ChunkCreator creator = new SkippingChunkCreator>(chunks, skip); - return new CompositeSubscription( - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(chunks, observer, creator))); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit) { - return buffer(source, timespan, unit, Schedulers.computation()); - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @param scheduler - * the {@link Scheduler} to use for timing chunks - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new TimeBasedChunkCreator>(buffers, timespan, unit, scheduler); - return new CompositeSubscription( - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(buffers, observer, creator))); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * Additionally the buffer is automatically emitted once it reaches a specified number of elements. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @param count - * the maximum size of the buffer. Once a buffer reaches this size, it is emitted - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit, int count) { - return buffer(source, timespan, unit, count, Schedulers.computation()); - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * Additionally the buffer is automatically emitted once it reaches a specified number of elements. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @param count - * the maximum size of the buffer. Once a buffer reaches this size, it is emitted - * @param scheduler - * the {@link Scheduler} to use for timing chunks - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - TimeAndSizeBasedChunks> chunks = new TimeAndSizeBasedChunks>(observer, OperationBuffer. bufferMaker(), count, timespan, unit, scheduler); - ChunkCreator creator = new SingleChunkCreator>(chunks); - return new CompositeSubscription( - chunks, - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(chunks, observer, creator))); - } - }; - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation can produce non-connected, or overlapping chunks depending - * on the input parameters. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param timeshift - * the amount of time between creating chunks - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { - return buffer(source, timespan, timeshift, unit, Schedulers.computation()); - } - - /** - * This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer - * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. - * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s. - *

- * Note that this operation can produce non-connected, or overlapping chunks depending - * on the input parameters. - *

- * - * @param source - * the {@link Observable} which produces values - * @param timespan - * the amount of time all chunks must be actively collect values before being emitted - * @param timeshift - * the amount of time between creating chunks - * @param unit - * the {@link TimeUnit} defining the unit of time for the timespan - * @param scheduler - * the {@link Scheduler} to use for timing chunks - * @return - * the {@link Func1} object representing the specified buffer operation - */ - public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - TimeBasedChunks> buffers = new TimeBasedChunks>(observer, OperationBuffer. bufferMaker(), timespan, unit, scheduler); - ChunkCreator creator = new TimeBasedChunkCreator>(buffers, timeshift, unit, scheduler); - return new CompositeSubscription( - buffers, - new ChunkToSubscription(creator), - source.unsafeSubscribe(new ChunkObserver>(buffers, observer, creator))); - } - }; - } - - /** - * This class represents a single buffer: A sequence of recorded values. - * - * @param - * the type of objects which this {@link Buffer} can hold - */ - protected static class Buffer extends Chunk> { - /** - * @return - * The mutable underlying {@link List} which contains all the - * recorded values in this {@link Buffer} object. - */ - @Override - public List getContents() { - return contents; - } - } - - /** - * Converts a chunk creator into a subscription which stops the chunk. - */ - private static class ChunkToSubscription implements Subscription { - private ChunkCreator cc; - private final AtomicBoolean done; - - public ChunkToSubscription(ChunkCreator cc) { - this.cc = cc; - this.done = new AtomicBoolean(); - } - - @Override - public void unsubscribe() { - if (done.compareAndSet(false, true)) { - ChunkCreator cc0 = cc; - cc = null; - cc0.stop(); - } - } - - @Override - public boolean isUnsubscribed() { - return done.get(); - } - } - - /** - * Create a buffer operator with the given observable sequence as the buffer boundary. - * - * @param source - * @param boundary - * @return - */ - public static OnSubscribeFunc> bufferWithBoundaryObservable(Observable source, Observable boundary) { - return new BufferWithObservableBoundary(source, boundary, 16); - } - - /** - * Create a buffer operator with the given observable sequence as the buffer boundary and - * with the given initial capacity for buffers. - * - * @param source - * @param boundary - * @param initialCapacity - * @return - */ - public static OnSubscribeFunc> bufferWithBoundaryObservable(Observable source, Observable boundary, int initialCapacity) { - if (initialCapacity <= 0) { - throw new IllegalArgumentException("initialCapacity > 0 required"); - } - return new BufferWithObservableBoundary(source, boundary, initialCapacity); - } - - /** - * Buffer until an element is emitted from a helper observable. - * - * @param - * the buffered value type - */ - private static final class BufferWithObservableBoundary implements OnSubscribeFunc> { - final Observable source; - final Observable boundary; - final int initialCapacity; - - public BufferWithObservableBoundary(Observable source, Observable boundary, int initialCapacity) { - this.source = source; - this.boundary = boundary; - this.initialCapacity = initialCapacity; - } - - @Override - public Subscription onSubscribe(Observer> t1) { - CompositeSubscription csub = new CompositeSubscription(); - - SourceObserver so = new SourceObserver(t1, initialCapacity, csub); - csub.add(source.unsafeSubscribe(so)); - csub.add(boundary.unsafeSubscribe(new BoundaryObserver(so))); - - return csub; - } - - /** - * Observes the source. - */ - private static final class SourceObserver extends Subscriber { - final Observer> observer; - /** The buffer, if null, that indicates a terminal state. */ - List buffer; - final int initialCapacity; - final Object guard; - final Subscription cancel; - - public SourceObserver(Observer> observer, int initialCapacity, Subscription cancel) { - this.observer = observer; - this.initialCapacity = initialCapacity; - this.guard = new Object(); - this.cancel = cancel; - buffer = new ArrayList(initialCapacity); - } - - @Override - public void onNext(T args) { - synchronized (guard) { - buffer.add(args); - } - } - - @Override - public void onError(Throwable e) { - synchronized (guard) { - if (buffer == null) { - return; - } - buffer = null; - } - observer.onError(e); - cancel.unsubscribe(); - } - - @Override - public void onCompleted() { - emitAndComplete(); - cancel.unsubscribe(); - } - - void emitAndReplace() { - List buf; - synchronized (guard) { - if (buffer == null) { - return; - } - buf = buffer; - buffer = new ArrayList(initialCapacity); - } - observer.onNext(buf); - } - - void emitAndComplete() { - List buf; - synchronized (guard) { - if (buffer == null) { - return; - } - buf = buffer; - buffer = null; - } - observer.onNext(buf); - observer.onCompleted(); - } - } - - /** - * Observes the boundary. - */ - private static final class BoundaryObserver extends Subscriber { - final SourceObserver so; - - public BoundaryObserver(SourceObserver so) { - this.so = so; - } - - @Override - public void onNext(T args) { - so.emitAndReplace(); - } - - @Override - public void onError(Throwable e) { - so.onError(e); - } - - @Override - public void onCompleted() { - so.onCompleted(); - } - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSingleObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSingleObservable.java new file mode 100644 index 0000000000..e6efffb383 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSingleObservable.java @@ -0,0 +1,190 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.functions.Func0; +import rx.observers.SerializedSubscriber; +import rx.observers.Subscribers; + +/** + * This operation takes + * values from the specified {@link Observable} source and stores them in a buffer until the + * {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then + * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using + * the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source + * {@link Observable} completes or produces an error, the current buffer is emitted, and the event is + * propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

+ * + * @param the buffered value type + */ + +public final class OperatorBufferWithSingleObservable implements Operator, T> { + final Func0> bufferClosingSelector; + final int initialCapacity; + /** + * @param bufferClosingSelector + * a {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine + * when a buffer is emitted and replaced by simply producing an object. + * @param initialCapacity the initial capacity of each buffer + */ + public OperatorBufferWithSingleObservable(Func0> bufferClosingSelector, + int initialCapacity) { + this.bufferClosingSelector = bufferClosingSelector; + this.initialCapacity = initialCapacity; + } + /** + * @param bufferClosing + * An {@link Observable} to determine + * when a buffer is emitted and replaced by simply producing an object. + * @param initialCapacity the initial capacity of each buffer + */ + public OperatorBufferWithSingleObservable(final Observable bufferClosing, + int initialCapacity) { + this.bufferClosingSelector = new Func0>() { + @Override + public Observable call() { + return bufferClosing; + } + }; + this.initialCapacity = initialCapacity; + } + + @Override + public Subscriber call(final Subscriber> child) { + Observable closing; + try { + closing = bufferClosingSelector.call(); + } catch (Throwable t) { + child.onError(t); + return Subscribers.empty(); + } + final BufferingSubscriber bsub = new BufferingSubscriber(new SerializedSubscriber>(child)); + + Subscriber closingSubscriber = new Subscriber() { + + @Override + public void onNext(TClosing t) { + bsub.emit(); + } + + @Override + public void onError(Throwable e) { + bsub.onError(e); + } + + @Override + public void onCompleted() { + bsub.onCompleted(); + } + }; + + child.add(closingSubscriber); + child.add(bsub); + + closing.unsafeSubscribe(closingSubscriber); + + return bsub; + } + + final class BufferingSubscriber extends Subscriber { + final Subscriber> child; + /** Guarded by this. */ + List chunk; + /** Guarded by this. */ + boolean done; + public BufferingSubscriber(Subscriber> child) { + this.child = child; + this.chunk = new ArrayList(initialCapacity); + } + @Override + public void onNext(T t) { + synchronized (this) { + if (done) { + return; + } + chunk.add(t); + } + } + + @Override + public void onError(Throwable e) { + synchronized (this) { + if (done) { + return; + } + done = true; + chunk = null; + } + child.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + try { + List toEmit; + synchronized (this) { + if (done) { + return; + } + done = true; + toEmit = chunk; + chunk = null; + } + child.onNext(toEmit); + } catch (Throwable t) { + child.onError(t); + return; + } + child.onCompleted(); + unsubscribe(); + } + + void emit() { + List toEmit; + synchronized (this) { + if (done) { + return; + } + toEmit = chunk; + chunk = new ArrayList(initialCapacity); + } + try { + child.onNext(toEmit); + } catch (Throwable t) { + unsubscribe(); + synchronized (this) { + if (done) { + return; + } + done = true; + } + child.onError(t); + } + } + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSize.java b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSize.java new file mode 100644 index 0000000000..7334d958f7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithSize.java @@ -0,0 +1,141 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; + +/** + * This operation takes + * values from the specified {@link Observable} source and stores them in all active chunks until the buffer + * contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain + * amount of values have been received. When the source {@link Observable} completes or produces an error, + * the currently active chunks are emitted, and the event is propagated to all subscribed {@link Subscriber}s. + *

+ * Note that this operation can produce non-connected, connected non-overlapping, or overlapping + * chunks depending on the input parameters. + *

+ +* @param the buffered value type + */ +public final class OperatorBufferWithSize implements Operator, T> { + final int count; + final int skip; + + /** + * @param count + * the number of elements a buffer should have before being emitted + * @param skip + * the interval with which chunks have to be created. Note that when {@code skip == count} + * the operator will produce non-overlapping chunks. If + * {@code skip < count}, this buffer operation will produce overlapping chunks and if + * {@code skip > count} non-overlapping chunks will be created and some values will not be pushed + * into a buffer at all! + */ + public OperatorBufferWithSize(int count, int skip) { + this.count = count; + this.skip = skip; + } + + @Override + public Subscriber call(final Subscriber> child) { + if (count == skip) { + return new Subscriber(child) { + List buffer; + @Override + public void onNext(T t) { + if (buffer == null) { + buffer = new ArrayList(count); + } + buffer.add(t); + if (buffer.size() == count) { + List oldBuffer = buffer; + buffer = null; + child.onNext(oldBuffer); + } + } + + @Override + public void onError(Throwable e) { + buffer = null; + child.onError(e); + } + + @Override + public void onCompleted() { + List oldBuffer = buffer; + buffer = null; + if (oldBuffer != null) { + try { + child.onNext(oldBuffer); + } catch (Throwable t) { + onError(t); + return; + } + } + child.onCompleted(); + } + }; + } + return new Subscriber(child) { + final List> chunks = new LinkedList>(); + int index; + @Override + public void onNext(T t) { + if (index++ % skip == 0) { + chunks.add(new ArrayList(count)); + } + + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + List chunk = it.next(); + chunk.add(t); + if (chunk.size() == count) { + it.remove(); + child.onNext(chunk); + } + } + } + + @Override + public void onError(Throwable e) { + chunks.clear(); + child.onError(e); + } + @Override + public void onCompleted() { + try { + for (List chunk : chunks) { + try { + child.onNext(chunk); + } catch (Throwable t) { + onError(t); + return; + } + } + child.onCompleted(); + } finally { + chunks.clear(); + } + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithStartEndObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithStartEndObservable.java new file mode 100644 index 0000000000..931238e580 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithStartEndObservable.java @@ -0,0 +1,215 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.functions.Func1; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * This operation takes + * values from the specified {@link Observable} source and stores them in the currently active chunks. + * Initially there are no chunks active. + *

+ * Chunks can be created by pushing a {@link rx.util.TOpening} value to the "bufferOpenings" + * {@link Observable}. This creates a new buffer which will then start recording values which are produced + * by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an + * {@link Observable} which can produce values. When it does so it will close this (and only this) newly + * created buffer. When the source {@link Observable} completes or produces an error, all chunks are + * emitted, and the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that when using this operation multiple overlapping chunks could be active at any + * one point. + *

+ * + * @param the buffered value type + */ + +public final class OperatorBufferWithStartEndObservable implements Operator, T> { + final Observable bufferOpening; + final Func1> bufferClosing; + + /** + * @param bufferOpenings + * an {@link Observable} which when it produces a {@link rx.util.TOpening} value will create a + * new buffer which instantly starts recording the "source" {@link Observable} + * @param bufferClosingSelector + * a {@link Func1} object which produces {@link Observable}s. These {@link Observable}s determine + * when a buffer is emitted and replaced by simply producing an object. + */ + public OperatorBufferWithStartEndObservable(Observable bufferOpenings, Func1> bufferClosingSelector) { + this.bufferOpening = bufferOpenings; + this.bufferClosing = bufferClosingSelector; + } + + @Override + public Subscriber call(final Subscriber> child) { + + final BufferingSubscriber bsub = new BufferingSubscriber(new SerializedSubscriber>(child)); + + Subscriber openSubscriber = new Subscriber() { + + @Override + public void onNext(TOpening t) { + bsub.startBuffer(t); + } + + @Override + public void onError(Throwable e) { + bsub.onError(e); + } + + @Override + public void onCompleted() { + bsub.onCompleted(); + } + + }; + child.add(openSubscriber); + child.add(bsub); + + bufferOpening.unsafeSubscribe(openSubscriber); + + return bsub; + } + final class BufferingSubscriber extends Subscriber { + final Subscriber> child; + /** Guarded by this. */ + final List> chunks; + /** Guarded by this. */ + boolean done; + final CompositeSubscription closingSubscriptions; + public BufferingSubscriber(Subscriber> child) { + this.child = child; + this.chunks = new LinkedList>(); + this.closingSubscriptions = new CompositeSubscription(); + add(this.closingSubscriptions); + } + + @Override + public void onNext(T t) { + synchronized (this) { + for (List chunk : chunks) { + chunk.add(t); + } + } + } + + @Override + public void onError(Throwable e) { + synchronized (this) { + if (done) { + return; + } + done = true; + chunks.clear(); + } + child.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + try { + List> toEmit; + synchronized (this) { + if (done) { + return; + } + done = true; + toEmit = new LinkedList>(chunks); + chunks.clear(); + } + for (List chunk : toEmit) { + child.onNext(chunk); + } + } catch (Throwable t) { + child.onError(t); + return; + } + child.onCompleted(); + unsubscribe(); + } + void startBuffer(TOpening v) { + final List chunk = new ArrayList(); + synchronized (this) { + if (done) { + return; + } + chunks.add(chunk); + } + Observable cobs; + try { + cobs = bufferClosing.call(v); + } catch (Throwable t) { + onError(t); + return; + } + Subscriber closeSubscriber = new Subscriber() { + + @Override + public void onNext(TClosing t) { + closingSubscriptions.remove(this); + endBuffer(chunk); + } + + @Override + public void onError(Throwable e) { + BufferingSubscriber.this.onError(e); + } + + @Override + public void onCompleted() { + closingSubscriptions.remove(this); + endBuffer(chunk); + } + + }; + closingSubscriptions.add(closeSubscriber); + + cobs.unsafeSubscribe(closeSubscriber); + } + void endBuffer(List toEnd) { + boolean canEnd = false; + synchronized (this) { + if (done) { + return; + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + List chunk = it.next(); + if (chunk == toEnd) { + canEnd = true; + it.remove(); + break; + } + } + } + if (canEnd) { + child.onNext(toEnd); + } + } + + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java new file mode 100644 index 0000000000..98a3efd654 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java @@ -0,0 +1,302 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import rx.Observable; +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.functions.Action0; +import rx.observers.SerializedSubscriber; + +/** + * This operation takes + * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer + * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. + * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. + * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and + * the event is propagated to all subscribed {@link Subscriber}s. + *

+ * Note that this operation can produce non-connected, or overlapping chunks depending + * on the input parameters. + *

+ * + * @param the buffered value type + */ +public final class OperatorBufferWithTime implements Operator, T> { + final long timespan; + final long timeshift; + final TimeUnit unit; + final int count; + final Scheduler scheduler; + + /** + * @param timespan + * the amount of time all chunks must be actively collect values before being emitted + * @param timeshift + * the amount of time between creating chunks + * @param unit + * the {@link TimeUnit} defining the unit of time for the timespan + * @param count + * the maximum size of the buffer. Once a buffer reaches this size, it is emitted + * @param scheduler + * the {@link Scheduler} to use for timing chunks + */ + public OperatorBufferWithTime(long timespan, long timeshift, TimeUnit unit, int count, Scheduler scheduler) { + this.timespan = timespan; + this.timeshift = timeshift; + this.unit = unit; + this.count = count; + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber> child) { + final Worker inner = scheduler.createWorker(); + child.add(inner); + + if (timespan == timeshift) { + ExactSubscriber bsub = new ExactSubscriber(new SerializedSubscriber>(child), inner); + bsub.scheduleExact(); + return bsub; + } + + InexactSubscriber bsub = new InexactSubscriber(new SerializedSubscriber>(child), inner); + bsub.startNewChunk(); + bsub.scheduleChunk(); + return bsub; + } + /** Subscriber when the buffer chunking time and lenght differ. */ + final class InexactSubscriber extends Subscriber { + final Subscriber> child; + final Worker inner; + /** Guarded by this. */ + final List> chunks; + /** Guarded by this. */ + boolean done; + public InexactSubscriber(Subscriber> child, Worker inner) { + super(child); + this.child = child; + this.inner = inner; + this.chunks = new LinkedList>(); + } + + @Override + public void onNext(T t) { + List> sizeReached = null; + synchronized (this) { + if (done) { + return; + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + List chunk = it.next(); + chunk.add(t); + if (chunk.size() == count) { + it.remove(); + if (sizeReached == null) { + sizeReached = new LinkedList>(); + } + sizeReached.add(chunk); + } + } + } + if (sizeReached != null) { + for (List chunk : sizeReached) { + child.onNext(chunk); + } + } + } + + @Override + public void onError(Throwable e) { + synchronized (this) { + if (done) { + return; + } + done = true; + chunks.clear(); + } + child.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + try { + List> sizeReached; + synchronized (this) { + if (done) { + return; + } + done = true; + sizeReached = new LinkedList>(chunks); + chunks.clear(); + } + for (List chunk : sizeReached) { + child.onNext(chunk); + } + } catch (Throwable t) { + child.onError(t); + return; + } + child.onCompleted(); + unsubscribe(); + } + void scheduleChunk() { + inner.schedulePeriodically(new Action0() { + @Override + public void call() { + startNewChunk(); + } + }, timeshift, timeshift, unit); + } + void startNewChunk() { + final List chunk = new ArrayList(); + synchronized (this) { + if (done) { + return; + } + chunks.add(chunk); + } + inner.schedule(new Action0() { + @Override + public void call() { + emitChunk(chunk); + } + }, timespan, unit); + } + void emitChunk(List chunkToEmit) { + boolean emit = false; + synchronized (this) { + if (done) { + return; + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + List chunk = it.next(); + if (chunk == chunkToEmit) { + it.remove(); + emit = true; + break; + } + } + } + if (emit) { + try { + child.onNext(chunkToEmit); + } catch (Throwable t) { + onError(t); + } + } + } + } + /** Subscriber when exact timed chunking is required. */ + final class ExactSubscriber extends Subscriber { + final Subscriber> child; + final Worker inner; + /** Guarded by this. */ + List chunk; + /** Guarded by this. */ + boolean done; + public ExactSubscriber(Subscriber> child, Worker inner) { + super(child); + this.child = child; + this.inner = inner; + this.chunk = new ArrayList(); + } + + @Override + public void onNext(T t) { + List toEmit = null; + synchronized (this) { + if (done) { + return; + } + chunk.add(t); + if (chunk.size() == count) { + toEmit = chunk; + chunk = new ArrayList(); + } + } + if (toEmit != null) { + child.onNext(toEmit); + } + } + + @Override + public void onError(Throwable e) { + synchronized (this) { + if (done) { + return; + } + done = true; + chunk = null; + } + child.onError(e); + } + + @Override + public void onCompleted() { + try { + inner.unsubscribe(); + List toEmit; + synchronized (this) { + if (done) { + return; + } + done = true; + toEmit = chunk; + chunk = null; + } + child.onNext(toEmit); + } catch (Throwable t) { + child.onError(t); + return; + } + child.onCompleted(); + } + void scheduleExact() { + inner.schedulePeriodically(new Action0() { + @Override + public void call() { + emit(); + } + }, timespan, timespan, unit); + } + void emit() { + List toEmit; + synchronized (this) { + if (done) { + return; + } + toEmit = chunk; + chunk = new ArrayList(); + } + try { + child.onNext(toEmit); + } catch (Throwable t) { + onError(t); + } + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index aa1c525e99..39cb174b0a 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -17,11 +17,7 @@ import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static rx.operators.OperationBuffer.buffer; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; @@ -70,7 +66,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 3, 3)); + Observable> buffered = source.buffer(3, 3); buffered.subscribe(observer); Mockito.verify(observer, Mockito.never()).onNext(Mockito.anyListOf(String.class)); @@ -92,7 +88,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 3, 1)); + Observable> buffered = source.buffer(3, 1); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -119,7 +115,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 3, 3)); + Observable> buffered = source.buffer(3, 3); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -145,7 +141,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 2, 3)); + Observable> buffered = source.buffer(2, 3); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -171,7 +167,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, 2, scheduler)); + Observable> buffered = source.buffer(100, TimeUnit.MILLISECONDS, 2, scheduler); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -193,9 +189,14 @@ public void testTimed() { Observable source = Observable.create(new Observable.OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { - push(observer, "one", 98); - push(observer, "two", 99); - push(observer, "three", 100); + push(observer, "one", 97); + push(observer, "two", 98); + /** + * Changed from 100. Because scheduling the cut to 100ms happens before this + * Observable even runs due how lift works, pushing at 100ms would execute after the + * buffer cut. + */ + push(observer, "three", 99); push(observer, "four", 101); push(observer, "five", 102); complete(observer, 150); @@ -203,7 +204,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, scheduler)); + Observable> buffered = source.buffer(100, TimeUnit.MILLISECONDS, scheduler); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -256,7 +257,7 @@ public Subscription onSubscribe(Observer observer) { } }; - Observable> buffered = Observable.create(buffer(source, openings, closer)); + Observable> buffered = source.buffer(openings, closer); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -290,14 +291,16 @@ public Observable call() { @Override public Subscription onSubscribe(Observer observer) { push(observer, new Object(), 100); - complete(observer, 101); + push(observer, new Object(), 200); + push(observer, new Object(), 300); + complete(observer, 301); return Subscriptions.empty(); } }); } }; - Observable> buffered = Observable.create(buffer(source, closer)); + Observable> buffered = source.buffer(closer); buffered.subscribe(observer); InOrder inOrder = Mockito.inOrder(observer); @@ -522,4 +525,263 @@ public void bufferWithBOBoundaryThrows() { verify(o, never()).onCompleted(); verify(o, never()).onNext(any()); } + @Test(timeout = 2000) + public void bufferWithSizeTake1() { + Observable source = Observable.from(1).repeat(); + + Observable> result = source.buffer(2).take(1); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o).onNext(Arrays.asList(1, 1)); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test(timeout = 2000) + public void bufferWithSizeSkipTake1() { + Observable source = Observable.from(1).repeat(); + + Observable> result = source.buffer(2, 3).take(1); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o).onNext(Arrays.asList(1, 1)); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test(timeout = 2000) + public void bufferWithTimeTake1() { + Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + + Observable> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + scheduler.advanceTimeBy(5, TimeUnit.SECONDS); + + verify(o).onNext(Arrays.asList(0L, 1L)); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test(timeout = 2000) + public void bufferWithTimeSkipTake2() { + Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + + Observable> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + scheduler.advanceTimeBy(5, TimeUnit.SECONDS); + + inOrder.verify(o).onNext(Arrays.asList(0L, 1L)); + inOrder.verify(o).onNext(Arrays.asList(1L, 2L)); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test(timeout = 2000) + public void bufferWithBoundaryTake2() { + Observable boundary = Observable.timer(60, 60, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + + Observable> result = source.buffer(boundary).take(2); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + scheduler.advanceTimeBy(5, TimeUnit.SECONDS); + + inOrder.verify(o).onNext(Arrays.asList(0L)); + inOrder.verify(o).onNext(Arrays.asList(1L)); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test(timeout = 2000) + public void bufferWithStartEndBoundaryTake2() { + Observable start = Observable.timer(61, 61, TimeUnit.MILLISECONDS, scheduler); + Func1> end = new Func1>() { + @Override + public Observable call(Long t1) { + return Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler); + } + }; + + Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + + Observable> result = source.buffer(start, end).take(2); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + scheduler.advanceTimeBy(5, TimeUnit.SECONDS); + + inOrder.verify(o).onNext(Arrays.asList(1L, 2L, 3L)); + inOrder.verify(o).onNext(Arrays.asList(3L, 4L)); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void bufferWithSizeThrows() { + PublishSubject source = PublishSubject.create(); + + Observable> result = source.buffer(2); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onError(new OperationReduceTest.CustomException()); + + inOrder.verify(o).onNext(Arrays.asList(1, 2)); + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(Arrays.asList(3)); + verify(o, never()).onCompleted(); + + } + + @Test + public void bufferWithTimeThrows() { + PublishSubject source = PublishSubject.create(); + + Observable> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + source.onNext(3); + source.onError(new OperationReduceTest.CustomException()); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(o).onNext(Arrays.asList(1, 2)); + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(Arrays.asList(3)); + verify(o, never()).onCompleted(); + + } + @Test + public void bufferWithTimeAndSize() { + Observable source = Observable.timer(30, 30, TimeUnit.MILLISECONDS, scheduler); + + Observable> result = source.buffer(100, TimeUnit.MILLISECONDS, 2, scheduler).take(3); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + scheduler.advanceTimeBy(5, TimeUnit.SECONDS); + + inOrder.verify(o).onNext(Arrays.asList(0L, 1L)); + inOrder.verify(o).onNext(Arrays.asList(2L)); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void bufferWithStartEndStartThrows() { + PublishSubject start = PublishSubject.create(); + + Func1> end = new Func1>() { + @Override + public Observable call(Integer t1) { + return Observable.never(); + } + }; + + PublishSubject source = PublishSubject.create(); + + Observable> result = source.buffer(start, end); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + start.onNext(1); + source.onNext(1); + source.onNext(2); + start.onError(new OperationReduceTest.CustomException()); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } + @Test + public void bufferWithStartEndEndFunctionThrows() { + PublishSubject start = PublishSubject.create(); + + Func1> end = new Func1>() { + @Override + public Observable call(Integer t1) { + throw new OperationReduceTest.CustomException(); + } + }; + + PublishSubject source = PublishSubject.create(); + + Observable> result = source.buffer(start, end); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + start.onNext(1); + source.onNext(1); + source.onNext(2); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } + @Test + public void bufferWithStartEndEndThrows() { + PublishSubject start = PublishSubject.create(); + + Func1> end = new Func1>() { + @Override + public Observable call(Integer t1) { + return Observable.error(new OperationReduceTest.CustomException()); + } + }; + + PublishSubject source = PublishSubject.create(); + + Observable> result = source.buffer(start, end); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + start.onNext(1); + source.onNext(1); + source.onNext(2); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } }