diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java index b3cb5ff531..8e66db0bdc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.observable; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; @@ -59,9 +59,8 @@ static final class SourceObserver extends AtomicInteger implements Observe private static final long serialVersionUID = 8828587559905699186L; final Observer actual; - final SequentialDisposable sa; final Function> mapper; - final Observer inner; + final InnerObserver inner; final int bufferSize; SimpleQueue queue; @@ -82,7 +81,6 @@ static final class SourceObserver extends AtomicInteger implements Observe this.mapper = mapper; this.bufferSize = bufferSize; this.inner = new InnerObserver(actual, this); - this.sa = new SequentialDisposable(); } @Override public void onSubscribe(Disposable s) { @@ -161,7 +159,7 @@ public boolean isDisposed() { @Override public void dispose() { disposed = true; - sa.dispose(); + inner.dispose(); s.dispose(); if (getAndIncrement() == 0) { @@ -169,10 +167,6 @@ public void dispose() { } } - void innerSubscribe(Disposable s) { - sa.update(s); - } - void drain() { if (getAndIncrement() != 0) { return; @@ -231,7 +225,10 @@ void drain() { } } - static final class InnerObserver implements Observer { + static final class InnerObserver extends AtomicReference implements Observer { + + private static final long serialVersionUID = -7449079488798789337L; + final Observer actual; final SourceObserver parent; @@ -242,7 +239,7 @@ static final class InnerObserver implements Observer { @Override public void onSubscribe(Disposable s) { - parent.innerSubscribe(s); + DisposableHelper.set(this, s); } @Override @@ -258,6 +255,10 @@ public void onError(Throwable t) { public void onComplete() { parent.innerComplete(); } + + void dispose() { + DisposableHelper.dispose(this); + } } } @@ -278,8 +279,6 @@ static final class ConcatMapDelayErrorObserver final DelayErrorInnerObserver observer; - final SequentialDisposable arbiter; - final boolean tillTheEnd; SimpleQueue queue; @@ -303,7 +302,6 @@ static final class ConcatMapDelayErrorObserver this.tillTheEnd = tillTheEnd; this.error = new AtomicThrowable(); this.observer = new DelayErrorInnerObserver(actual, this); - this.arbiter = new SequentialDisposable(); } @Override @@ -375,7 +373,7 @@ public boolean isDisposed() { public void dispose() { cancelled = true; d.dispose(); - arbiter.dispose(); + observer.dispose(); } @SuppressWarnings("unchecked") @@ -479,7 +477,9 @@ void drain() { } } - static final class DelayErrorInnerObserver implements Observer { + static final class DelayErrorInnerObserver extends AtomicReference implements Observer { + + private static final long serialVersionUID = 2620149119579502636L; final Observer actual; @@ -492,7 +492,7 @@ static final class DelayErrorInnerObserver implements Observer { @Override public void onSubscribe(Disposable d) { - parent.arbiter.replace(d); + DisposableHelper.replace(this, d); } @Override @@ -520,6 +520,10 @@ public void onComplete() { p.active = false; p.drain(); } + + void dispose() { + DisposableHelper.dispose(this); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java index 909ba644f3..b53de7cde7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java @@ -12,20 +12,18 @@ */ package io.reactivex.internal.operators.observable; +import java.util.concurrent.atomic.*; + import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.SequentialDisposable; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.QueueDisposable; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.plugins.RxJavaPlugins; -import java.util.concurrent.atomic.AtomicInteger; - public final class ObservableConcatMapCompletable extends Completable { final ObservableSource source; @@ -47,9 +45,8 @@ static final class SourceObserver extends AtomicInteger implements Observer mapper; - final CompletableObserver inner; + final InnerObserver inner; final int bufferSize; SimpleQueue queue; @@ -70,7 +67,6 @@ static final class SourceObserver extends AtomicInteger implements Observer implements CompletableObserver { + private static final long serialVersionUID = -5987419458390772447L; final CompletableObserver actual; final SourceObserver parent; @@ -230,7 +223,7 @@ static final class InnerObserver implements CompletableObserver { @Override public void onSubscribe(Disposable s) { - parent.innerSubscribe(s); + DisposableHelper.set(this, s); } @Override @@ -242,6 +235,10 @@ public void onError(Throwable t) { public void onComplete() { parent.innerComplete(); } + + void dispose() { + DisposableHelper.dispose(this); + } } } }