Skip to content

Commit

Permalink
Merge pull request #1446 from benjchristensen/zip-backpressure
Browse files Browse the repository at this point in the history
Zip with Backpressure Support
  • Loading branch information
benjchristensen committed Jul 17, 2014
2 parents cb75b97 + b068f5d commit 201f54d
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public void onNext(T t) {
child.onNext(false);
child.onCompleted();
unsubscribe();
} else {
// if we drop values we must replace them upstream as downstream won't receive and request more
request(1);
}
}

Expand Down
242 changes: 153 additions & 89 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func2;
import rx.functions.Func3;
Expand All @@ -33,6 +35,7 @@
import rx.functions.Func9;
import rx.functions.FuncN;
import rx.functions.Functions;
import rx.internal.util.RxRingBuffer;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -48,7 +51,9 @@
* <p>
* The resulting Observable returned from zip will invoke <code>onNext</code> as many times as the
* number of <code>onNext</code> invocations of the source Observable that emits the fewest items.
* @param <R> the result type
*
* @param <R>
* the result type
*/
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
/*
Expand Down Expand Up @@ -104,69 +109,106 @@ public OperatorZip(Func9 f) {

@SuppressWarnings("rawtypes")
@Override
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
return new Subscriber<Observable[]>(observer) {
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
final Zip<R> zipper = new Zip<R>(child, zipFunction);
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
child.setProducer(producer);
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);
return subscriber;
}

boolean started = false;
private final class ZipSubscriber extends Subscriber<Observable[]> {

@Override
public void onCompleted() {
if (!started) {
// this means we have not received a valid onNext before termination so we emit the onCompleted
observer.onCompleted();
}
}
final Subscriber<? super R> child;
final Zip<R> zipper;
final ZipProducer<R> producer;

@Override
public void onError(Throwable e) {
observer.onError(e);
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper, ZipProducer<R> producer) {
super(child);
this.child = child;
this.zipper = zipper;
this.producer = producer;
}

boolean started = false;

@Override
public void onCompleted() {
if (!started) {
// this means we have not received a valid onNext before termination so we emit the onCompleted
child.onCompleted();
}
}

@Override
public void onNext(Observable[] observables) {
if (observables == null || observables.length == 0) {
observer.onCompleted();
} else {
started = true;
new Zip<R>(observables, observer, zipFunction).zip();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(Observable[] observables) {
if (observables == null || observables.length == 0) {
child.onCompleted();
} else {
started = true;
zipper.start(observables, producer);
}
}

}

private static final class ZipProducer<R> extends AtomicLong implements Producer {

private Zip<R> zipper;

public ZipProducer(Zip<R> zipper) {
this.zipper = zipper;
}

@Override
public void request(long n) {
addAndGet(n);
// try and claim emission if no other threads are doing so
zipper.tick();
}

};
}

static final NotificationLite<Object> on = NotificationLite.instance();
private static final class Zip<R> {
@SuppressWarnings("rawtypes")
final Observable[] os;
final Object[] observers;
final Observer<? super R> observer;
final FuncN<? extends R> zipFunction;
final CompositeSubscription childSubscription = new CompositeSubscription();
private final Observer<? super R> child;
private final FuncN<? extends R> zipFunction;
private final CompositeSubscription childSubscription = new CompositeSubscription();

volatile long counter;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER
= AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");

static final int THRESHOLD = (int) (RxRingBuffer.SIZE * 0.7);
int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block

/* initialized when started in `start` */
private Object[] observers;
private AtomicLong requested;

@SuppressWarnings("rawtypes")
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
this.os = os;
this.observer = observer;
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
this.child = child;
this.zipFunction = zipFunction;
child.add(childSubscription);
}

@SuppressWarnings("unchecked")
public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) {
observers = new Object[os.length];
this.requested = requested;
for (int i = 0; i < os.length; i++) {
InnerObserver io = new InnerObserver();
InnerSubscriber io = new InnerSubscriber();
observers[i] = io;
childSubscription.add(io);
}

observer.add(childSubscription);
}

@SuppressWarnings("unchecked")
public void zip() {
for (int i = 0; i < os.length; i++) {
os[i].unsafeSubscribe((InnerObserver) observers[i]);
os[i].unsafeSubscribe((InnerSubscriber) observers[i]);
}
}

