From effc08d548518df5a54c916e1b50daadb8bf4228 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 19 Mar 2013 16:23:50 -0700 Subject: [PATCH 01/13] Synchronize Observer on OperationMerge fixes https://github.com/Netflix/RxJava/issues/200 This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable. --- .../java/rx/operators/OperationMerge.java | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1eeb21c1e0..1e6e6e7568 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -23,7 +23,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -33,6 +35,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public final class OperationMerge { @@ -115,10 +119,20 @@ private MergeObservable(Observable> sequences) { } public MergeSubscription call(Observer actualObserver) { + + /** + * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. + *

+ * The calls from each sequence must be serialized. + *

+ * Bug report: https://github.com/Netflix/RxJava/issues/200 + */ + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, new AtomicObservableSubscription(ourSubscription)); + /** * Subscribe to the parent Observable to get to the children Observables */ - sequences.subscribe(new ParentObserver(actualObserver)); + sequences.subscribe(new ParentObserver(synchronizedObserver)); /* return our subscription to allow unsubscribing */ return ourSubscription; @@ -380,6 +394,68 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test + public void testSynchronizationOfMultipleSequences() throws Exception { + final TestASynchronousObservable o1 = new TestASynchronousObservable(); + final TestASynchronousObservable o2 = new TestASynchronousObservable(); + + // use this latch to cause onNext to wait until we're ready to let it go + final CountDownLatch endLatch = new CountDownLatch(1); + + final AtomicInteger concurrentCounter = new AtomicInteger(); + final AtomicInteger totalCounter = new AtomicInteger(); + + @SuppressWarnings("unchecked") + Observable m = Observable.create(merge(o1, o2)); + m.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Exception e) { + throw new RuntimeException("failed", e); + } + + @Override + public void onNext(String v) { + totalCounter.incrementAndGet(); + concurrentCounter.incrementAndGet(); + try { + // wait here until we're done asserting + endLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException("failed", e); + } finally { + concurrentCounter.decrementAndGet(); + } + } + + }); + + // wait for both observables to send (one should be blocked) + o1.onNextBeingSent.await(); + o2.onNextBeingSent.await(); + + assertEquals(1, concurrentCounter.get()); + + // release so it can finish + endLatch.countDown(); + + try { + o1.t.join(); + o2.t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertEquals(2, totalCounter.get()); + assertEquals(0, concurrentCounter.get()); + } + /** * unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge */ @@ -452,6 +528,7 @@ public void unsubscribe() { private static class TestASynchronousObservable extends Observable { Thread t; + final CountDownLatch onNextBeingSent = new CountDownLatch(1); @Override public Subscription subscribe(final Observer observer) { @@ -459,6 +536,7 @@ public Subscription subscribe(final Observer observer) { @Override public void run() { + onNextBeingSent.countDown(); observer.onNext("hello"); observer.onCompleted(); } From fb555df3376301595f6596861662c654d77209d2 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 20 Mar 2013 15:12:44 -0700 Subject: [PATCH 02/13] Synchronization of Merge operator (fixes) - return AtomicSubscription not MergeSubscription which I was accidentally still returning - try/finally in unit test so threads are released even if assertion is thrown --- .../main/java/rx/operators/OperationMerge.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1e6e6e7568..eeb1e96407 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -118,7 +118,7 @@ private MergeObservable(Observable> sequences) { this.sequences = sequences; } - public MergeSubscription call(Observer actualObserver) { + public Subscription call(Observer actualObserver) { /** * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. @@ -127,7 +127,8 @@ public MergeSubscription call(Observer actualObserver) { *

