diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java index 3bdce90b4..32f880c3c 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java @@ -76,25 +76,36 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet @Override public final void dispose() { synchronized (this) { AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + AutoDisposableHelper.dispose(mainDisposable); + } + } - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (mainDisposable.get() == null) { - try { - onSubscribe.accept(Disposables.disposed()); - } catch (Exception e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + private void lazyDispose() { + synchronized (this) { + AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + } + } + + private void callMainSubscribeIfNecessary() { + // If we've never actually called the downstream onSubscribe (i.e. requested immediately in + // onSubscribe and had a terminal event), we need to still send an empty disposable instance + // to abide by the Observer contract. + if (mainDisposable.get() == null) { + try { + onSubscribe.accept(Disposables.disposed()); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); } - AutoDisposableHelper.dispose(mainDisposable); } } @Override public final void onComplete() { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onComplete.run(); } catch (Exception e) { @@ -106,7 +117,7 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet @Override public final void onError(Throwable e) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onError.accept(e); } catch (Exception e1) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java index 08035dd92..878ec223f 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java @@ -79,25 +79,36 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser @Override public final void dispose() { synchronized (this) { AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + AutoDisposableHelper.dispose(mainDisposable); + } + } - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (mainDisposable.get() == null) { - try { - onSubscribe.accept(Disposables.disposed()); - } catch (Exception e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + private void lazyDispose() { + synchronized (this) { + AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + } + } + + private void callMainSubscribeIfNecessary() { + // If we've never actually called the downstream onSubscribe (i.e. requested immediately in + // onSubscribe and had a terminal event), we need to still send an empty disposable instance + // to abide by the Observer contract. + if (mainDisposable.get() == null) { + try { + onSubscribe.accept(Disposables.disposed()); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); } - AutoDisposableHelper.dispose(mainDisposable); } } @Override public final void onSuccess(T value) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onSuccess.accept(value); } catch (Exception e) { @@ -109,7 +120,7 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser @Override public final void onError(Throwable e) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onError.accept(e); } catch (Exception e1) { @@ -121,7 +132,7 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser @Override public final void onComplete() { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onComplete.run(); } catch (Exception e) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java index 349902ea2..acdb3a295 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java @@ -79,19 +79,30 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { @Override public final void dispose() { synchronized (this) { AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + AutoDisposableHelper.dispose(mainDisposable); + } + } - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (mainDisposable.get() == null) { - try { - onSubscribe.accept(Disposables.disposed()); - } catch (Exception e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + private void lazyDispose() { + synchronized (this) { + AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + } + } + + private void callMainSubscribeIfNecessary() { + // If we've never actually called the downstream onSubscribe (i.e. requested immediately in + // onSubscribe and had a terminal event), we need to still send an empty disposable instance + // to abide by the Observer contract. + if (mainDisposable.get() == null) { + try { + onSubscribe.accept(Disposables.disposed()); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); } - AutoDisposableHelper.dispose(mainDisposable); } } @@ -108,7 +119,7 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { @Override public final void onError(Throwable e) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onError.accept(e); } catch (Exception e1) { @@ -120,7 +131,7 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { @Override public final void onComplete() { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onComplete.run(); } catch (Exception e) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java index ee64592cb..c37f51437 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java @@ -75,25 +75,36 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs @Override public final void dispose() { synchronized (this) { AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + AutoDisposableHelper.dispose(mainDisposable); + } + } - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (mainDisposable.get() == null) { - try { - onSubscribe.accept(Disposables.disposed()); - } catch (Exception e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + private void lazyDispose() { + synchronized (this) { + AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + } + } + + private void callMainSubscribeIfNecessary() { + // If we've never actually called the downstream onSubscribe (i.e. requested immediately in + // onSubscribe and had a terminal event), we need to still send an empty disposable instance + // to abide by the Observer contract. + if (mainDisposable.get() == null) { + try { + onSubscribe.accept(Disposables.disposed()); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); } - AutoDisposableHelper.dispose(mainDisposable); } } @Override public final void onSuccess(T value) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onSuccess.accept(value); } catch (Exception e) { @@ -105,7 +116,7 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs @Override public final void onError(Throwable e) { if (!isDisposed()) { - dispose(); + lazyDispose(); try { onError.accept(e); } catch (Exception e1) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java index 9ac9d53cf..8cf2a2d73 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java @@ -95,19 +95,30 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber @Override public final void cancel() { synchronized (this) { AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + AutoSubscriptionHelper.cancel(mainSubscription); + } + } - // If we've never actually started the upstream subscription (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty subscription instance - // to abide by the Subscriber contract. - if (mainSubscription.get() == null) { - try { - onSubscribe.accept(EmptySubscription.INSTANCE); - } catch (Exception e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } + private void lazyCancel() { + synchronized (this) { + AutoDisposableHelper.dispose(lifecycleDisposable); + callMainSubscribeIfNecessary(); + mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); + } + } + + private void callMainSubscribeIfNecessary() { + // If we've never actually started the upstream subscription (i.e. requested immediately in + // onSubscribe and had a terminal event), we need to still send an empty subscription instance + // to abide by the Subscriber contract. + if (mainSubscription.get() == null) { + try { + onSubscribe.accept(EmptySubscription.INSTANCE); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); } - AutoSubscriptionHelper.cancel(mainSubscription); } } @@ -132,7 +143,7 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber @Override public void onError(Throwable e) { if (!isDisposed()) { - cancel(); + lazyCancel(); try { onError.accept(e); } catch (Exception e1) { @@ -144,7 +155,7 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber @Override public final void onComplete() { if (!isDisposed()) { - cancel(); + lazyCancel(); try { onComplete.run(); } catch (Exception e) {