From f81c0aca8e45bc9a629df4220030aea10f22d38c Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Fri, 22 Mar 2013 03:15:34 +0000 Subject: [PATCH 1/2] Reimplementation of Concat, improved handling of Observable>. The old version required all of the Observables to be generated and buffered before the concat could begin. If the outer Observable was asynchronous, items could be dropped (test added). The new version passes the test, and does the best job I could (after examining several possible strategies) of achieving clear and consistent semantics in accordance with the principle of least surprise. --- .../java/rx/operators/OperationConcat.java | 209 ++++++++++-------- settings.gradle | 4 +- 2 files changed, 124 insertions(+), 89 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index c621002533..4cb5fc5691 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -18,10 +18,11 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import java.lang.reflect.Array; +import java.util.Arrays; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; import org.junit.Before; @@ -30,105 +31,106 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.BooleanSubscription; import rx.util.AtomicObservableSubscription; -import rx.util.functions.Action1; +import rx.util.Exceptions; import rx.util.functions.Func1; public final class OperationConcat { /** - * Combine the observable sequences from the list of Observables into one observable sequence without any transformation. - * - * @param sequences - * An observable sequence of elements to project. + * Combine the observable sequences from the list of Observables into one + * observable sequence without any transformation. If either the outer + * observable or an inner observable calls onError, we will call onError. + * + *

+ * + * The outer observable might run on a separate thread from (one of) the + * inner observables; in this case care must be taken to avoid a deadlock. + * The Concat operation may block the outer thread while servicing an inner + * thread in order to ensure a well-defined ordering of elements; therefore + * none of the inner threads must be implemented in a way that might wait on + * the outer thread. + * + * @param sequences An observable sequence of elements to project. * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ public static Func1, Subscription> concat(final Observable... sequences) { - return new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - return new Concat(sequences).call(observer); - } - }; + return concat(Observable.toObservable(sequences)); } public static Func1, Subscription> concat(final List> sequences) { - @SuppressWarnings("unchecked") - Observable[] o = sequences.toArray((Observable[]) Array.newInstance(Observable.class, sequences.size())); - return concat(o); + return concat(Observable.toObservable(sequences)); } public static Func1, Subscription> concat(final Observable> sequences) { - final List> list = new ArrayList>(); - sequences.toList().subscribe(new Action1>>() { - @Override - public void call(List> t1) { - list.addAll(t1); - } - - }); - - return concat(list); - } - - private static class Concat implements Func1, Subscription> { - private final Observable[] sequences; - private int num = 0; - private int count = 0; - private Subscription s; - - Concat(final Observable... sequences) { - this.sequences = sequences; - this.num = sequences.length - 1; - } - - private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription(); - - private final Subscription actualSubscription = new Subscription() { + return new Func1, Subscription>() { @Override - public void unsubscribe() { - if (null != s) - s.unsubscribe(); + public Subscription call(Observer observer) { + return new ConcatSubscription(sequences, observer); } }; + } - public Subscription call(Observer observer) { - s = sequences[count].subscribe(new ConcatObserver(observer)); - - return Subscription.wrap(actualSubscription); - } - - private class ConcatObserver implements Observer { - private final Observer observer; + private static class ConcatSubscription extends BooleanSubscription { + // Might be updated by an inner thread's onError during the outer + // thread's onNext, then read in the outer thread's onComplete. + final AtomicBoolean innerError = new AtomicBoolean(false); - ConcatObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onCompleted() { - if (num == count) - observer.onCompleted(); - else { - count++; - s = sequences[count].subscribe(this); + public ConcatSubscription(Observable> sequences, final Observer observer) { + final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription(); + outerSubscription.wrap(sequences.subscribe(new Observer>() { + @Override + public void onNext(Observable nextSequence) { + // We will not return from onNext until the inner observer completes. + // NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted. + final CountDownLatch latch = new CountDownLatch(1); + final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequence.subscribe(new Observer() { + @Override + public void onNext(T item) { + // If the Concat's subscriber called unsubscribe() before the return of onNext, we must do so also. + observer.onNext(item); + if (isUnsubscribed()) { + innerSubscription.unsubscribe(); + outerSubscription.unsubscribe(); + } + } + @Override + public void onError(Exception e) { + outerSubscription.unsubscribe(); + innerError.set(true); + observer.onError(e); + latch.countDown(); + } + @Override + public void onCompleted() { + // Continue on to the next sequence + latch.countDown(); + } + })); + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Exceptions.propagate(e); + } } - } - - @Override - public void onError(Exception e) { - observer.onError(e); - - } - - @Override - public void onNext(T args) { - observer.onNext(args); - - } - } + @Override + public void onError(Exception e) { + // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. + observer.onError(e); + } + @Override + public void onCompleted() { + // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. + if (!innerError.get()) { + observer.onCompleted(); + } + } + })); + } } public static class UnitTest { @@ -193,8 +195,8 @@ public void testConcatWithList() { public void testConcatUnsubscribe() { final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); - final TestObservable w1 = new TestObservable(null, null, "one", "two", "three"); - final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); + final TestObservable w1 = new TestObservable(null, null, "one", "two", "three"); + final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -256,7 +258,40 @@ public void unsubscribe() { Assert.assertEquals(expected.length, index); } - private static class TestObservable extends Observable { + @Test + public void testBlockedObservableOfObservables() { + final String[] o = { "1", "3", "5", "7" }; + final String[] e = { "2", "4", "6" }; + final Observable odds = Observable.toObservable(o); + final Observable even = Observable.toObservable(e); + final CountDownLatch callOnce = new CountDownLatch(1); + final CountDownLatch okToContinue = new CountDownLatch(1); + TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even); + Func1, Subscription> concatF = concat(observableOfObservables); + Observable concat = Observable.create(concatF); + concat.subscribe(observer); + try { + //Block main thread to allow observables to serve up o1. + callOnce.await(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + // The concated observable should have served up all of the odds. + Assert.assertEquals(o.length, index); + try { + // unblock observables so it can serve up o2 and complete + okToContinue.countDown(); + observableOfObservables.t.join(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + // The concatenated observable should now have served up all the evens. + Assert.assertEquals(expected.length, index); + } + + private static class TestObservable extends Observable { private final Subscription s = new Subscription() { @@ -266,28 +301,28 @@ public void unsubscribe() { } }; - private final String[] values; + private final List values; private Thread t = null; private int count = 0; private boolean subscribed = true; private final CountDownLatch once; private final CountDownLatch okToContinue; - public TestObservable(CountDownLatch once, CountDownLatch okToContinue, String... values) { - this.values = values; + public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) { + this.values = Arrays.asList(values); this.once = once; this.okToContinue = okToContinue; } @Override - public Subscription subscribe(final Observer observer) { + public Subscription subscribe(final Observer observer) { t = new Thread(new Runnable() { @Override public void run() { try { - while (count < values.length && subscribed) { - observer.onNext(values[count]); + while (count < values.size() && subscribed) { + observer.onNext(values.get(count)); count++; //Unblock the main thread to call unsubscribe. if (null != once) diff --git a/settings.gradle b/settings.gradle index df14bb21d7..90e4cb7082 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,5 +2,5 @@ rootProject.name='rxjava' include 'rxjava-core', \ 'language-adaptors:rxjava-groovy', \ 'language-adaptors:rxjava-jruby', \ -'language-adaptors:rxjava-clojure', \ -'language-adaptors:rxjava-scala' +'language-adaptors:rxjava-clojure' + From 8e179a1df47d5f2f5b1c29ac37b8396d5c2a094c Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Fri, 29 Mar 2013 08:53:55 +0000 Subject: [PATCH 2/2] Incorporate review feedback. Also restore the errant change to settings.gradle. --- .../java/rx/operators/OperationConcat.java | 338 +++++++++++++----- settings.gradle | 4 +- 2 files changed, 255 insertions(+), 87 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 4cb5fc5691..ff62badd9d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,11 +23,14 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -52,6 +55,11 @@ public final class OperationConcat { * none of the inner threads must be implemented in a way that might wait on * the outer thread. * + *

+ * + * Beware that concat(o1,o2).subscribe() is a blocking call from + * which it is impossible to unsubscribe. + * * @param sequences An observable sequence of elements to project. * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ @@ -90,11 +98,12 @@ public void onNext(Observable nextSequence) { innerSubscription.wrap(nextSequence.subscribe(new Observer() { @Override public void onNext(T item) { - // If the Concat's subscriber called unsubscribe() before the return of onNext, we must do so also. - observer.onNext(item); + // Make our best-effort to release resources in the face of unsubscribe. if (isUnsubscribed()) { innerSubscription.unsubscribe(); outerSubscription.unsubscribe(); + } else { + observer.onNext(item); } } @Override @@ -130,38 +139,16 @@ public void onCompleted() { } } })); - } + } } public static class UnitTest { - private final static String[] expected = { "1", "3", "5", "7", "2", "4", "6" }; - private int index = 0; - - Observer observer = new Observer() { - - @Override - public void onCompleted() { - } - - @Override - public void onError(Exception e) { - // TODO Auto-generated method stub - } - - @Override - public void onNext(String args) { - Assert.assertEquals(expected[index], args); - index++; - } - }; - - @Before - public void before() { - index = 0; - } @Test public void testConcat() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; @@ -171,12 +158,15 @@ public void testConcat() { @SuppressWarnings("unchecked") Observable concat = Observable.create(concat(odds, even)); concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + verify(observer, times(7)).onNext(anyString()); } @Test public void testConcatWithList() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; @@ -187,26 +177,36 @@ public void testConcatWithList() { list.add(even); Observable concat = Observable.create(concat(list)); concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + verify(observer, times(7)).onNext(anyString()); } @Test public void testConcatUnsubscribe() { final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); - final TestObservable w1 = new TestObservable(null, null, "one", "two", "three"); + final TestObservable w1 = new TestObservable("one", "two", "three"); final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); + final Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") - Observable concat = Observable.create(concat(w1, w2)); - Subscription s1 = concat.subscribe(aObserver); - + final Observable concat = Observable.create(concat(w1, w2)); + final AtomicObservableSubscription s1 = new AtomicObservableSubscription(); + Thread t = new Thread() { + @Override + public void run() { + // NB: this statement does not complete until after "six" has been delivered. + s1.wrap(concat.subscribe(aObserver)); + } + }; + t.start(); try { //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once. callOnce.await(); + // NB: This statement has no effect, since s1 cannot possibly + // wrap anything until "six" has been delivered, which cannot + // happen until we okToContinue.countDown() s1.unsubscribe(); //Unblock the observable to continue. okToContinue.countDown(); @@ -217,16 +217,23 @@ public void testConcatUnsubscribe() { fail(e.getMessage()); } - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, times(1)).onNext("three"); - verify(aObserver, times(1)).onNext("four"); - verify(aObserver, never()).onNext("five"); - verify(aObserver, never()).onNext("six"); + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(1)).onNext("four"); + // NB: you might hope that five and six are not delivered, but see above. + inOrder.verify(aObserver, times(1)).onNext("five"); + inOrder.verify(aObserver, times(1)).onNext("six"); + inOrder.verify(aObserver, times(1)).onCompleted(); + } @Test public void testMergeObservableOfObservables() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; @@ -235,31 +242,182 @@ public void testMergeObservableOfObservables() { Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { - @Override - public Subscription call(Observer> observer) { - // simulate what would happen in an observable - observer.onNext(odds); - observer.onNext(even); - observer.onCompleted(); + @Override + public Subscription call(Observer> observer) { + // simulate what would happen in an observable + observer.onNext(odds); + observer.onNext(even); + observer.onCompleted(); - return new Subscription() { + return new Subscription() { - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } + @Override + public void unsubscribe() { + // unregister ... will never be called here since we are executing synchronously + } - }; - } + }; + } - }); + }); Observable concat = Observable.create(concat(observableOfObservables)); concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + verify(observer, times(7)).onNext(anyString()); + } + + /** + * Simple concat of 2 asynchronous observables ensuring it emits in correct order. + */ + @SuppressWarnings("unchecked") + @Test + public void testSimpleAsyncConcat() { + Observer observer = mock(Observer.class); + + TestObservable o1 = new TestObservable("one", "two", "three"); + TestObservable o2 = new TestObservable("four", "five", "six"); + + Observable.concat(o1, o2).subscribe(observer); + + try { + // wait for async observables to complete + o1.t.join(); + o2.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads"); + } + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onNext("five"); + inOrder.verify(observer, times(1)).onNext("six"); + } + + /** + * Test an async Observable that emits more async Observables + */ + @SuppressWarnings("unchecked") + @Test + public void testNestedAsyncConcat() throws Exception { + Observer observer = mock(Observer.class); + + final TestObservable o1 = new TestObservable("one", "two", "three"); + final TestObservable o2 = new TestObservable("four", "five", "six"); + final TestObservable o3 = new TestObservable("seven", "eight", "nine"); + final CountDownLatch allowThird = new CountDownLatch(1); + + final AtomicReference parent = new AtomicReference(); + Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { + + @Override + public Subscription call(final Observer> observer) { + final BooleanSubscription s = new BooleanSubscription(); + parent.set(new Thread(new Runnable() { + + @Override + public void run() { + try { + // emit first + if (!s.isUnsubscribed()) { + System.out.println("Emit o1"); + observer.onNext(o1); + } + // emit second + if (!s.isUnsubscribed()) { + System.out.println("Emit o2"); + observer.onNext(o2); + } + + // wait until sometime later and emit third + try { + allowThird.await(); + } catch (InterruptedException e) { + observer.onError(e); + } + if (!s.isUnsubscribed()) { + System.out.println("Emit o3"); + observer.onNext(o3); + } + + } catch (Exception e) { + observer.onError(e); + } finally { + System.out.println("Done parent Observable"); + observer.onCompleted(); + } + } + })); + parent.get().start(); + return s; + } + }); + + Observable.create(concat(observableOfObservables)).subscribe(observer); + + // wait for parent to start + while (parent.get() == null) { + Thread.sleep(1); + } + + try { + // wait for first 2 async observables to complete + while (o1.t == null) { + Thread.sleep(1); + } + System.out.println("Thread1 started ... waiting for it to complete ..."); + o1.t.join(); + while (o2.t == null) { + Thread.sleep(1); + } + System.out.println("Thread2 started ... waiting for it to complete ..."); + o2.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads", e); + } + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onNext("five"); + inOrder.verify(observer, times(1)).onNext("six"); + // we shouldn't have the following 3 yet + inOrder.verify(observer, never()).onNext("seven"); + inOrder.verify(observer, never()).onNext("eight"); + inOrder.verify(observer, never()).onNext("nine"); + // we should not be completed yet + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + // now allow the third + allowThird.countDown(); + + try { + while (o3.t == null) { + Thread.sleep(1); + } + // wait for 3rd to complete + o3.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads", e); + } + + inOrder.verify(observer, times(1)).onNext("seven"); + inOrder.verify(observer, times(1)).onNext("eight"); + inOrder.verify(observer, times(1)).onNext("nine"); + + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); } + @SuppressWarnings("unchecked") @Test public void testBlockedObservableOfObservables() { + Observer observer = mock(Observer.class); + final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; final Observable odds = Observable.toObservable(o); @@ -278,7 +436,11 @@ public void testBlockedObservableOfObservables() { fail(ex.getMessage()); } // The concated observable should have served up all of the odds. - Assert.assertEquals(o.length, index); + verify(observer, times(1)).onNext("1"); + verify(observer, times(1)).onNext("3"); + verify(observer, times(1)).onNext("5"); + verify(observer, times(1)).onNext("7"); + try { // unblock observables so it can serve up o2 and complete okToContinue.countDown(); @@ -288,19 +450,21 @@ public void testBlockedObservableOfObservables() { fail(ex.getMessage()); } // The concatenated observable should now have served up all the evens. - Assert.assertEquals(expected.length, index); + verify(observer, times(1)).onNext("2"); + verify(observer, times(1)).onNext("4"); + verify(observer, times(1)).onNext("6"); } private static class TestObservable extends Observable { private final Subscription s = new Subscription() { - @Override - public void unsubscribe() { - subscribed = false; - } + @Override + public void unsubscribe() { + subscribed = false; + } - }; + }; private final List values; private Thread t = null; private int count = 0; @@ -308,6 +472,10 @@ public void unsubscribe() { private final CountDownLatch once; private final CountDownLatch okToContinue; + public TestObservable(T... values) { + this(null, null, values); + } + public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) { this.values = Arrays.asList(values); this.once = once; @@ -318,28 +486,28 @@ public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... val public Subscription subscribe(final Observer observer) { t = new Thread(new Runnable() { - @Override - public void run() { - try { - while (count < values.size() && subscribed) { - observer.onNext(values.get(count)); - count++; - //Unblock the main thread to call unsubscribe. - if (null != once) - once.countDown(); - //Block until the main thread has called unsubscribe. - if (null != once) - okToContinue.await(); + @Override + public void run() { + try { + while (count < values.size() && subscribed) { + observer.onNext(values.get(count)); + count++; + //Unblock the main thread to call unsubscribe. + if (null != once) + once.countDown(); + //Block until the main thread has called unsubscribe. + if (null != once) + okToContinue.await(); + } + if (subscribed) + observer.onCompleted(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(e.getMessage()); } - if (subscribed) - observer.onCompleted(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); } - } - }); + }); t.start(); return s; } diff --git a/settings.gradle b/settings.gradle index 90e4cb7082..df14bb21d7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,5 +2,5 @@ rootProject.name='rxjava' include 'rxjava-core', \ 'language-adaptors:rxjava-groovy', \ 'language-adaptors:rxjava-jruby', \ -'language-adaptors:rxjava-clojure' - +'language-adaptors:rxjava-clojure', \ +'language-adaptors:rxjava-scala'