* Bug report: https://github.com/Netflix/RxJava/issues/200 */ - SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, new AtomicObservableSubscription(ourSubscription)); + AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription); + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); /** * Subscribe to the parent Observable to get to the children Observables @@ -135,7 +136,7 @@ public MergeSubscription call(Observer actualObserver) { sequences.subscribe(new ParentObserver(synchronizedObserver)); /* return our subscription to allow unsubscribing */ - return ourSubscription; + return subscription; } /** @@ -439,11 +440,13 @@ public void onNext(String v) { // wait for both observables to send (one should be blocked) o1.onNextBeingSent.await(); o2.onNextBeingSent.await(); - - assertEquals(1, concurrentCounter.get()); - // release so it can finish - endLatch.countDown(); + try { // in try/finally so threads are released via latch countDown even if assertion fails + assertEquals(1, concurrentCounter.get()); + } finally { + // release so it can finish + endLatch.countDown(); + } try { o1.t.join(); From fb22a73a7df3ccada20d3f2e7b0da2db806bcb2e Mon Sep 17 00:00:00 2001 From: John Myers Date: Wed, 27 Mar 2013 21:48:41 -0700 Subject: [PATCH 03/13] Add tests to demonstrate bugs --- .../src/main/java/rx/operators/OperationTake.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e86263e562..fc4372d5ae 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -260,6 +260,18 @@ public void testTake2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeDoesntLeakErrors() { + Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); + Observable.create(take(source, 1)).last(); + } + + @Test + public void testTakeZeroDoesntLeakError() { + Observable source = Observable.error(new Exception("test failed")); + Observable.create(take(source, 0)).lastOrDefault("ok"); + } + @Test public void testUnsubscribeAfterTake() { Subscription s = mock(Subscription.class); From 4919472dd12f0230b9116544182d0e706e44db06 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 21:24:26 -0700 Subject: [PATCH 04/13] Split Take and TakeWhile --- rxjava-core/src/main/java/rx/Observable.java | 13 +- .../main/java/rx/operators/OperationTake.java | 160 ++------- .../java/rx/operators/OperationTakeWhile.java | 313 ++++++++++++++++++ 3 files changed, 349 insertions(+), 137 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c..e332098ba2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,8 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationTake; +import rx.operators.OperationTakeWhile; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -54,7 +56,6 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1779,7 +1780,7 @@ public static Observable takeLast(final Observable items, final int co * @return */ public static Observable takeWhile(final Observable items, Func1 predicate) { - return create(OperationTake.takeWhile(items, predicate)); + return create(OperationTakeWhile.takeWhile(items, predicate)); } /** @@ -1811,16 +1812,18 @@ public Boolean call(T t) { * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTake.takeWhileWithIndex(items, predicate)); + return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); - return create(OperationTake.takeWhileWithIndex(items, new Func2() { + return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2() + { @Override - public Boolean call(T t, Integer integer) { + public Boolean call(T t, Integer integer) + { return (Boolean) _f.call(t, integer); } })); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index fc4372d5ae..1b853d5bba 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,21 +15,23 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.subjects.Subject; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -43,61 +45,17 @@ public final class OperationTake { * @return */ public static Func1, Subscription> take(final Observable items, final int num) { - return takeWhileWithIndex(items, OperationTake. numPredicate(num)); - } - - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param items - * @param predicate - * a function to test each source element for a condition - * @return - */ - public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTake. skipIndex(predicate)); - } - - /** - * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. - * - * @param items - * @param predicate - * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return - */ - public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new TakeWhile(items, predicate).call(observer); - } - - }; - } - - private static Func2 numPredicate(final int num) { - return new Func2() { - - @Override - public Boolean call(T input, Integer index) { - return index < num; + return new Take(items, num).call(observer); } }; } - private static Func2 skipIndex(final Func1 underlying) { - return new Func2() { - @Override - public Boolean call(T input, Integer index) { - return underlying.call(input); - } - }; - } - /** * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. *

