Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize Observer on OperationMerge #201

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 95 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -33,6 +35,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Func1;

public final class OperationMerge {
Expand Down Expand Up @@ -114,14 +118,25 @@ private MergeObservable(Observable<Observable<T>> sequences) {
this.sequences = sequences;
}

public MergeSubscription call(Observer<T> actualObserver) {
public Subscription call(Observer<T> actualObserver) {

/**
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
* <p>
* The calls from each sequence must be serialized.
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/200
*/
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);

/**
* Subscribe to the parent Observable to get to the children Observables
*/
sequences.subscribe(new ParentObserver(actualObserver));
sequences.subscribe(new ParentObserver(synchronizedObserver));

/* return our subscription to allow unsubscribing */
return ourSubscription;
return subscription;
}

/**
Expand Down Expand Up @@ -380,6 +395,79 @@ public void testMergeArrayWithThreading() {
verify(stringObserver, times(1)).onCompleted();
}

@Test
public void testSynchronizationOfMultipleSequences() throws Exception {
final TestASynchronousObservable o1 = new TestASynchronousObservable();
final TestASynchronousObservable o2 = new TestASynchronousObservable();

// use this latch to cause onNext to wait until we're ready to let it go
final CountDownLatch endLatch = new CountDownLatch(1);

final AtomicInteger concurrentCounter = new AtomicInteger();
final AtomicInteger totalCounter = new AtomicInteger();

@SuppressWarnings("unchecked")
Observable<String> m = Observable.create(merge(o1, o2));
m.subscribe(new Observer<String>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Exception e) {
throw new RuntimeException("failed", e);
}

@Override
public void onNext(String v) {
totalCounter.incrementAndGet();
concurrentCounter.incrementAndGet();
try {
// wait here until we're done asserting
endLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("failed", e);
} finally {
concurrentCounter.decrementAndGet();
}
}

});

// wait for both observables to send (one should be blocked)
o1.onNextBeingSent.await();
o2.onNextBeingSent.await();

// I can't think of a way to know for sure that both threads have or are trying to send onNext
// since I can't use a CountDownLatch for "after" onNext since I want to catch during it
// but I can't know for sure onNext is invoked
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
// onNext is invoked.

Thread.sleep(300);

try { // in try/finally so threads are released via latch countDown even if assertion fails
assertEquals(1, concurrentCounter.get());
} finally {
// release so it can finish
endLatch.countDown();
}

try {
o1.t.join();
o2.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertEquals(2, totalCounter.get());
assertEquals(0, concurrentCounter.get());
}

/**
* unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge
*/
Expand Down Expand Up @@ -452,14 +540,18 @@ public void unsubscribe() {

private static class TestASynchronousObservable extends Observable<String> {
Thread t;
final CountDownLatch onNextBeingSent = new CountDownLatch(1);

@Override
public Subscription subscribe(final Observer<String> observer) {
t = new Thread(new Runnable() {

@Override
public void run() {
onNextBeingSent.countDown();
observer.onNext("hello");
// I can't use a countDownLatch to prove we are actually sending 'onNext'
// since it will block if synchronized and I'll deadlock
observer.onCompleted();
}

Expand Down