Skip to content

Commit

Permalink
2.x: Optimize ObservableConcatMapCompletable (#5915)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 15, 2018
1 parent 4bc516c commit a5d99b7
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@

package io.reactivex.internal.operators.mixed;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -56,7 +57,9 @@ public ObservableConcatMapCompletable(Observable<T> source,

@Override
protected void subscribeActual(CompletableObserver s) {
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
if (!tryScalarSource(source, mapper, s)) {
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
}
}

static final class ConcatMapCompletableObserver<T>
Expand All @@ -77,7 +80,7 @@ static final class ConcatMapCompletableObserver<T>

final int prefetch;

final SimplePlainQueue<T> queue;
SimpleQueue<T> queue;

Disposable upstream;

Expand All @@ -96,20 +99,40 @@ static final class ConcatMapCompletableObserver<T>
this.prefetch = prefetch;
this.errors = new AtomicThrowable();
this.inner = new ConcatMapInnerObserver(this);
this.queue = new SpscLinkedArrayQueue<T>(prefetch);
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(upstream, s)) {
this.upstream = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
queue = qd;
done = true;
downstream.onSubscribe(this);
drain();
return;
}
if (m == QueueDisposable.ASYNC) {
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(prefetch);
downstream.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
queue.offer(t);
if (t != null) {
queue.offer(t);
}
drain();
}

Expand Down Expand Up @@ -187,6 +210,9 @@ void drain() {
return;
}

AtomicThrowable errors = this.errors;
ErrorMode errorMode = this.errorMode;

do {
if (disposed) {
queue.clear();
Expand All @@ -206,8 +232,24 @@ void drain() {
}

boolean d = done;
T v = queue.poll();
boolean empty = v == null;
boolean empty = true;
CompletableSource cs = null;
try {
T v = queue.poll();
if (v != null) {
cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource");
empty = false;
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
queue.clear();
upstream.dispose();
errors.addThrowable(ex);
ex = errors.terminate();
downstream.onError(ex);
return;
}

if (d && empty) {
disposed = true;
Expand All @@ -221,21 +263,6 @@ void drain() {
}

if (!empty) {

CompletableSource cs;

try {
cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
queue.clear();
upstream.dispose();
errors.addThrowable(ex);
ex = errors.terminate();
downstream.onError(ex);
return;
}
active = true;
cs.subscribe(inner);
}
Expand Down Expand Up @@ -274,4 +301,30 @@ void dispose() {
}
}
}

static <T> boolean tryScalarSource(Observable<T> source, Function<? super T, ? extends CompletableSource> mapper, CompletableObserver observer) {
if (source instanceof Callable) {
@SuppressWarnings("unchecked")
Callable<T> call = (Callable<T>) source;
CompletableSource cs = null;
try {
T item = call.call();
if (item != null) {
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return true;
}

if (cs == null) {
EmptyDisposable.complete(observer);
} else {
cs.subscribe(observer);
}
return true;
}
return false;
}
}
95 changes: 95 additions & 0 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2827,4 +2827,99 @@ public static <T> void checkInvalidParallelSubscribers(ParallelFlowable<T> sourc
tss[i].assertFailure(IllegalArgumentException.class);
}
}

public static <T> Observable<T> rejectObservableFusion() {
return new Observable<T>() {
@Override
protected void subscribeActual(Observer<? super T> observer) {
observer.onSubscribe(new QueueDisposable<T>() {

@Override
public int requestFusion(int mode) {
return 0;
}

@Override
public boolean offer(T value) {
throw new IllegalStateException();
}

@Override
public boolean offer(T v1, T v2) {
throw new IllegalStateException();
}

@Override
public T poll() throws Exception {
return null;
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public void clear() {
}

@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return false;
}
});
}
};
}

public static <T> Flowable<T> rejectFlowableFusion() {
return new Flowable<T>() {
@Override
protected void subscribeActual(Subscriber<? super T> observer) {
observer.onSubscribe(new QueueSubscription<T>() {

@Override
public int requestFusion(int mode) {
return 0;
}

@Override
public boolean offer(T value) {
throw new IllegalStateException();
}

@Override
public boolean offer(T v1, T v2) {
throw new IllegalStateException();
}

@Override
public T poll() throws Exception {
return null;
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public void clear() {
}

@Override
public void cancel() {
}

@Override
public void request(long n) {
}
});
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.*;
Expand Down Expand Up @@ -112,6 +113,19 @@ public CompletableSource apply(Integer v) throws Exception {
.assertFailure(TestException.class);
}

@Test
public void mapperCrashHidden() {
Observable.just(1).hide()
.concatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void immediateError() {
PublishSubject<Integer> ps = PublishSubject.create();
Expand Down Expand Up @@ -359,4 +373,62 @@ public void doneButNotEmpty() {

to.assertResult();
}

@Test
public void asyncFused() {
final PublishSubject<Integer> ps = PublishSubject.create();
final CompletableSubject cs = CompletableSubject.create();

final TestObserver<Void> to = ps.observeOn(ImmediateThinScheduler.INSTANCE)
.concatMapCompletable(
Functions.justFunction(cs)
)
.test();

ps.onNext(1);
ps.onComplete();

cs.onComplete();

to.assertResult();
}

@Test
public void fusionRejected() {
final CompletableSubject cs = CompletableSubject.create();

TestHelper.rejectObservableFusion()
.concatMapCompletable(
Functions.justFunction(cs)
)
.test()
.assertEmpty();
}

@Test
public void emptyScalarSource() {
final CompletableSubject cs = CompletableSubject.create();

Observable.empty()
.concatMapCompletable(Functions.justFunction(cs))
.test()
.assertResult();
}

@Test
public void justScalarSource() {
final CompletableSubject cs = CompletableSubject.create();

TestObserver<Void> to = Observable.just(1)
.concatMapCompletable(Functions.justFunction(cs))
.test();

to.assertEmpty();

assertTrue(cs.hasObservers());

cs.onComplete();

to.assertResult();
}
}

0 comments on commit a5d99b7

Please sign in to comment.