Skip to content

Commit

Permalink
2.x: inline disposability in Obs.concatMap(Completable) (#5652)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 9, 2017
1 parent 3abd86a commit 1b0cd2a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package io.reactivex.internal.operators.observable;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
Expand Down Expand Up @@ -59,9 +59,8 @@ static final class SourceObserver<T, U> extends AtomicInteger implements Observe

private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> actual;
final SequentialDisposable sa;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final Observer<U> inner;
final InnerObserver<U> inner;
final int bufferSize;

SimpleQueue<T> queue;
Expand All @@ -82,7 +81,6 @@ static final class SourceObserver<T, U> extends AtomicInteger implements Observe
this.mapper = mapper;
this.bufferSize = bufferSize;
this.inner = new InnerObserver<U>(actual, this);
this.sa = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable s) {
Expand Down Expand Up @@ -161,18 +159,14 @@ public boolean isDisposed() {
@Override
public void dispose() {
disposed = true;
sa.dispose();
inner.dispose();
s.dispose();

if (getAndIncrement() == 0) {
queue.clear();
}
}

void innerSubscribe(Disposable s) {
sa.update(s);
}

void drain() {
if (getAndIncrement() != 0) {
return;
Expand Down Expand Up @@ -231,7 +225,10 @@ void drain() {
}
}

static final class InnerObserver<U> implements Observer<U> {
static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {

private static final long serialVersionUID = -7449079488798789337L;

final Observer<? super U> actual;
final SourceObserver<?, ?> parent;

Expand All @@ -242,7 +239,7 @@ static final class InnerObserver<U> implements Observer<U> {

@Override
public void onSubscribe(Disposable s) {
parent.innerSubscribe(s);
DisposableHelper.set(this, s);
}

@Override
Expand All @@ -258,6 +255,10 @@ public void onError(Throwable t) {
public void onComplete() {
parent.innerComplete();
}

void dispose() {
DisposableHelper.dispose(this);
}
}
}

Expand All @@ -278,8 +279,6 @@ static final class ConcatMapDelayErrorObserver<T, R>

final DelayErrorInnerObserver<R> observer;

final SequentialDisposable arbiter;

final boolean tillTheEnd;

SimpleQueue<T> queue;
Expand All @@ -303,7 +302,6 @@ static final class ConcatMapDelayErrorObserver<T, R>
this.tillTheEnd = tillTheEnd;
this.error = new AtomicThrowable();
this.observer = new DelayErrorInnerObserver<R>(actual, this);
this.arbiter = new SequentialDisposable();
}

@Override
Expand Down Expand Up @@ -375,7 +373,7 @@ public boolean isDisposed() {
public void dispose() {
cancelled = true;
d.dispose();
arbiter.dispose();
observer.dispose();
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -479,7 +477,9 @@ void drain() {
}
}

static final class DelayErrorInnerObserver<R> implements Observer<R> {
static final class DelayErrorInnerObserver<R> extends AtomicReference<Disposable> implements Observer<R> {

private static final long serialVersionUID = 2620149119579502636L;

final Observer<? super R> actual;

Expand All @@ -492,7 +492,7 @@ static final class DelayErrorInnerObserver<R> implements Observer<R> {

@Override
public void onSubscribe(Disposable d) {
parent.arbiter.replace(d);
DisposableHelper.replace(this, d);
}

@Override
Expand Down Expand Up @@ -520,6 +520,10 @@ public void onComplete() {
p.active = false;
p.drain();
}

void dispose() {
DisposableHelper.dispose(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
*/
package io.reactivex.internal.operators.observable;

import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.atomic.AtomicInteger;

public final class ObservableConcatMapCompletable<T> extends Completable {

final ObservableSource<T> source;
Expand All @@ -47,9 +45,8 @@ static final class SourceObserver<T> extends AtomicInteger implements Observer<T

private static final long serialVersionUID = 6893587405571511048L;
final CompletableObserver actual;
final SequentialDisposable sa;
final Function<? super T, ? extends CompletableSource> mapper;
final CompletableObserver inner;
final InnerObserver inner;
final int bufferSize;

SimpleQueue<T> queue;
Expand All @@ -70,7 +67,6 @@ static final class SourceObserver<T> extends AtomicInteger implements Observer<T
this.mapper = mapper;
this.bufferSize = bufferSize;
this.inner = new InnerObserver(actual, this);
this.sa = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable s) {
Expand Down Expand Up @@ -149,18 +145,14 @@ public boolean isDisposed() {
@Override
public void dispose() {
disposed = true;
sa.dispose();
inner.dispose();
s.dispose();

if (getAndIncrement() == 0) {
queue.clear();
}
}

void innerSubscribe(Disposable s) {
sa.update(s);
}

void drain() {
if (getAndIncrement() != 0) {
return;
Expand Down Expand Up @@ -219,7 +211,8 @@ void drain() {
}
}

static final class InnerObserver implements CompletableObserver {
static final class InnerObserver extends AtomicReference<Disposable> implements CompletableObserver {
private static final long serialVersionUID = -5987419458390772447L;
final CompletableObserver actual;
final SourceObserver<?> parent;

Expand All @@ -230,7 +223,7 @@ static final class InnerObserver implements CompletableObserver {

@Override
public void onSubscribe(Disposable s) {
parent.innerSubscribe(s);
DisposableHelper.set(this, s);
}

@Override
Expand All @@ -242,6 +235,10 @@ public void onError(Throwable t) {
public void onComplete() {
parent.innerComplete();
}

void dispose() {
DisposableHelper.dispose(this);
}
}
}
}

0 comments on commit 1b0cd2a

Please sign in to comment.