diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index c621002533..93c14b2acf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,193 +18,461 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import java.lang.reflect.Array; +import java.util.Arrays; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; + import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.BooleanSubscription; import rx.util.AtomicObservableSubscription; -import rx.util.functions.Action1; +import rx.util.Exceptions; import rx.util.functions.Func1; public final class OperationConcat { /** - * Combine the observable sequences from the list of Observables into one observable sequence without any transformation. - * - * @param sequences - * An observable sequence of elements to project. + * Combine the observable sequences from the list of Observables into one + * observable sequence without any transformation. If either the outer + * observable or an inner observable calls onError, we will call onError. + * + *

+ * + * The outer observable might run on a separate thread from (one of) the + * inner observables; in this case care must be taken to avoid a deadlock. + * The Concat operation may block the outer thread while servicing an inner + * thread in order to ensure a well-defined ordering of elements; therefore + * none of the inner threads must be implemented in a way that might wait on + * the outer thread. + * + *

+ * + * Beware that concat(o1,o2).subscribe() is a blocking call from + * which it is impossible to unsubscribe if observables are running on same thread. + * + * @param sequences An observable sequence of elements to project. * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ public static Func1, Subscription> concat(final Observable... sequences) { + return concat(Observable.toObservable(sequences)); + } + + public static Func1, Subscription> concat(final List> sequences) { + return concat(Observable.toObservable(sequences)); + } + + public static Func1, Subscription> concat(final Observable> sequences) { return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new Concat(sequences).call(observer); + return new ConcatSubscription(sequences, observer); } }; } - public static Func1, Subscription> concat(final List> sequences) { - @SuppressWarnings("unchecked") - Observable[] o = sequences.toArray((Observable[]) Array.newInstance(Observable.class, sequences.size())); - return concat(o); + private static class ConcatSubscription extends BooleanSubscription { + // Might be updated by an inner thread's onError during the outer + // thread's onNext, then read in the outer thread's onComplete. + final AtomicBoolean innerError = new AtomicBoolean(false); + + public ConcatSubscription(Observable> sequences, final Observer observer) { + final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription(); + outerSubscription.wrap(sequences.subscribe(new Observer>() { + @Override + public void onNext(Observable nextSequence) { + // We will not return from onNext until the inner observer completes. + // NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted. + final CountDownLatch latch = new CountDownLatch(1); + final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequence.subscribe(new Observer() { + @Override + public void onNext(T item) { + // Make our best-effort to release resources in the face of unsubscribe. + if (isUnsubscribed()) { + innerSubscription.unsubscribe(); + outerSubscription.unsubscribe(); + } else { + observer.onNext(item); + } + } + @Override + public void onError(Exception e) { + outerSubscription.unsubscribe(); + innerError.set(true); + observer.onError(e); + latch.countDown(); + } + @Override + public void onCompleted() { + // Continue on to the next sequence + latch.countDown(); + } + })); + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Exceptions.propagate(e); + } + } + @Override + public void onError(Exception e) { + // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. + observer.onError(e); + } + @Override + public void onCompleted() { + // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. + if (!innerError.get()) { + observer.onCompleted(); + } + } + })); + } } - public static Func1, Subscription> concat(final Observable> sequences) { - final List> list = new ArrayList>(); - sequences.toList().subscribe(new Action1>>() { - @Override - public void call(List> t1) { - list.addAll(t1); - } + public static class UnitTest { + + @Test + public void testConcat() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); - }); + final String[] o = { "1", "3", "5", "7" }; + final String[] e = { "2", "4", "6" }; - return concat(list); - } + final Observable odds = Observable.toObservable(o); + final Observable even = Observable.toObservable(e); - private static class Concat implements Func1, Subscription> { - private final Observable[] sequences; - private int num = 0; - private int count = 0; - private Subscription s; + @SuppressWarnings("unchecked") + Observable concat = Observable.create(concat(odds, even)); + concat.subscribe(observer); - Concat(final Observable... sequences) { - this.sequences = sequences; - this.num = sequences.length - 1; + verify(observer, times(7)).onNext(anyString()); } - private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription(); - - private final Subscription actualSubscription = new Subscription() { + @Test + public void testConcatWithList() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); - @Override - public void unsubscribe() { - if (null != s) - s.unsubscribe(); - } - }; + final String[] o = { "1", "3", "5", "7" }; + final String[] e = { "2", "4", "6" }; - public Subscription call(Observer observer) { - s = sequences[count].subscribe(new ConcatObserver(observer)); + final Observable odds = Observable.toObservable(o); + final Observable even = Observable.toObservable(e); + final List> list = new ArrayList>(); + list.add(odds); + list.add(even); + Observable concat = Observable.create(concat(list)); + concat.subscribe(observer); - return Subscription.wrap(actualSubscription); + verify(observer, times(7)).onNext(anyString()); } - private class ConcatObserver implements Observer { - private final Observer observer; + @Test + public void testConcatObservableOfObservables() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); - ConcatObserver(Observer observer) { - this.observer = observer; - } + final String[] o = { "1", "3", "5", "7" }; + final String[] e = { "2", "4", "6" }; - @Override - public void onCompleted() { - if (num == count) - observer.onCompleted(); - else { - count++; - s = sequences[count].subscribe(this); - } - } + final Observable odds = Observable.toObservable(o); + final Observable even = Observable.toObservable(e); - @Override - public void onError(Exception e) { - observer.onError(e); + Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { - } + @Override + public Subscription call(Observer> observer) { + // simulate what would happen in an observable + observer.onNext(odds); + observer.onNext(even); + observer.onCompleted(); - @Override - public void onNext(T args) { - observer.onNext(args); + return new Subscription() { - } - } - } + @Override + public void unsubscribe() { + // unregister ... will never be called here since we are executing synchronously + } - public static class UnitTest { - private final static String[] expected = { "1", "3", "5", "7", "2", "4", "6" }; - private int index = 0; + }; + } - Observer observer = new Observer() { + }); + Observable concat = Observable.create(concat(observableOfObservables)); + + concat.subscribe(observer); + + verify(observer, times(7)).onNext(anyString()); + } - @Override - public void onCompleted() { - } + /** + * Simple concat of 2 asynchronous observables ensuring it emits in correct order. + */ + @SuppressWarnings("unchecked") + @Test + public void testSimpleAsyncConcat() { + Observer observer = mock(Observer.class); - @Override - public void onError(Exception e) { - // TODO Auto-generated method stub - } + TestObservable o1 = new TestObservable("one", "two", "three"); + TestObservable o2 = new TestObservable("four", "five", "six"); - @Override - public void onNext(String args) { - Assert.assertEquals(expected[index], args); - index++; + Observable.concat(o1, o2).subscribe(observer); + + try { + // wait for async observables to complete + o1.t.join(); + o2.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads"); } - }; - @Before - public void before() { - index = 0; + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onNext("five"); + inOrder.verify(observer, times(1)).onNext("six"); } + /** + * Test an async Observable that emits more async Observables + */ + @SuppressWarnings("unchecked") @Test - public void testConcat() { - final String[] o = { "1", "3", "5", "7" }; - final String[] e = { "2", "4", "6" }; + public void testNestedAsyncConcat() throws Exception { + Observer observer = mock(Observer.class); - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); + final TestObservable o1 = new TestObservable("one", "two", "three"); + final TestObservable o2 = new TestObservable("four", "five", "six"); + final TestObservable o3 = new TestObservable("seven", "eight", "nine"); + final CountDownLatch allowThird = new CountDownLatch(1); - @SuppressWarnings("unchecked") - Observable concat = Observable.create(concat(odds, even)); - concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + final AtomicReference parent = new AtomicReference(); + Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { + + @Override + public Subscription call(final Observer> observer) { + final BooleanSubscription s = new BooleanSubscription(); + parent.set(new Thread(new Runnable() { + + @Override + public void run() { + try { + // emit first + if (!s.isUnsubscribed()) { + System.out.println("Emit o1"); + observer.onNext(o1); + } + // emit second + if (!s.isUnsubscribed()) { + System.out.println("Emit o2"); + observer.onNext(o2); + } + + // wait until sometime later and emit third + try { + allowThird.await(); + } catch (InterruptedException e) { + observer.onError(e); + } + if (!s.isUnsubscribed()) { + System.out.println("Emit o3"); + observer.onNext(o3); + } + + } catch (Exception e) { + observer.onError(e); + } finally { + System.out.println("Done parent Observable"); + observer.onCompleted(); + } + } + })); + parent.get().start(); + return s; + } + }); + + Observable.create(concat(observableOfObservables)).subscribe(observer); + + // wait for parent to start + while (parent.get() == null) { + Thread.sleep(1); + } + + try { + // wait for first 2 async observables to complete + while (o1.t == null) { + Thread.sleep(1); + } + System.out.println("Thread1 started ... waiting for it to complete ..."); + o1.t.join(); + while (o2.t == null) { + Thread.sleep(1); + } + System.out.println("Thread2 started ... waiting for it to complete ..."); + o2.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads", e); + } + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onNext("five"); + inOrder.verify(observer, times(1)).onNext("six"); + // we shouldn't have the following 3 yet + inOrder.verify(observer, never()).onNext("seven"); + inOrder.verify(observer, never()).onNext("eight"); + inOrder.verify(observer, never()).onNext("nine"); + // we should not be completed yet + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + // now allow the third + allowThird.countDown(); + + try { + while (o3.t == null) { + Thread.sleep(1); + } + // wait for 3rd to complete + o3.t.join(); + } catch (Exception e) { + throw new RuntimeException("failed waiting on threads", e); + } + inOrder.verify(observer, times(1)).onNext("seven"); + inOrder.verify(observer, times(1)).onNext("eight"); + inOrder.verify(observer, times(1)).onNext("nine"); + + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); } + @SuppressWarnings("unchecked") @Test - public void testConcatWithList() { + public void testBlockedObservableOfObservables() { + Observer observer = mock(Observer.class); + final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; - final Observable odds = Observable.toObservable(o); final Observable even = Observable.toObservable(e); - final List> list = new ArrayList>(); - list.add(odds); - list.add(even); - Observable concat = Observable.create(concat(list)); + final CountDownLatch callOnce = new CountDownLatch(1); + final CountDownLatch okToContinue = new CountDownLatch(1); + TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even); + Func1, Subscription> concatF = concat(observableOfObservables); + Observable concat = Observable.create(concatF); concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + try { + //Block main thread to allow observables to serve up o1. + callOnce.await(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + // The concated observable should have served up all of the odds. + verify(observer, times(1)).onNext("1"); + verify(observer, times(1)).onNext("3"); + verify(observer, times(1)).onNext("5"); + verify(observer, times(1)).onNext("7"); + try { + // unblock observables so it can serve up o2 and complete + okToContinue.countDown(); + observableOfObservables.t.join(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + // The concatenated observable should now have served up all the evens. + verify(observer, times(1)).onNext("2"); + verify(observer, times(1)).onNext("4"); + verify(observer, times(1)).onNext("6"); } + + @Test + public void testConcatConcurrentWithInfinity() { + final TestObservable w1 = new TestObservable("one", "two", "three"); + //This observable will send "hello" MAX_VALUE time. + final TestObservable w2 = new TestObservable("hello", Integer.MAX_VALUE); + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + @SuppressWarnings("unchecked") + TestObservable> observableOfObservables = new TestObservable>(w1, w2); + Func1, Subscription> concatF = concat(observableOfObservables); + + Observable concat = Observable.create(concatF); + + concat.take(50).subscribe(aObserver); + + //Wait for the thread to start up. + try { + Thread.sleep(25); + w1.t.join(); + w2.t.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(47)).onNext("hello"); + verify(aObserver, times(1)).onCompleted(); + verify(aObserver, never()).onError(any(Exception.class)); + + } + + + /** + * The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete. + */ @Test public void testConcatUnsubscribe() { final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); - final TestObservable w1 = new TestObservable(null, null, "one", "two", "three"); - final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); + final TestObservable w1 = new TestObservable("one", "two", "three"); + final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); + final Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") - Observable concat = Observable.create(concat(w1, w2)); - Subscription s1 = concat.subscribe(aObserver); - + final Observable concat = Observable.create(concat(w1, w2)); + final AtomicObservableSubscription s1 = new AtomicObservableSubscription(); + Thread t = new Thread() { + @Override + public void run() { + // NB: this statement does not complete until after "six" has been delivered. + s1.wrap(concat.subscribe(aObserver)); + } + }; + t.start(); try { //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once. callOnce.await(); + // NB: This statement has no effect, since s1 cannot possibly + // wrap anything until "six" has been delivered, which cannot + // happen until we okToContinue.countDown() s1.unsubscribe(); //Unblock the observable to continue. okToContinue.countDown(); @@ -215,96 +483,133 @@ public void testConcatUnsubscribe() { fail(e.getMessage()); } - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, times(1)).onNext("three"); - verify(aObserver, times(1)).onNext("four"); - verify(aObserver, never()).onNext("five"); - verify(aObserver, never()).onNext("six"); - } + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(1)).onNext("four"); + // NB: you might hope that five and six are not delivered, but see above. + inOrder.verify(aObserver, times(1)).onNext("five"); + inOrder.verify(aObserver, times(1)).onNext("six"); + inOrder.verify(aObserver, times(1)).onCompleted(); + } + + /** + * All observables will be running in different threads so subscribe() is unblocked. CountDownLatch is only used in order to call unsubscribe() in a predictable manner. + */ @Test - public void testMergeObservableOfObservables() { - final String[] o = { "1", "3", "5", "7" }; - final String[] e = { "2", "4", "6" }; - - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); - - Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { - - @Override - public Subscription call(Observer> observer) { - // simulate what would happen in an observable - observer.onNext(odds); - observer.onNext(even); - observer.onCompleted(); - - return new Subscription() { - - @Override - public void unsubscribe() { - // unregister ... will never be called here since we are executing synchronously - } + public void testConcatUnsubscribeConcurrent() { + final CountDownLatch callOnce = new CountDownLatch(1); + final CountDownLatch okToContinue = new CountDownLatch(1); + final TestObservable w1 = new TestObservable("one", "two", "three"); + final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); - }; - } + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + @SuppressWarnings("unchecked") + TestObservable> observableOfObservables = new TestObservable>(w1, w2); + Func1, Subscription> concatF = concat(observableOfObservables); + + Observable concat = Observable.create(concatF); + + Subscription s1 = concat.subscribe(aObserver); + + try { + //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext exactly once. + callOnce.await(); + //"four" from w2 has been processed by onNext() + s1.unsubscribe(); + //"five" and "six" will NOT be processed by onNext() + //Unblock the observable to continue. + okToContinue.countDown(); + w1.t.join(); + w2.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } - }); - Observable concat = Observable.create(concat(observableOfObservables)); - concat.subscribe(observer); - Assert.assertEquals(expected.length, index); + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(1)).onNext("four"); + inOrder.verify(aObserver, never()).onNext("five"); + inOrder.verify(aObserver, never()).onNext("six"); + verify(aObserver, never()).onCompleted(); + verify(aObserver, never()).onError(any(Exception.class)); } - - private static class TestObservable extends Observable { + + private static class TestObservable extends Observable { private final Subscription s = new Subscription() { - @Override - public void unsubscribe() { - subscribed = false; - } + @Override + public void unsubscribe() { + subscribed = false; + } - }; - private final String[] values; + }; + private final List values; private Thread t = null; private int count = 0; private boolean subscribed = true; private final CountDownLatch once; private final CountDownLatch okToContinue; + private final T seed; + private final int size; + + public TestObservable(T... values) { + this(null, null, values); + } - public TestObservable(CountDownLatch once, CountDownLatch okToContinue, String... values) { - this.values = values; + public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) { + this.values = Arrays.asList(values); + this.size = this.values.size(); this.once = once; this.okToContinue = okToContinue; + this.seed = null; } + public TestObservable(T seed, int size) { + values = null; + once = null; + okToContinue = null; + this.seed = seed; + this.size = size; + } + + @Override - public Subscription subscribe(final Observer observer) { + public Subscription subscribe(final Observer observer) { t = new Thread(new Runnable() { - @Override - public void run() { - try { - while (count < values.length && subscribed) { - observer.onNext(values[count]); - count++; - //Unblock the main thread to call unsubscribe. - if (null != once) - once.countDown(); - //Block until the main thread has called unsubscribe. - if (null != once) - okToContinue.await(); + @Override + public void run() { + try { + while (count < size && subscribed) { + if (null != values) + observer.onNext(values.get(count)); + else + observer.onNext(seed); + count++; + //Unblock the main thread to call unsubscribe. + if (null != once) + once.countDown(); + //Block until the main thread has called unsubscribe. + if (null != okToContinue) + okToContinue.await(); + } + if (subscribed) + observer.onCompleted(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(e.getMessage()); } - if (subscribed) - observer.onCompleted(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(e.getMessage()); } - } - }); + }); t.start(); return s; }