Skip to content

Commit

Permalink
2.x: coverage and cleanup 10/12-1 (#4696)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 12, 2016
1 parent 35c2951 commit e71d371
Show file tree
Hide file tree
Showing 17 changed files with 1,372 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ public void onSubscribe(Disposable d) {
}
return;
}
if (once.get() || set.isDisposed()) {
return;
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand All @@ -36,133 +37,218 @@ public CompletableConcat(Publisher<? extends CompletableSource> sources, int pre

@Override
public void subscribeActual(CompletableObserver s) {
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
sources.subscribe(parent);
sources.subscribe(new CompletableConcatSubscriber(s, prefetch));
}

static final class CompletableConcatSubscriber
extends AtomicInteger
implements Subscriber<CompletableSource>, Disposable {
private static final long serialVersionUID = 7412667182931235013L;
private static final long serialVersionUID = 9032184911934499404L;

final CompletableObserver actual;

final int prefetch;
final SequentialDisposable sd;

final SpscArrayQueue<CompletableSource> queue;
final int limit;

final ConcatInnerObserver inner;

final AtomicBoolean once;

int sourceFused;

int consumed;

SimpleQueue<CompletableSource> queue;

Subscription s;

volatile boolean done;

final AtomicBoolean once = new AtomicBoolean();

final ConcatInnerObserver inner;
volatile boolean active;

CompletableConcatSubscriber(CompletableObserver actual, int prefetch) {
this.actual = actual;
this.prefetch = prefetch;
this.queue = new SpscArrayQueue<CompletableSource>(prefetch);
this.sd = new SequentialDisposable();
this.inner = new ConcatInnerObserver();
this.inner = new ConcatInnerObserver(this);
this.once = new AtomicBoolean();
this.limit = prefetch - (prefetch >> 2);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

long r = prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch;

if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<CompletableSource> qs = (QueueSubscription<CompletableSource>) s;

int m = qs.requestFusion(QueueSubscription.ANY);

if (m == QueueSubscription.SYNC) {
sourceFused = m;
queue = qs;
done = true;
actual.onSubscribe(this);
drain();
return;
}
if (m == QueueSubscription.ASYNC) {
sourceFused = m;
queue = qs;
actual.onSubscribe(this);
s.request(r);
return;
}
}

if (prefetch == Integer.MAX_VALUE) {
queue = new SpscLinkedArrayQueue<CompletableSource>(Flowable.bufferSize());
} else {
queue = new SpscArrayQueue<CompletableSource>(prefetch);
}

actual.onSubscribe(this);
s.request(prefetch);

s.request(r);
}
}

@Override
public void onNext(CompletableSource t) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
return;
}
if (getAndIncrement() == 0) {
next();
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
return;
}
}
drain();
}

@Override
public void onError(Throwable t) {
if (once.compareAndSet(false, true)) {
DisposableHelper.dispose(inner);
actual.onError(t);
return;
} else {
RxJavaPlugins.onError(t);
}
done = true;
RxJavaPlugins.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
if (getAndIncrement() == 0) {
next();
}
}

void innerError(Throwable e) {
s.cancel();
onError(e);
}

void innerComplete() {
if (decrementAndGet() != 0) {
next();
}
if (!done) {
s.request(1);
}
drain();
}

@Override
public void dispose() {
s.cancel();
sd.dispose();
DisposableHelper.dispose(inner);
}

@Override
public boolean isDisposed() {
return sd.isDisposed();
return DisposableHelper.isDisposed(inner.get());
}

void next() {
boolean d = done;
CompletableSource c = queue.poll();
if (c == null) {
if (d) {
if (once.compareAndSet(false, true)) {
actual.onComplete();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}

for (;;) {
if (isDisposed()) {
return;
}
RxJavaPlugins.onError(new IllegalStateException("Queue is empty?!"));
return;

if (!active) {

boolean d = done;

CompletableSource cs;

try {
cs = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
innerError(ex);
return;
}

boolean empty = cs == null;

if (d && empty) {
if (once.compareAndSet(false, true)) {
actual.onComplete();
}
return;
}

if (!empty) {
active = true;
cs.subscribe(inner);
request();
}
}

if (decrementAndGet() == 0) {
break;
}
}
}

void request() {
if (sourceFused != QueueSubscription.SYNC) {
int p = consumed + 1;
if (p == limit) {
consumed = 0;
s.request(p);
} else {
consumed = p;
}
}
}

void innerError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.cancel();
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

c.subscribe(inner);
void innerComplete() {
active = false;
drain();
}

final class ConcatInnerObserver implements CompletableObserver {
static final class ConcatInnerObserver extends AtomicReference<Disposable> implements CompletableObserver {
private static final long serialVersionUID = -5454794857847146511L;

final CompletableConcatSubscriber parent;

ConcatInnerObserver(CompletableConcatSubscriber parent) {
this.parent = parent;
}

@Override
public void onSubscribe(Disposable d) {
sd.update(d);
DisposableHelper.replace(this, d);
}

@Override
public void onError(Throwable e) {
innerError(e);
parent.innerError(e);
}

@Override
public void onComplete() {
innerComplete();
parent.innerComplete();
}
}
}
Expand Down
Loading

0 comments on commit e71d371

Please sign in to comment.