diff --git a/rxjava-core/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java b/rxjava-core/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java new file mode 100644 index 0000000000..a0333e5e3c --- /dev/null +++ b/rxjava-core/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java @@ -0,0 +1,85 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import org.junit.Assert; +import org.junit.Test; +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class BufferUntilSubscriberTest { + + @Test + public void testIssue1677() throws InterruptedException { + final AtomicLong counter = new AtomicLong(); + final Integer[] numbers = new Integer[5000]; + for (int i = 0; i < numbers.length; i++) + numbers[i] = i + 1; + final int NITERS = 250; + final CountDownLatch latch = new CountDownLatch(NITERS); + for (int iters = 0; iters < NITERS; iters++) { + final CountDownLatch innerLatch = new CountDownLatch(1); + final PublishSubject s = PublishSubject.create(); + final AtomicBoolean completed = new AtomicBoolean(); + Observable.from(numbers) + .takeUntil(s) + .window(50) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(Observable integerObservable) { + return integerObservable + .subscribeOn(Schedulers.computation()) + .map(new Func1() { + @Override + public Integer call(Integer integer) { + if (integer >= 5 && completed.compareAndSet(false, true)) { + s.onCompleted(); + } + // do some work + Math.pow(Math.random(), Math.random()); + return integer * 2; + } + }); + } + }) + .toList() + .doOnNext(new Action1>() { + @Override + public void call(List integers) { + counter.incrementAndGet(); + latch.countDown(); + innerLatch.countDown(); + } + }) + .subscribe(); + if (!innerLatch.await(30, TimeUnit.SECONDS)) + Assert.fail("Failed inner latch wait, iteration " + iters); + } + if (!latch.await(30, TimeUnit.SECONDS)) + Assert.fail("Incomplete! Went through " + latch.getCount() + " iterations"); + else + Assert.assertEquals(NITERS, counter.get()); + } +} diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index a48e22fbad..2874cf9daf 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -174,15 +174,17 @@ public void call(Subscriber o) { * @return the source Observable, transformed by the transformer function * @see RxJava wiki: Implementing Your Own Operators */ - public Observable compose(Transformer transformer) { - return transformer.call(this); + @SuppressWarnings("unchecked") + public Observable compose(Transformer transformer) { + // Casting to Observable is type-safe because we know Observable is covariant. + return (Observable) transformer.call(this); } /** * Transformer function used by {@link #compose}. * @warn more complete description needed */ - public static interface Transformer extends Func1, Observable> { + public static interface Transformer extends Func1, Observable> { // cover for generics insanity } diff --git a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java index f7751a64d2..313e128cdd 100644 --- a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java +++ b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java @@ -16,12 +16,12 @@ package rx.internal.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observer; import rx.Subscriber; import rx.functions.Action0; +import rx.observers.EmptyObserver; import rx.observers.Subscribers; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -51,6 +51,9 @@ */ public class BufferUntilSubscriber extends Subject { + @SuppressWarnings("rawtypes") + private final static Observer EMPTY_OBSERVER = new EmptyObserver(); + /** * @warn create() undescribed * @return @@ -62,25 +65,22 @@ public static BufferUntilSubscriber create() { /** The common state. */ static final class State { - /** The first observer or the one which buffers until the first arrives. */ - volatile Observer observerRef = new BufferedObserver(); - /** Allow a single subscriber only. */ - volatile int first; + volatile Observer observerRef = null; /** Field updater for observerRef. */ @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater OBSERVER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef"); - /** Field updater for first. */ - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater FIRST_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(State.class, "first"); - - boolean casFirst(int expected, int next) { - return FIRST_UPDATER.compareAndSet(this, expected, next); - } - void setObserverRef(Observer o) { - observerRef = o; + + boolean casObserverRef(Observer expected, Observer next) { + return OBSERVER_UPDATER.compareAndSet(this, expected, next); } + + Object guard = new Object(); + /* protected by guard */ + boolean emitting = false; + + final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); + final NotificationLite nl = NotificationLite.instance(); } static final class OnSubscribeAction implements OnSubscribe { @@ -92,23 +92,38 @@ public OnSubscribeAction(State state) { @Override public void call(final Subscriber s) { - if (state.casFirst(0, 1)) { - final NotificationLite nl = NotificationLite.instance(); - // drain queued notifications before subscription - // we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer - BufferedObserver buffered = (BufferedObserver)state.observerRef; - Object o; - while ((o = buffered.buffer.poll()) != null) { - nl.accept(s, o); - } - // register real observer for pass-thru ... and drain any further events received on first notification - state.setObserverRef(new PassThruObserver(s, buffered.buffer, state)); + if (state.casObserverRef(null, s)) { s.add(Subscriptions.create(new Action0() { @Override public void call() { - state.setObserverRef(Subscribers.empty()); + state.observerRef = EMPTY_OBSERVER; } })); + boolean win = false; + synchronized (state.guard) { + if (!state.emitting) { + state.emitting = true; + win = true; + } + } + if (win) { + final NotificationLite nl = NotificationLite.instance(); + while(true) { + Object o; + while ((o = state.buffer.poll()) != null) { + nl.accept(state.observerRef, o); + } + synchronized (state.guard) { + if (state.buffer.isEmpty()) { + // Although the buffer is empty, there is still a chance + // that further events may be put into the `buffer`. + // `emit(Object v)` should handle it. + state.emitting = false; + break; + } + } + } + } } else { s.onError(new IllegalStateException("Only one subscriber allowed!")); } @@ -116,98 +131,61 @@ public void call() { } final State state; - + + private boolean forward = false; + private BufferUntilSubscriber(State state) { super(new OnSubscribeAction(state)); this.state = state; } - @Override - public void onCompleted() { - state.observerRef.onCompleted(); - } - - @Override - public void onError(Throwable e) { - state.observerRef.onError(e); + private void emit(Object v) { + synchronized (state.guard) { + state.buffer.add(v); + if (state.observerRef != null && !state.emitting) { + // Have an observer and nobody is emitting, + // should drain the `buffer` + forward = true; + state.emitting = true; + } + } + if (forward) { + Object o; + while ((o = state.buffer.poll()) != null) { + state.nl.accept(state.observerRef, o); + } + // Because `emit(Object v)` will be called in sequence, + // no event will be put into `buffer` after we drain it. + } } @Override - public void onNext(T t) { - state.observerRef.onNext(t); - } - - /** - * This is a temporary observer between buffering and the actual that gets into the line of notifications - * from the producer and will drain the queue of any items received during the race of the initial drain and - * switching this. - * - * It will then immediately swap itself out for the actual (after a single notification), but since this is - * now being done on the same producer thread no further buffering will occur. - */ - private static final class PassThruObserver extends Subscriber { - - private final Observer actual; - // this assumes single threaded synchronous notifications (the Rx contract for a single Observer) - private final ConcurrentLinkedQueue buffer; - private final State state; - - PassThruObserver(Observer actual, ConcurrentLinkedQueue buffer, - State state) { - this.actual = actual; - this.buffer = buffer; - this.state = state; + public void onCompleted() { + if (forward) { + state.observerRef.onCompleted(); } - - @Override - public void onCompleted() { - drainIfNeededAndSwitchToActual(); - actual.onCompleted(); + else { + emit(state.nl.completed()); } + } - @Override - public void onError(Throwable e) { - drainIfNeededAndSwitchToActual(); - actual.onError(e); + @Override + public void onError(Throwable e) { + if (forward) { + state.observerRef.onError(e); } - - @Override - public void onNext(T t) { - drainIfNeededAndSwitchToActual(); - actual.onNext(t); + else { + emit(state.nl.error(e)); } - - private void drainIfNeededAndSwitchToActual() { - final NotificationLite nl = NotificationLite.instance(); - Object o; - while ((o = buffer.poll()) != null) { - nl.accept(this, o); - } - // now we can safely change over to the actual and get rid of the pass-thru - // but only if not unsubscribed - state.setObserverRef(actual); - } - } - private static final class BufferedObserver extends Subscriber { - private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); - private static final NotificationLite nl = NotificationLite.instance(); - - @Override - public void onCompleted() { - buffer.add(nl.completed()); - } - - @Override - public void onError(Throwable e) { - buffer.add(nl.error(e)); + @Override + public void onNext(T t) { + if (forward) { + state.observerRef.onNext(t); } - - @Override - public void onNext(T t) { - buffer.add(nl.next(t)); + else { + emit(state.nl.next(t)); } - } } diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 0c19c25eed..832b9ce641 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -409,7 +409,7 @@ public void onError(Throwable e) { boolean sendOnComplete = false; synchronized (this) { wip--; - if (wip == 0 && completed) { + if ((wip == 0 && completed) || (wip < 0)) { sendOnComplete = true; } } diff --git a/src/main/java/rx/internal/operators/OperatorTakeUntil.java b/src/main/java/rx/internal/operators/OperatorTakeUntil.java index 310d10c637..11fd5572a9 100644 --- a/src/main/java/rx/internal/operators/OperatorTakeUntil.java +++ b/src/main/java/rx/internal/operators/OperatorTakeUntil.java @@ -36,24 +36,7 @@ public OperatorTakeUntil(final Observable other) { @Override public Subscriber call(final Subscriber child) { - final Subscriber parent = new SerializedSubscriber(child) { - - @Override - public void onCompleted() { - child.onCompleted(); - } - - @Override - public void onError(Throwable e) { - child.onError(e); - } - - @Override - public void onNext(T t) { - child.onNext(t); - } - - }; + final Subscriber parent = new SerializedSubscriber(child); other.unsafeSubscribe(new Subscriber(child) { diff --git a/src/test/java/rx/CovarianceTest.java b/src/test/java/rx/CovarianceTest.java index 769733d704..26f060b5d3 100644 --- a/src/test/java/rx/CovarianceTest.java +++ b/src/test/java/rx/CovarianceTest.java @@ -20,12 +20,13 @@ import org.junit.Test; import rx.Observable.Transformer; +import rx.functions.Func1; import rx.functions.Func2; /** * Test super/extends of generics. * - * See https://github.com/ReactiveX/RxJava/pull/331 + * See https://github.com/Netflix/RxJava/pull/331 */ public class CovarianceTest { @@ -61,11 +62,11 @@ public Integer call(Media t1, Media t2) { @Test public void testCovarianceOfCompose() { - Observable movie = Observable. just(new HorrorMovie()); + Observable movie = Observable.just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new Movie()); } @@ -77,12 +78,45 @@ public void testCovarianceOfCompose2() { Observable movie = Observable. just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new HorrorMovie()); } }); } - + + @Test + public void testCovarianceOfCompose3() { + Observable movie = Observable.just(new HorrorMovie()); + Observable movie2 = movie.compose(new Transformer() { + @Override + public Observable call(Observable t1) { + return Observable.just(new HorrorMovie()).map(new Func1() { + + @Override + public HorrorMovie call(HorrorMovie horrorMovie) { + return horrorMovie; + } + }); + } + }); + } + + @Test + public void testCovarianceOfCompose4() { + Observable movie = Observable.just(new HorrorMovie()); + Observable movie2 = movie.compose(new Transformer() { + @Override + public Observable call(Observable t1) { + return t1.map(new Func1() { + + @Override + public HorrorMovie call(HorrorMovie horrorMovie) { + return horrorMovie; + } + }); + } + }); + } /* * Most tests are moved into their applicable classes such as [Operator]Tests.java diff --git a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index 2c60a89632..f5163bdb8e 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -15,26 +15,11 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.List; - import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; @@ -42,6 +27,15 @@ import rx.exceptions.CompositeException; import rx.exceptions.TestException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.*; + public class OperatorMergeDelayErrorTest { @Mock @@ -289,6 +283,35 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test(timeout=1000L) + public void testSynchronousError() { + final Observable> o1 = Observable.error(new RuntimeException("unit test")); + + final CountDownLatch latch = new CountDownLatch(1); + Observable.mergeDelayError(o1).subscribe(new Subscriber() { + @Override + public void onCompleted() { + fail("Expected onError path"); + } + + @Override + public void onError(Throwable e) { + latch.countDown(); + } + + @Override + public void onNext(String s) { + fail("Expected onError path"); + } + }); + + try { + latch.await(); + } catch (InterruptedException ex) { + fail("interrupted"); + } + } + private static class TestSynchronousObservable implements Observable.OnSubscribe { @Override