Skip to content

Commit

Permalink
Fix calling dispose() in terminal event
Browse files Browse the repository at this point in the history
Matches ReactiveX/RxJava#4957, which we were doing wrong. Note the lifecycle disposable is not lazily set because that's internally managed and non-terminating.
  • Loading branch information
ZacSweers committed Feb 16, 2017
1 parent f329e12 commit 53049b2
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,36 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
@Override public final void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
AutoDisposableHelper.dispose(mainDisposable);
}
}

// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

private void callMainSubscribeIfNecessary() {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
AutoDisposableHelper.dispose(mainDisposable);
}
}

@Override public final void onComplete() {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onComplete.run();
} catch (Exception e) {
Expand All @@ -106,7 +117,7 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet

@Override public final void onError(Throwable e) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onError.accept(e);
} catch (Exception e1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,36 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
@Override public final void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
AutoDisposableHelper.dispose(mainDisposable);
}
}

// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

private void callMainSubscribeIfNecessary() {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
AutoDisposableHelper.dispose(mainDisposable);
}
}

@Override public final void onSuccess(T value) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onSuccess.accept(value);
} catch (Exception e) {
Expand All @@ -109,7 +120,7 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser

@Override public final void onError(Throwable e) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onError.accept(e);
} catch (Exception e1) {
Expand All @@ -121,7 +132,7 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser

@Override public final void onComplete() {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onComplete.run();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,30 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
@Override public final void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
AutoDisposableHelper.dispose(mainDisposable);
}
}

// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

private void callMainSubscribeIfNecessary() {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
AutoDisposableHelper.dispose(mainDisposable);
}
}

Expand All @@ -108,7 +119,7 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {

@Override public final void onError(Throwable e) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onError.accept(e);
} catch (Exception e1) {
Expand All @@ -120,7 +131,7 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {

@Override public final void onComplete() {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onComplete.run();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,36 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs
@Override public final void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
AutoDisposableHelper.dispose(mainDisposable);
}
}

// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

private void callMainSubscribeIfNecessary() {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (mainDisposable.get() == null) {
try {
onSubscribe.accept(Disposables.disposed());
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
AutoDisposableHelper.dispose(mainDisposable);
}
}

@Override public final void onSuccess(T value) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onSuccess.accept(value);
} catch (Exception e) {
Expand All @@ -105,7 +116,7 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs

@Override public final void onError(Throwable e) {
if (!isDisposed()) {
dispose();
lazyDispose();
try {
onError.accept(e);
} catch (Exception e1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,30 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>
@Override public final void cancel() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
AutoSubscriptionHelper.cancel(mainSubscription);
}
}

// If we've never actually started the upstream subscription (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty subscription instance
// to abide by the Subscriber contract.
if (mainSubscription.get() == null) {
try {
onSubscribe.accept(EmptySubscription.INSTANCE);
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
private void lazyCancel() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
callMainSubscribeIfNecessary();
mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED);
}
}

private void callMainSubscribeIfNecessary() {
// If we've never actually started the upstream subscription (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty subscription instance
// to abide by the Subscriber contract.
if (mainSubscription.get() == null) {
try {
onSubscribe.accept(EmptySubscription.INSTANCE);
} catch (Exception e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
AutoSubscriptionHelper.cancel(mainSubscription);
}
}

Expand All @@ -132,7 +143,7 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>

@Override public void onError(Throwable e) {
if (!isDisposed()) {
cancel();
lazyCancel();
try {
onError.accept(e);
} catch (Exception e1) {
Expand All @@ -144,7 +155,7 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>

@Override public final void onComplete() {
if (!isDisposed()) {
cancel();
lazyCancel();
try {
onComplete.run();
} catch (Exception e) {
Expand Down

0 comments on commit 53049b2

Please sign in to comment.