Skip to content

Commit

Permalink
Merge pull request #1893 from akarnokd/MergeDelayErrorFix
Browse files Browse the repository at this point in the history
Fixed incorrect error merging.
  • Loading branch information
benjchristensen committed Nov 21, 2014
2 parents 4439cc1 + 752c2a7 commit 053e506
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 64 deletions.
19 changes: 13 additions & 6 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ private boolean drainQueuesIfNeeded() {
} finally {
boolean moreToDrain = releaseEmitLock();
// request outside of lock
request(emitted);
if (emitted > 0) {
request(emitted);
}
if (!moreToDrain) {
return true;
}
Expand Down Expand Up @@ -397,11 +399,13 @@ public Boolean call(InnerSubscriber<T> s) {

@Override
public void onError(Throwable e) {
completed = true;
innerError(e);
if (!completed) {
completed = true;
innerError(e, true);
}
}

private void innerError(Throwable e) {
private void innerError(Throwable e, boolean parent) {
if (delayErrors) {
synchronized (this) {
if (exceptions == null) {
Expand All @@ -411,7 +415,9 @@ private void innerError(Throwable e) {
exceptions.add(e);
boolean sendOnComplete = false;
synchronized (this) {
wip--;
if (!parent) {
wip--;
}
if ((wip == 0 && completed) || (wip < 0)) {
sendOnComplete = true;
}
Expand Down Expand Up @@ -520,6 +526,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
final MergeSubscriber<T> parentSubscriber;
final MergeProducer<T> producer;
/** Make sure the inner termination events are delivered only once. */
@SuppressWarnings("unused")
volatile int terminated;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
Expand All @@ -545,7 +552,7 @@ public void onNext(T t) {
public void onError(Throwable e) {
// it doesn't go through queues, it immediately onErrors and tears everything down
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
parentSubscriber.innerError(e);
parentSubscriber.innerError(e, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,26 +496,32 @@ public void testErrorInParentObservable() {

@Test
public void testErrorInParentObservableDelayed() throws Exception {
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
@Override
public void call(Subscriber<? super Observable<String>> op) {
op.onNext(Observable.create(o1));
op.onNext(Observable.create(o2));
op.onError(new NullPointerException("throwing exception in parent"));
}
});

TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
Observable<String> m = Observable.mergeDelayError(parentObservable);
m.subscribe(ts);
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
ts.assertTerminalEvent();

verify(stringObserver, times(2)).onNext("hello");
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
verify(stringObserver, never()).onCompleted();
for (int i = 0; i < 50; i++) {
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
@Override
public void call(Subscriber<? super Observable<String>> op) {
op.onNext(Observable.create(o1));
op.onNext(Observable.create(o2));
op.onError(new NullPointerException("throwing exception in parent"));
}
});

@SuppressWarnings("unchecked")
Observer<String> stringObserver = mock(Observer.class);

TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
Observable<String> m = Observable.mergeDelayError(parentObservable);
m.subscribe(ts);
System.out.println("testErrorInParentObservableDelayed | " + i);
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
ts.assertTerminalEvent();

verify(stringObserver, times(2)).onNext("hello");
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
verify(stringObserver, never()).onCompleted();
}
}

private static class TestASynchronous1sDelayedObservable implements Observable.OnSubscribe<String> {
Expand Down
79 changes: 41 additions & 38 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,47 +609,50 @@ public void testAsyncChild() {

@Test
public void testOnErrorCutsAheadOfOnNext() {
final PublishSubject<Long> subject = PublishSubject.create();

final AtomicLong counter = new AtomicLong();
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long t) {
// simulate slow consumer to force backpressure failure
try {
Thread.sleep(1);
} catch (InterruptedException e) {
for (int i = 0; i < 50; i++) {
final PublishSubject<Long> subject = PublishSubject.create();

final AtomicLong counter = new AtomicLong();
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long t) {
// simulate slow consumer to force backpressure failure
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}

});
subject.observeOn(Schedulers.computation()).subscribe(ts);

// this will blow up with backpressure
while (counter.get() < 102400) {
subject.onNext(counter.get());
counter.incrementAndGet();
}

});
subject.observeOn(Schedulers.computation()).subscribe(ts);

// this will blow up with backpressure
while (counter.get() < 102400) {
subject.onNext(counter.get());
counter.incrementAndGet();

ts.awaitTerminalEvent();
assertEquals(1, ts.getOnErrorEvents().size());
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
// example [0, 1, 2] not [0, 1, 4]
List<Long> onNextEvents = ts.getOnNextEvents();
assertTrue(onNextEvents.isEmpty() || onNextEvents.size() == onNextEvents.get(onNextEvents.size() - 1) + 1);
// we should emit the error without emitting the full buffer size
assertTrue(onNextEvents.size() < RxRingBuffer.SIZE);
}

ts.awaitTerminalEvent();
assertEquals(1, ts.getOnErrorEvents().size());
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
// example [0, 1, 2] not [0, 1, 4]
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
// we should emit the error without emitting the full buffer size
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
}

/**
Expand Down

0 comments on commit 053e506

Please sign in to comment.