@@ -109,19 +67,24 @@ public Boolean call(T input, Integer index) { * * @param */ - private static class TakeWhile implements Func1, Subscription> { + private static class Take implements Func1, Subscription> { private final AtomicInteger counter = new AtomicInteger(); private final Observable items; - private final Func2 predicate; + private final int num; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - private TakeWhile(Observable items, Func2 predicate) { + private Take(Observable items, int num) { this.items = items; - this.predicate = predicate; + this.num = num; } @Override public Subscription call(Observer observer) { + if (num < 1) { + observer.onCompleted(); + return Subscriptions.empty(); + } + return subscription.wrap(items.subscribe(new ItemObserver(observer))); } @@ -144,10 +107,14 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + final int count = counter.incrementAndGet(); + if (count <= num) { observer.onNext(args); - } else { - observer.onCompleted(); + if (count == num) { + observer.onCompleted(); + } + } + if (count >= num) { // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } @@ -159,77 +126,6 @@ public void onNext(T args) { public static class UnitTest { - @Test - public void testTakeWhile1() { - Observable w = Observable.toObservable(1, 2, 3); - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - - s.onNext(1); - s.onNext(2); - s.onNext(3); - s.onNext(4); - s.onNext(5); - s.onCompleted(); - - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onNext(4); - verify(aObserver, never()).onNext(5); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhile2() { - Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { - @Override - public Boolean call(String input, Integer index) { - return index < 2; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java new file mode 100644 index 0000000000..f45efabc92 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -0,0 +1,313 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.subjects.Subject; +/** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + */ +public final class OperationTakeWhile { + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate + * a function to test each source element for a condition + * @return + */ + public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { + return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate + * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeWhile(items, predicate).call(observer); + } + + }; + } + + private static Func2 skipIndex(final Func1 underlying) { + return new Func2() { + @Override + public Boolean call(T input, Integer index) { + return underlying.call(input); + } + }; + } + + /** + * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. + *

+ * It IS thread-safe from within it while receiving onNext events from multiple threads. + *

+ * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. + *

+ * Note how the takeWhileWithIndex() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. + * + * @param + */ + private static class TakeWhile implements Func1, Subscription> { + private final AtomicInteger counter = new AtomicInteger(); + private final Observable items; + private final Func2 predicate; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + private TakeWhile(Observable items, Func2 predicate) { + this.items = items; + this.predicate = predicate; + } + + @Override + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + if (predicate.call(args, counter.getAndIncrement())) { + observer.onNext(args); + } else { + observer.onCompleted(); + // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable + subscription.unsubscribe(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeWhile1() { + Observable w = Observable.toObservable(1, 2, 3); + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhile2() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String input, Integer index) + { + return index < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileDoesntLeakErrors() { + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + + Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + return false; + } + })).last(); + } + + @Test + public void testUnsubscribeAfterTake() { + Subscription s = mock(Subscription.class); + TestObservable w = new TestObservable(s, "one", "two", "three"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String s, Integer index) + { + return index < 1; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + w.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + System.out.println("TestObservable thread finished"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(s, times(1)).unsubscribe(); + } + + private static class TestObservable extends Observable { + + final Subscription s; + final String[] values; + Thread t = null; + + public TestObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + } + + @Override + public Subscription subscribe(final Observer observer) { + System.out.println("TestObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestObservable thread"); + for (String s : values) { + System.out.println("TestObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestObservable thread"); + t.start(); + System.out.println("done starting TestObservable thread"); + return s; + } + + } + } + +} From b2697cb4553e77df6091c7dfcc048585e3a5ca02 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:08:24 -0700 Subject: [PATCH 05/13] Implement TrustedObservableTester.assertTrustedObservable() --- .../rx/testing/TrustedObservableTester.java | 253 ++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java new file mode 100644 index 0000000000..f48be8d3f5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java @@ -0,0 +1,253 @@ +package rx.testing; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class TrustedObservableTester +{ + private TrustedObservableTester() {} + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) + { + return new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } + + public static class TestingObserver implements Observer { + + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); + + public TestingObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } + + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } + + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } + + } + + public static class UnitTest { + @Test(expected = AssertionError.class) + public void testDoubleCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + + } + + @Test(expected = AssertionError.class) + public void testCompletedError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testCompletedNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testErrorCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testDoubleError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + + @Test(expected = AssertionError.class) + public void testErrorNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testNextCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testConcurrentNextNext() { + final List threads = new ArrayList(); + final AtomicReference threadFailure = new AtomicReference(); + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(final Observer observer) + { + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("one"); + } + })); + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("two"); + } + })); + return Subscriptions.empty(); + } + })).subscribe(new SlowObserver()); + for (Thread thread : threads) { + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread thread, Throwable throwable) + { + threadFailure.set(throwable); + } + }); + thread.start(); + } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + // Junit seems pretty bad about exposing test failures inside of created threads. + assertNotNull("exception thrown by thread", threadFailure.get()); + assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass()); + } + + private static class SlowObserver implements Observer + { + @Override + public void onCompleted() + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onError(Exception e) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onNext(String args) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } + } +} From 550005cf260188a06660a3652c65fda594ee8f85 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:11:36 -0700 Subject: [PATCH 06/13] Fix violations of the Observer contract. --- .../main/java/rx/operators/OperationTake.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 1b853d5bba..ebefc09c65 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.testing.TrustedObservableTester.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. @@ -97,12 +98,16 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - observer.onCompleted(); + if (counter.getAndSet(num) < num) { + observer.onCompleted(); + } } @Override public void onError(Exception e) { - observer.onError(e); + if (counter.getAndSet(num) < num) { + observer.onError(e); + } } @Override @@ -129,7 +134,7 @@ public static class UnitTest { @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 2)); + Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -144,7 +149,7 @@ public void testTake1() { @Test public void testTake2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -158,14 +163,23 @@ public void testTake2() { @Test public void testTakeDoesntLeakErrors() { - Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); - Observable.create(take(source, 1)).last(); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + Observable.create(assertTrustedObservable(take(source, 1))).last(); } @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); - Observable.create(take(source, 0)).lastOrDefault("ok"); + Observable source = Observable.error(new Exception("test failed")); + Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); } @Test @@ -175,7 +189,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); take.subscribe(aObserver); // wait for the Observable to complete From 6c1a1abe0002018d282fb366b94b00ebb366de11 Mon Sep 17 00:00:00 2001 From: John Myers Date: Fri, 29 Mar 2013 20:49:31 -0700 Subject: [PATCH 07/13] take(0) subscribes to its source --- .../main/java/rx/operators/OperationTake.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index ebefc09c65..c4335b71d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -23,8 +23,10 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -82,6 +84,23 @@ private Take(Observable items, int num) { @Override public Subscription call(Observer observer) { if (num < 1) { + items.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + }).unsubscribe(); observer.onCompleted(); return Subscriptions.empty(); } @@ -178,8 +197,28 @@ public Subscription call(Observer observer) @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); + final AtomicBoolean subscribed = new AtomicBoolean(false); + final AtomicBoolean unSubscribed = new AtomicBoolean(false); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + subscribed.set(true); + observer.onError(new Exception("test failed")); + return new Subscription() + { + @Override + public void unsubscribe() + { + unSubscribed.set(true); + } + }; + } + }); Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + assertTrue("source subscribed", subscribed.get()); + assertTrue("source unsubscribed", unSubscribed.get()); } @Test From 63cef64fa9df12457791fb7e45f0a0b56237d0ee Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 30 Mar 2013 19:54:40 -0700 Subject: [PATCH 08/13] Small reorganization of code for OperationTake and TrustedObservableTester - removed rx.testing package (if that's going to exist that means it's bleeding into something that should live in /src/test and beyond what works well for inner class testing) - made TrustedObservableTester part of rx.operation package and an inner UnitTest class so it doesn't become part of the public API --- .../AbstractOperation.java} | 109 +++++++++--------- .../main/java/rx/operators/OperationTake.java | 21 ++-- 2 files changed, 65 insertions(+), 65 deletions(-) rename rxjava-core/src/main/java/rx/{testing/TrustedObservableTester.java => operators/AbstractOperation.java} (77%) diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java similarity index 77% rename from rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java rename to rxjava-core/src/main/java/rx/operators/AbstractOperation.java index f48be8d3f5..dd71bd837a 100644 --- a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java +++ b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java @@ -1,11 +1,6 @@ -package rx.testing; +package rx.operators; -import org.junit.Test; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func1; +import static org.junit.Assert.*; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -13,63 +8,72 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; +import org.junit.Test; -public class TrustedObservableTester +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Common utility functions for operator implementations and tests. + */ +/* package */class AbstractOperation { - private TrustedObservableTester() {} + private AbstractOperation() { + } - public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) - { - return new Func1, Subscription>() + public static class UnitTest { + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) { - @Override - public Subscription call(Observer observer) + return new Func1, Subscription>() { - return source.call(new TestingObserver(observer)); - } - }; - } + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } - public static class TestingObserver implements Observer { + public static class TestingObserver implements Observer { - private final Observer actual; - private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final AtomicBoolean isInCallback = new AtomicBoolean(false); + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); - public TestingObserver(Observer actual) { - this.actual = actual; - } + public TestingObserver(Observer actual) { + this.actual = actual; + } - @Override - public void onCompleted() { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onCompleted(); - isInCallback.set(false); - } + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } - @Override - public void onError(Exception e) { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onError(e); - isInCallback.set(false); - } + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } - @Override - public void onNext(T args) { - assertFalse("previous call to onCompleted() or onError()", isFinished.get()); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onNext(args); - isInCallback.set(false); - } + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } - } + } - public static class UnitTest { @Test(expected = AssertionError.class) public void testDoubleCompleted() { Observable.create(assertTrustedObservable(new Func1, Subscription>() @@ -141,7 +145,6 @@ public Subscription call(Observer observer) })).lastOrDefault("end"); } - @Test(expected = AssertionError.class) public void testErrorNext() { Observable.create(assertTrustedObservable(new Func1, Subscription>() @@ -250,4 +253,4 @@ public void onNext(String args) } } } -} +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index c4335b71d8..5ea6b627e4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,7 +15,16 @@ */ package rx.operators; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.AbstractOperation.UnitTest.*; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -23,18 +32,6 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static rx.testing.TrustedObservableTester.assertTrustedObservable; - /** * Returns a specified number of contiguous values from the start of an observable sequence. */ From 4a9f2fb05a3163e11ed35d85a33a6b8e216dde77 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sun, 31 Mar 2013 22:55:58 -0700 Subject: [PATCH 09/13] TakeWhile protect calls to predicate --- .../java/rx/operators/OperationTakeWhile.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index f45efabc92..1bad2d36e5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -123,7 +123,15 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + Boolean isSelected; + try { + isSelected = predicate.call(args, counter.getAndIncrement()); + } + catch (Exception e) { + observer.onError(e); + return; + } + if (isSelected) { observer.onNext(args); } else { observer.onCompleted(); @@ -238,6 +246,35 @@ public Boolean call(String s) })).last(); } + @Test + public void testTakeWhileProtectsPredicateCall() { + TestObservable source = new TestObservable(mock(Subscription.class), "one"); + final RuntimeException testException = new RuntimeException("test exception"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + throw testException; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + source.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, times(1)).onError(testException); + } + @Test public void testUnsubscribeAfterTake() { Subscription s = mock(Subscription.class); From 38e9e3fbf8e9e705b2ba23443c6976ac46d9fb8e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 1 Apr 2013 15:36:44 -0700 Subject: [PATCH 10/13] Improve error handling of user provided Observers https://github.com/Netflix/RxJava/issues/216 --- rxjava-core/src/main/java/rx/Observable.java | 319 +++++++++++++----- .../main/java/rx/operators/OperationTake.java | 3 +- .../java/rx/operators/OperationTakeWhile.java | 16 +- .../{AbstractOperation.java => Tester.java} | 13 +- .../src/main/java/rx/util/AtomicObserver.java | 23 +- 5 files changed, 284 insertions(+), 90 deletions(-) rename rxjava-core/src/main/java/rx/operators/{AbstractOperation.java => Tester.java} (95%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e332098ba2..4bb671446c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; @@ -41,9 +42,6 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; -import rx.operators.OperationTake; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -56,17 +54,21 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; +import rx.operators.OperationTake; import rx.operators.OperationTakeLast; +import rx.operators.OperationTakeWhile; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; +import rx.operators.OperationWhere; import rx.operators.OperationZip; import rx.operators.OperatorGroupBy; import rx.operators.OperatorTakeUntil; import rx.operators.OperatorToIterator; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; +import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; @@ -99,10 +101,9 @@ public class Observable { private final Func1, Subscription> onSubscribe; - private final boolean isTrusted; protected Observable() { - this(null, false); + this(null); } /** @@ -114,18 +115,7 @@ protected Observable() { * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. */ protected Observable(Func1, Subscription> onSubscribe) { - this(onSubscribe, false); - } - - /** - * @param onSubscribe - * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. - * @param isTrusted - * boolean true if the onSubscribe function is guaranteed to conform to the correct contract and thus shortcuts can be taken. - */ - private Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { this.onSubscribe = onSubscribe; - this.isTrusted = isTrusted; } /** @@ -159,7 +149,10 @@ public Subscription subscribe(Observer observer) { // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { - if (isTrusted) { + /** + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + if (observer.getClass().getPackage().getName().startsWith("rx")) { Subscription s = onSubscribe.call(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens @@ -565,7 +558,7 @@ public Subscription call(Observer t1) { return Subscriptions.empty(); } - }, true); + }); } } @@ -593,7 +586,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } - }, true); + }); } } @@ -619,13 +612,6 @@ public static Observable create(Func1, Subscription> func) { return new Observable(func); } - /* - * Private version that creates a 'trusted' Observable to allow performance optimizations. - */ - private static Observable _create(Func1, Subscription> func) { - return new Observable(func, true); - } - /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

@@ -697,7 +683,7 @@ public static Observable error(Exception exception) { * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable filter(Observable that, Func1 predicate) { - return _create(OperationFilter.filter(that, predicate)); + return create(OperationFilter.filter(that, predicate)); } /** @@ -729,7 +715,7 @@ public Boolean call(T t1) { * Filters an Observable by discarding any of its emissions that do not meet some test. *

* - * + * * @param that * the Observable to filter * @param predicate @@ -737,7 +723,7 @@ public Boolean call(T t1) { * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable where(Observable that, Func1 predicate) { - return _create(OperationWhere.where(that, predicate)); + return create(OperationWhere.where(that, predicate)); } /** @@ -797,7 +783,7 @@ public static Observable range(int start, int count) { * @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Func0> observableFactory) { - return _create(OperationDefer.defer(observableFactory)); + return create(OperationDefer.defer(observableFactory)); } /** @@ -816,7 +802,7 @@ public static Observable defer(Object observableFactory) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(observableFactory); - return _create(OperationDefer.defer(new Func0>() { + return create(OperationDefer.defer(new Func0>() { @Override @SuppressWarnings("unchecked") @@ -980,7 +966,7 @@ public Boolean call(T args) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - return _create(OperationMap.map(sequence, func)); + return create(OperationMap.map(sequence, func)); } /** @@ -1036,7 +1022,7 @@ public R call(T t1) { * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, Func1> func) { - return _create(OperationMap.mapMany(sequence, func)); + return create(OperationMap.mapMany(sequence, func)); } /** @@ -1085,7 +1071,7 @@ public R call(T t1) { * @see MSDN: Observable.Materialize */ public static Observable> materialize(final Observable sequence) { - return _create(OperationMaterialize.materialize(sequence)); + return create(OperationMaterialize.materialize(sequence)); } /** @@ -1097,7 +1083,7 @@ public static Observable> materialize(final Observable se * @see MSDN: Observable.Dematerialize */ public static Observable dematerialize(final Observable> sequence) { - return _create(OperationDematerialize.dematerialize(sequence)); + return create(OperationDematerialize.dematerialize(sequence)); } /** @@ -1114,7 +1100,7 @@ public static Observable dematerialize(final Observable> * @see MSDN: Observable.Merge */ public static Observable merge(List> source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1131,7 +1117,7 @@ public static Observable merge(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable> source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1148,7 +1134,7 @@ public static Observable merge(Observable> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable... source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1181,7 +1167,7 @@ public static Observable takeUntil(final Observable source, final O * @see MSDN: Observable.Concat Method */ public static Observable concat(Observable... source) { - return _create(OperationConcat.concat(source)); + return create(OperationConcat.concat(source)); } /** @@ -1202,7 +1188,7 @@ public static Observable concat(Observable... source) { * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { - return _create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); + return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); } /** @@ -1219,7 +1205,7 @@ public static Observable> groupBy(Observable Observable> groupBy(Observable source, final Func1 keySelector) { - return _create(OperatorGroupBy.groupBy(source, keySelector)); + return create(OperatorGroupBy.groupBy(source, keySelector)); } /** @@ -1238,7 +1224,7 @@ public static Observable> groupBy(Observable s * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(List> source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1257,7 +1243,7 @@ public static Observable mergeDelayError(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable> source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1276,7 +1262,7 @@ public static Observable mergeDelayError(Observable> source * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable... source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1317,7 +1303,7 @@ public static Observable never() { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { - return _create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); + return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); } /** @@ -1381,7 +1367,7 @@ public Observable call(Exception e) { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) { - return _create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); + return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); } /** @@ -1405,7 +1391,7 @@ public static Observable onErrorResumeNext(final Observable that, fina * @return the source Observable, with its behavior modified as described */ public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { - return _create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); + return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); } /** @@ -1435,7 +1421,7 @@ public static Observable onErrorReturn(final Observable that, Func1Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1); + return takeLast(create(OperationScan.scan(sequence, accumulator)), 1); } /** @@ -1507,7 +1493,7 @@ public T call(T t1, T t2) { * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1); + return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** @@ -1571,7 +1557,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, Func2 accumulator) { - return _create(OperationScan.scan(sequence, accumulator)); + return create(OperationScan.scan(sequence, accumulator)); } /** @@ -1629,7 +1615,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { - return _create(OperationScan.scan(sequence, initialValue, accumulator)); + return create(OperationScan.scan(sequence, initialValue, accumulator)); } /** @@ -1669,20 +1655,28 @@ public T call(T t1, T t2) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param sequence an observable sequence whose elements to apply the predicate to. - * @param predicate a function to test each element for a condition. - * @param the type of observable. + * + * @param sequence + * an observable sequence whose elements to apply the predicate to. + * @param predicate + * a function to test each element for a condition. + * @param + * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, final Func1 predicate) { - return _create(OperationAll.all(sequence, predicate)); + return create(OperationAll.all(sequence, predicate)); } /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param sequence an observable sequence whose elements to apply the predicate to. - * @param predicate a function to test each element for a condition. - * @param the type of observable. + * + * @param sequence + * an observable sequence whose elements to apply the predicate to. + * @param predicate + * a function to test each element for a condition. + * @param + * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, Object predicate) { @@ -1712,7 +1706,7 @@ public Boolean call(T t) { * @see MSDN: Observable.Skip Method */ public static Observable skip(final Observable items, int num) { - return _create(OperationSkip.skip(items, num)); + return create(OperationSkip.skip(items, num)); } /** @@ -1730,7 +1724,7 @@ public static Observable skip(final Observable items, int num) { * @return an Observable that is a chronologically well-behaved version of the source Observable */ public static Observable synchronize(Observable observable) { - return _create(OperationSynchronize.synchronize(observable)); + return create(OperationSynchronize.synchronize(observable)); } /** @@ -1752,7 +1746,7 @@ public static Observable synchronize(Observable observable) { * Observable */ public static Observable take(final Observable items, final int num) { - return _create(OperationTake.take(items, num)); + return create(OperationTake.take(items, num)); } /** @@ -1768,7 +1762,7 @@ public static Observable take(final Observable items, final int num) { * Observable */ public static Observable takeLast(final Observable items, final int count) { - return _create(OperationTakeLast.takeLast(items, count)); + return create(OperationTakeLast.takeLast(items, count)); } /** @@ -1849,7 +1843,7 @@ public Boolean call(T t, Integer integer) * items emitted by the source Observable */ public static Observable> toList(final Observable that) { - return _create(OperationToObservableList.toObservableList(that)); + return create(OperationToObservableList.toObservableList(that)); } /** @@ -2047,7 +2041,7 @@ private static T singleOrDefault(Observable that, boolean hasDefault, T d * @return an Observable that emits each item in the source Iterable sequence */ public static Observable toObservable(Iterable iterable) { - return _create(OperationToObservableIterable.toObservableIterable(iterable)); + return create(OperationToObservableIterable.toObservableIterable(iterable)); } /** @@ -2066,7 +2060,7 @@ public static Observable toObservable(Iterable iterable) { * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future) { - return _create(OperationToObservableFuture.toObservableFuture(future)); + return create(OperationToObservableFuture.toObservableFuture(future)); } /** @@ -2090,7 +2084,7 @@ public static Observable toObservable(Future future) { * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future, long time, TimeUnit unit) { - return _create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); } /** @@ -2123,7 +2117,7 @@ public static Observable toObservable(T... items) { * @return */ public static Observable> toSortedList(Observable sequence) { - return _create(OperationToObservableSortedList.toSortedList(sequence)); + return create(OperationToObservableSortedList.toSortedList(sequence)); } /** @@ -2136,7 +2130,7 @@ public static Observable> toSortedList(Observable sequence) { * @return */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { - return _create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); + return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); } /** @@ -2151,7 +2145,7 @@ public static Observable> toSortedList(Observable sequence, Func2 public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(sortFunction); - return _create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { + return create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { @Override public Integer call(T t1, T t2) { @@ -2186,7 +2180,7 @@ public Integer call(T t1, T t2) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { - return _create(OperationZip.zip(w0, w1, reduceFunction)); + return create(OperationZip.zip(w0, w1, reduceFunction)); } /** @@ -2310,7 +2304,7 @@ public R call(T0 t0, T1 t1) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { - return _create(OperationZip.zip(w0, w1, w2, function)); + return create(OperationZip.zip(w0, w1, w2, function)); } /** @@ -2385,7 +2379,7 @@ public R call(T0 t0, T1 t1, T2 t2) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { - return _create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); + return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); } /** @@ -2472,7 +2466,7 @@ public Boolean call(T t1) { * Filters an Observable by discarding any of its emissions that do not meet some test. *

* - * + * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter @@ -2663,7 +2657,7 @@ public Observable> materialize() { */ @SuppressWarnings("unchecked") public Observable dematerialize() { - return dematerialize((Observable>)this); + return dematerialize((Observable>) this); } /** @@ -3008,7 +3002,9 @@ public Observable scan(final T initialValue, final Object accumulator) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param predicate a function to test each element for a condition. + * + * @param predicate + * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Func1 predicate) { @@ -3017,7 +3013,9 @@ public Observable all(Func1 predicate) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param predicate a function to test each element for a condition. + * + * @param predicate + * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Object predicate) { @@ -3556,6 +3554,175 @@ public void testMaterializeDematerializeChaining() { verify(observer, times(0)).onError(any(Exception.class)); } + /** + * The error from the user provided Observer is not handled by the subscribe method try/catch. + * + * It is handled by the AtomicObserver that wraps the provided Observer. + * + * Result: Passes (if AtomicObserver functionality exists) + */ + @Test + public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription s = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + try { + if (!s.isUnsubscribed()) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + } + } finally { + latch.countDown(); + } + } + }).start(); + return s; + } + }).subscribe(new AtomicObserver(new AtomicObservableSubscription(), new Observer() { + // we are manually wrapping in AtomicObserver here to simulate + // what will happen when a user provided Observer implementation is passed in + // since the subscribe method will wrap it in AtomicObserver if it's not in an rx.* package + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + })); + + // wait for async sequence to complete + latch.await(); + + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObserverSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + return Subscriptions.empty(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous + * + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObservableSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + throw new NumberFormatException(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + System.out.println(v); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 5ea6b627e4..a4b1922e22 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -18,7 +18,7 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.AbstractOperation.UnitTest.*; +import static rx.operators.Tester.UnitTest.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -281,6 +281,7 @@ public void run() { } } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 1bad2d36e5..d00d66c3aa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -26,11 +26,13 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; +import rx.util.AtomicObserver; import rx.util.functions.Func1; import rx.util.functions.Func2; -import rx.subjects.Subject; + /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. */ @@ -45,7 +47,7 @@ public final class OperationTakeWhile { * @return */ public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + return takeWhileWithIndex(items, OperationTakeWhile. skipIndex(predicate)); } /** @@ -108,7 +110,10 @@ private class ItemObserver implements Observer { private final Observer observer; public ItemObserver(Observer observer) { - this.observer = observer; + // Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior + // needs "isFinished" logic to not send duplicated events + // The 'testTakeWhile1' and 'testTakeWhile2' tests fail without this. + this.observer = new AtomicObserver(subscription, observer); } @Override @@ -126,8 +131,7 @@ public void onNext(T args) { Boolean isSelected; try { isSelected = predicate.call(args, counter.getAndIncrement()); - } - catch (Exception e) { + } catch (Exception e) { observer.onError(e); return; } @@ -171,7 +175,7 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { Subject s = Subject.create(); - Observable w = (Observable)s; + Observable w = (Observable) s; Observable take = Observable.create(takeWhile(w, new Func1() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/AbstractOperation.java b/rxjava-core/src/main/java/rx/operators/Tester.java similarity index 95% rename from rxjava-core/src/main/java/rx/operators/AbstractOperation.java rename to rxjava-core/src/main/java/rx/operators/Tester.java index dd71bd837a..9692323015 100644 --- a/rxjava-core/src/main/java/rx/operators/AbstractOperation.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -17,11 +17,16 @@ import rx.util.functions.Func1; /** - * Common utility functions for operator implementations and tests. + * Common utility functions for testing operator implementations. */ -/* package */class AbstractOperation -{ - private AbstractOperation() { +/* package */class Tester { + /* + * This is purposefully package-only so it does not leak into the public API outside of this package. + * + * This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility. + */ + + private Tester() { } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserver.java b/rxjava-core/src/main/java/rx/util/AtomicObserver.java index b3e18f5b3e..24519dc276 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserver.java @@ -1,8 +1,10 @@ package rx.util; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import rx.Observer; +import rx.plugins.RxJavaPlugins; /** * Wrapper around Observer to ensure compliance with Rx contract. @@ -32,7 +34,7 @@ *

  • When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).
  • * *

    - * It will not synchronized onNext execution. Use the {@link SynchronizedObserver} to do that. + * It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that. * * @param */ @@ -50,7 +52,12 @@ public AtomicObserver(AtomicObservableSubscription subscription, Observer act @Override public void onCompleted() { if (isFinished.compareAndSet(false, true)) { - actual.onCompleted(); + try { + actual.onCompleted(); + } catch (Exception e) { + // handle errors if the onCompleted implementation fails, not just if the Observable fails + onError(e); + } // auto-unsubscribe subscription.unsubscribe(); } @@ -59,7 +66,17 @@ public void onCompleted() { @Override public void onError(Exception e) { if (isFinished.compareAndSet(false, true)) { - actual.onError(e); + try { + actual.onError(e); + } catch (Exception e2) { + // if the onError itself fails then pass to the plugin + // see https://github.com/Netflix/RxJava/issues/216 for further discussion + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e2); + // and throw exception despite that not being proper for Rx + // https://github.com/Netflix/RxJava/issues/198 + throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); + } // auto-unsubscribe subscription.unsubscribe(); } From c6c5449578020bd9100be0520c4b0d67e7b7d502 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 1 Apr 2013 16:09:47 -0700 Subject: [PATCH 11/13] Protect subscribe/forEach implementations against user provided function failures Related to https://github.com/Netflix/RxJava/issues/216 The new forEach unit test went into a deadlock prior to this fix. --- rxjava-core/src/main/java/rx/Observable.java | 119 +++++++++++++++++-- 1 file changed, 110 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 4bb671446c..d6b4aa6e36 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -81,6 +81,7 @@ import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.FuncN; +import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; @@ -152,7 +153,7 @@ public Subscription subscribe(Observer observer) { /** * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ - if (observer.getClass().getPackage().getName().startsWith("rx")) { + if (isInternalImplementation(observer)) { Subscription s = onSubscribe.call(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens @@ -178,6 +179,16 @@ public Subscription subscribe(Observer observer) { } } + /** + * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. + *

    + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + private Subscription protectivelyWrapAndSubscribe(Observer o) { + AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + return subscription.wrap(subscribe(new AtomicObserver(subscription, o))); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Map callbacks) { // lookup and memoize onNext @@ -187,7 +198,12 @@ public Subscription subscribe(final Map callbacks) { } final FuncN onNext = Functions.from(_onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); @@ -224,7 +240,12 @@ public Subscription subscribe(final Object o) { } final FuncN onNext = Functions.from(o); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -244,7 +265,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -273,7 +299,12 @@ public Subscription subscribe(final Object onNext, final Object onError) { } final FuncN onNextFunction = Functions.from(onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -295,7 +326,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext, final Action1 onError) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -326,7 +362,12 @@ public Subscription subscribe(final Object onNext, final Object onError, final O } final FuncN onNextFunction = Functions.from(onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { if (onComplete != null) { @@ -350,7 +391,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { onComplete.call(); @@ -389,7 +435,12 @@ public void forEach(final Action1 onNext) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionFromOnError = new AtomicReference(); - subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { latch.countDown(); } @@ -3260,6 +3311,23 @@ public Iterable mostRecent(T initialValue) { return mostRecent(this, initialValue); } + /** + * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. + *

    + * For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + * + * NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface. + * + * @param f + * @return + */ + private boolean isInternalImplementation(Object o) { + if (o == null) { + return true; + } + return (o.getClass().getPackage().getName().startsWith("rx.")); + } + public static class UnitTest { @Mock @@ -3723,6 +3791,39 @@ public void onNext(String v) { } } + @Test + public void testForEachWithError() { + try { + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).forEach(new Action1() { + + @Override + public void call(String t1) { + throw new RuntimeException("fail"); + } + }); + fail("we expect an exception to be thrown"); + } catch (Exception e) { + // do nothing as we expect this + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } From 924ce5c051e5a50f89cd17392a93ddd00d4e547c Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 1 Apr 2013 22:16:32 -0700 Subject: [PATCH 12/13] Restrict identification of "internal" operators to only the rx.operators package - based on discussion at https://github.com/Netflix/RxJava/pull/221 - don't wrap at AtomicObserver again - anything outside of rx.operators will be wrapped --- rxjava-core/src/main/java/rx/Observable.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d6b4aa6e36..95e3830979 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3325,7 +3325,11 @@ private boolean isInternalImplementation(Object o) { if (o == null) { return true; } - return (o.getClass().getPackage().getName().startsWith("rx.")); + // prevent double-wrapping (yeah it happens) + if (o instanceof AtomicObserver) + return true; + // we treat the following package as "internal" and don't wrap it + return o.getClass().getPackage().getName().startsWith("rx.operators"); } public static class UnitTest { @@ -3658,11 +3662,7 @@ public void run() { }).start(); return s; } - }).subscribe(new AtomicObserver(new AtomicObservableSubscription(), new Observer() { - // we are manually wrapping in AtomicObserver here to simulate - // what will happen when a user provided Observer implementation is passed in - // since the subscribe method will wrap it in AtomicObserver if it's not in an rx.* package - + }).subscribe(new Observer() { @Override public void onCompleted() { System.out.println("completed"); @@ -3683,7 +3683,7 @@ public void onNext(String v) { count.incrementAndGet(); } - })); + }); // wait for async sequence to complete latch.await(); From 169e7e06ea223bd3fdca2460b74e9cd361439fff Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 1 Apr 2013 22:51:19 -0700 Subject: [PATCH 13/13] Trying to fix non-deterministic test - not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread See https://github.com/Netflix/RxJava/pull/201 for more information. --- .../src/main/java/rx/operators/OperationMerge.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index eeb1e96407..d5aebe0ad5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -441,6 +441,15 @@ public void onNext(String v) { o1.onNextBeingSent.await(); o2.onNextBeingSent.await(); + // I can't think of a way to know for sure that both threads have or are trying to send onNext + // since I can't use a CountDownLatch for "after" onNext since I want to catch during it + // but I can't know for sure onNext is invoked + // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time + // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following + // onNext is invoked. + + Thread.sleep(300); + try { // in try/finally so threads are released via latch countDown even if assertion fails assertEquals(1, concurrentCounter.get()); } finally { @@ -541,6 +550,8 @@ public Subscription subscribe(final Observer observer) { public void run() { onNextBeingSent.countDown(); observer.onNext("hello"); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock observer.onCompleted(); }