Expand All @@ -179,51 +221,64 @@ public void zip() {
*/
@SuppressWarnings("unchecked")
void tick() {
if (observers == null) {
// nothing yet to do (initial request from Producer)
return;
}
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
do {
final Object[] vs = new Object[observers.length];
boolean allHaveValues = true;
for (int i = 0; i < observers.length; i++) {
Object n = ((InnerObserver) observers[i]).items.peek();

if (n == null) {
allHaveValues = false;
continue;
}
// we only emit if requested > 0
if (requested.get() > 0) {
final Object[] vs = new Object[observers.length];
boolean allHaveValues = true;
for (int i = 0; i < observers.length; i++) {
RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items;
Object n = buffer.peek();

if (n == null) {
allHaveValues = false;
continue;
}

switch (on.kind(n)) {
case OnNext:
vs[i] = on.getValue(n);
break;
case OnCompleted:
observer.onCompleted();
// we need to unsubscribe from all children since children are
// independently subscribed
childSubscription.unsubscribe();
return;
default:
// shouldn't get here
}
}
if (allHaveValues) {
try {
// all have something so emit
observer.onNext(zipFunction.call(vs));
} catch (Throwable e) {
observer.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
return;
}
// now remove them
for (Object obj : observers) {
InnerObserver io = (InnerObserver)obj;
io.items.poll();
// eagerly check if the next item on this queue is an onComplete
if (on.isCompleted(io.items.peek())) {
// it is an onComplete so shut down
observer.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
if (buffer.isCompleted(n)) {
child.onCompleted();
// we need to unsubscribe from all children since children are
// independently subscribed
childSubscription.unsubscribe();
return;
} else {
vs[i] = buffer.getValue(n);
}
}
if (allHaveValues) {
try {
// all have something so emit
child.onNext(zipFunction.call(vs));
// we emitted so decrement the requested counter
requested.decrementAndGet();
emitted++;
} catch (Throwable e) {
child.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
return;
}
// now remove them
for (Object obj : observers) {
RxRingBuffer buffer = ((InnerSubscriber) obj).items;
buffer.poll();
// eagerly check if the next item on this queue is an onComplete
if (buffer.isCompleted(buffer.peek())) {
// it is an onComplete so shut down
child.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
childSubscription.unsubscribe();
return;
}
}
if (emitted > THRESHOLD) {
for (Object obj : observers) {
((InnerSubscriber) obj).request(emitted);
}
emitted = 0;
}
}
}
Expand All @@ -235,27 +290,36 @@ void tick() {
// used to observe each Observable we are zipping together
// it collects all items in an internal queue
@SuppressWarnings("rawtypes")
final class InnerObserver extends Subscriber {
final class InnerSubscriber extends Subscriber {
// Concurrent* since we need to read it from across threads
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
final RxRingBuffer items = RxRingBuffer.getSpmcInstance();

@Override
public void onStart() {
request(RxRingBuffer.SIZE);
}

@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
items.add(on.completed());
items.onCompleted();
tick();
}

@Override
public void onError(Throwable e) {
// emit error and shut down
observer.onError(e);
// emit error immediately and shut down
child.onError(e);
}

@SuppressWarnings("unchecked")
@Override
public void onNext(Object t) {
items.add(on.next(t));
try {
items.onNext(t);
} catch (MissingBackpressureException e) {
onError(e);
}
tick();
}
};
Expand Down
26 changes: 20 additions & 6 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,27 +276,37 @@ public Object poll() {
Object o;
o = queue.poll();
/*
* benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll()
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
* "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState.
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
*
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
* or needing to enqueue terminalState.
*
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
* is currently the way it is.
*
* This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list,
* such as ConcurrentLinkedQueue.
*/
if (o == null && terminalState != null && queue.size() == 0) {
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
// once emitted we clear so a poll loop will finish
terminalState = null;
}
return o;
}

public Object peek() {
if (queue == null) {
// we are unsubscribed and have released the undelrying queue
return null;
}
Object o;
o = queue.peek();
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
}
return o;
}

public boolean isCompleted(Object o) {
return on.isCompleted(o);
Expand All @@ -306,6 +316,10 @@ public boolean isError(Object o) {
return on.isError(o);
}

public Object getValue(Object o) {
return on.getValue(o);
}

public boolean accept(Object o, Observer child) {
return on.accept(child, o);
}
Expand Down
Loading

0 comments on commit 201f54d

Please sign in to comment.