Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs #1955

Merged
merged 7 commits into from
Feb 3, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 26 additions & 88 deletions src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.concurrent.BlockingQueue;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.internal.util.BackpressureDrainManager;

/**
* Operator that blocks the producer thread in case a backpressure is needed.
Expand All @@ -38,42 +38,25 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
return s;
}

static final class BlockingSubscriber<T> extends Subscriber<T> {
static final class BlockingSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
final NotificationLite<T> nl = NotificationLite.instance();
final BlockingQueue<Object> queue;
final Subscriber<? super T> child;
/** Guarded by this. */
long requestedCount;
/** Guarded by this. */
boolean emitting;
volatile boolean terminated;
/** Set before terminated, read after terminated. */
Throwable exception;
final BackpressureDrainManager manager;
public BlockingSubscriber(int max, Subscriber<? super T> child) {
this.queue = new ArrayBlockingQueue<Object>(max);
this.child = child;
this.manager = new BackpressureDrainManager(this);
}
void init() {
child.add(this);
child.setProducer(new Producer() {
@Override
public void request(long n) {
synchronized (BlockingSubscriber.this) {
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
requestedCount = Long.MAX_VALUE;
} else {
requestedCount += n;
}
}
drain();
}
});
child.setProducer(manager);
}
@Override
public void onNext(T t) {
try {
queue.put(nl.next(t));
drain();
manager.drain();
} catch (InterruptedException ex) {
if (!isUnsubscribed()) {
onError(ex);
Expand All @@ -82,76 +65,31 @@ public void onNext(T t) {
}
@Override
public void onError(Throwable e) {
if (!terminated) {
exception = e;
terminated = true;
drain();
}
manager.terminateAndDrain(e);
}
@Override
public void onCompleted() {
terminated = true;
drain();
manager.terminateAndDrain();
}
void drain() {
long n;
synchronized (this) {
if (emitting) {
return;
}
emitting = true;
n = requestedCount;
}
boolean skipFinal = false;
try {
while (true) {
int emitted = 0;
while (n > 0) {
Object o = queue.poll();
if (o == null) {
if (terminated) {
if (exception != null) {
child.onError(exception);
} else {
child.onCompleted();
}
return;
}
break;
} else {
child.onNext(nl.getValue(o));
n--;
emitted++;
}
}
synchronized (this) {
// if no backpressure below
if (requestedCount == Long.MAX_VALUE) {
// no new data arrived since the last poll
if (queue.peek() == null) {
skipFinal = true;
emitting = false;
return;
}
n = Long.MAX_VALUE;
} else {
if (emitted == 0) {
skipFinal = true;
emitting = false;
return;
}
requestedCount -= emitted;
n = requestedCount;
}
}
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
@Override
public boolean accept(Object value) {
return nl.accept(child, value);
}
@Override
public void complete(Throwable exception) {
if (exception != null) {
child.onError(exception);
} else {
child.onCompleted();
}
}
@Override
public Object peek() {
return queue.peek();
}
@Override
public Object poll() {
return queue.poll();
}
}
}
188 changes: 87 additions & 101 deletions src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -25,11 +24,10 @@
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.BackpressureDrainManager;

public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {

private final NotificationLite<T> on = NotificationLite.instance();

private final Long capacity;
private final Action0 onOverflow;

Expand All @@ -52,122 +50,110 @@ public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// TODO get a different queue implementation
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
final AtomicLong wip = new AtomicLong();
final AtomicLong requested = new AtomicLong();

child.setProducer(new Producer() {

@Override
public void request(long n) {
if (requested.getAndAdd(n) == 0) {
pollQueue(wip, requested, capacity, queue, child);
}
}

});
// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
Subscriber<T> parent = new Subscriber<T>() {
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow);

private AtomicBoolean saturated = new AtomicBoolean(false);
// if child unsubscribes it should unsubscribe the parent, but not the other way around
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this use case why would we ever be unsubscribing early? We can emit an onError, but I don't see the need to decouple the subscription.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertCapacity calls unsubscribe and would disrupt downstream.

child.add(parent);
child.setProducer(parent.manager());

@Override
public void onStart() {
request(Long.MAX_VALUE);
}
return parent;
}
private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
// TODO get a different queue implementation
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
private final Long baseCapacity;
private final AtomicLong capacity;
private final Subscriber<? super T> child;
private final AtomicBoolean saturated = new AtomicBoolean(false);
private final BackpressureDrainManager manager;
private final NotificationLite<T> on = NotificationLite.instance();
private final Action0 onOverflow;

public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
this.child = child;
this.baseCapacity = capacity;
this.capacity = capacity != null ? new AtomicLong(capacity) : null;
this.onOverflow = onOverflow;
this.manager = new BackpressureDrainManager(this);
}
@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
if (!saturated.get()) {
queue.offer(on.completed());
pollQueue(wip, requested, capacity, queue, child);
}
@Override
public void onCompleted() {
if (!saturated.get()) {
manager.terminateAndDrain();
}
}

@Override
public void onError(Throwable e) {
if (!saturated.get()) {
queue.offer(on.error(e));
pollQueue(wip, requested, capacity, queue, child);
}
@Override
public void onError(Throwable e) {
if (!saturated.get()) {
manager.terminateAndDrain(e);
}
}

@Override
public void onNext(T t) {
if (!assertCapacity()) {
return;
}
queue.offer(on.next(t));
pollQueue(wip, requested, capacity, queue, child);
@Override
public void onNext(T t) {
if (!assertCapacity()) {
return;
}
queue.offer(on.next(t));
manager.drain();
}

private boolean assertCapacity() {
if (capacity == null) {
return true;
}

long currCapacity;
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(new MissingBackpressureException(
"Overflowed buffer of "
+ OperatorOnBackpressureBuffer.this.capacity));
if (onOverflow != null) {
onOverflow.call();
}
}
return false;
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
return true;
@Override
public boolean accept(Object value) {
return on.accept(child, value);
}
@Override
public void complete(Throwable exception) {
if (exception != null) {
child.onError(exception);
} else {
child.onCompleted();
}
};

// if child unsubscribes it should unsubscribe the parent, but not the other way around
child.add(parent);
}
@Override
public Object peek() {
return queue.peek();
}
@Override
public Object poll() {
return queue.poll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add if (capacity != null) capacity.incrementAndGet(); here.

}

return parent;
}
private boolean assertCapacity() {
if (capacity == null) {
return true;
}

private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
if (requested.get() > 0) {
// only one draining at a time
try {
/*
* This needs to protect against concurrent execution because `request` and `on*` events can come concurrently.
*/
if (wip.getAndIncrement() == 0) {
while (true) {
if (requested.getAndDecrement() != 0) {
Object o = queue.poll();
if (o == null) {
// nothing in queue
requested.incrementAndGet();
return;
}
if (capacity != null) { // it's bounded
capacity.incrementAndGet();
}
on.accept(child, o);
} else {
// we hit the end ... so increment back to 0 again
requested.incrementAndGet();
return;
long currCapacity;
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(new MissingBackpressureException(
"Overflowed buffer of "
+ baseCapacity));
if (onOverflow != null) {
onOverflow.call();
}
}
return false;
}

} finally {
wip.decrementAndGet();
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
return true;
}
protected Producer manager() {
return manager;
}
}
}
Loading