Skip to content

Commit

Permalink
2.x: fix doOnSubscribe signalling Undeliv.Exception instead of onError (
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Feb 16, 2017
1 parent 61af0c0 commit d9e2df9
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public void onSubscribe(Disposable s) {
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
RxJavaPlugins.onError(e);

this.s = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, actual);
return;
}
Expand All @@ -61,12 +60,18 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
actual.onError(t);
if (s != DisposableHelper.DISPOSED) {
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
actual.onComplete();
if (s != DisposableHelper.DISPOSED) {
actual.onComplete();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package io.reactivex.internal.operators.completable;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletablePeek extends Completable {
Expand Down Expand Up @@ -48,77 +48,99 @@ public CompletablePeek(CompletableSource source, Consumer<? super Disposable> on
@Override
protected void subscribeActual(final CompletableObserver s) {

source.subscribe(new CompletableObserver() {
source.subscribe(new CompletableObserverImplementation(s));
}

@Override
public void onComplete() {
try {
onComplete.run();
onTerminate.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.onError(e);
return;
}
final class CompletableObserverImplementation implements CompletableObserver, Disposable {

s.onComplete();
final CompletableObserver actual;

doAfter();
}
Disposable d;

@Override
public void onError(Throwable e) {
try {
onError.accept(e);
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}
private CompletableObserverImplementation(CompletableObserver actual) {
this.actual = actual;
}

s.onError(e);

doAfter();
@Override
public void onSubscribe(final Disposable d) {
try {
onSubscribe.accept(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
this.d = DisposableHelper.DISPOSED;
EmptyDisposable.error(ex, actual);
return;
}
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}

@Override
public void onSubscribe(final Disposable d) {

try {
onSubscribe.accept(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
EmptyDisposable.error(ex, s);
return;
}

s.onSubscribe(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
d.dispose();
}
}));
@Override
public void onError(Throwable e) {
if (d == DisposableHelper.DISPOSED) {
RxJavaPlugins.onError(e);
return;
}
try {
onError.accept(e);
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}

void doAfter() {
actual.onError(e);

try {
onAfterTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
doAfter();
}

@Override
public void onComplete() {
if (d == DisposableHelper.DISPOSED) {
return;
}
});
}

try {
onComplete.run();
onTerminate.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}

actual.onComplete();

doAfter();
}

void doAfter() {
try {
onAfterTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}

@Override
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void onSubscribe(Subscription s) {
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
RxJavaPlugins.onError(e);
this.s = SubscriptionHelper.CANCELLED;
EmptySubscription.error(e, actual);
return;
}
Expand All @@ -81,12 +81,18 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
actual.onError(t);
if (s != SubscriptionHelper.CANCELLED) {
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
actual.onComplete();
if (s != SubscriptionHelper.CANCELLED) {
actual.onComplete();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.junit.*;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;

public class CompletableDoOnTest {

Expand Down Expand Up @@ -74,4 +76,35 @@ public void run() throws Exception {

assertTrue(atomicBoolean.get());
}

@Test
public void onSubscribeCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final Disposable bs = Disposables.empty();

new Completable() {
@Override
protected void subscribeActual(CompletableObserver s) {
s.onSubscribe(bs);
s.onError(new TestException("Second"));
s.onComplete();
}
}
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable s) throws Exception {
throw new TestException("First");
}
})
.test()
.assertFailureAndMessage(TestException.class, "First");

assertTrue(bs.isDisposed());

TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@

package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;

import java.util.List;

import static org.junit.Assert.*;
import org.junit.Test;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;

public class FlowableDoOnLifecycleTest {
Expand Down Expand Up @@ -132,4 +134,35 @@ public void run() throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void onSubscribeCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final BooleanSubscription bs = new BooleanSubscription();

new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
s.onSubscribe(bs);
s.onError(new TestException("Second"));
s.onComplete();
}
}
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
throw new TestException("First");
}
})
.test()
.assertFailureAndMessage(TestException.class, "First");

assertTrue(bs.isCancelled());

TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@

package io.reactivex.internal.operators.maybe;

import static org.junit.Assert.assertTrue;

import java.util.List;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

public class MaybeDoOnEventTest {
Expand Down Expand Up @@ -45,4 +52,36 @@ public void accept(Integer v, Throwable e) throws Exception {
}
});
}

@Test
public void onSubscribeCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final Disposable bs = Disposables.empty();

new Maybe<Integer>() {
@Override
protected void subscribeActual(MaybeObserver<? super Integer> s) {
s.onSubscribe(bs);
s.onError(new TestException("Second"));
s.onComplete();
s.onSuccess(1);
}
}
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable s) throws Exception {
throw new TestException("First");
}
})
.test()
.assertFailureAndMessage(TestException.class, "First");

assertTrue(bs.isDisposed());

TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}
}
Loading

0 comments on commit d9e2df9

Please sign in to comment.