Skip to content

Commit

Permalink
Merge pull request ReactiveX#269 from benjchristensen/observeOn
Browse files Browse the repository at this point in the history
Fix concurrency bug in ScheduledObserver
  • Loading branch information
benjchristensen committed May 11, 2013
2 parents 1bfe561 + 94c950a commit dbd6a43
Showing 1 changed file with 43 additions and 26 deletions.
69 changes: 43 additions & 26 deletions rxjava-core/src/main/java/rx/operators/ScheduledObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Action0;

/* package */class ScheduledObserver<T> implements Observer<T> {
private final Observer<T> underlying;
private final Scheduler scheduler;

private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
private final AtomicInteger counter = new AtomicInteger(0);

public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
this.underlying = underlying;
this.scheduler = scheduler;
Expand All @@ -41,38 +46,50 @@ public void onError(final Exception e) {
}

@Override
public void onNext(final T v) {
enqueue(new Notification<T>(v));
public void onNext(final T args) {
enqueue(new Notification<T>(args));
}

private void enqueue(final Notification<T> notification) {
private void enqueue(Notification<T> notification) {
// this must happen before 'counter' is used to provide synchronization between threads
queue.offer(notification);

Schedulers.currentThread().schedule(new Action0() {
// we now use counter to atomically determine if we need to start processing or not
// it will be 0 if it's the first notification or the scheduler has finished processing work
// and we need to start doing it again
if (counter.getAndIncrement() == 0) {
processQueue();
}
}

private void processQueue() {
scheduler.schedule(new Action0() {
@Override
public void call() {
Notification<T> not = queue.poll();

scheduler.schedule(new Action0() {
@Override
public void call() {
switch (notification.getKind()) {
case OnNext:
underlying.onNext(notification.getValue());
break;
case OnError:
underlying.onError(notification.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + notification);
switch (not.getKind()) {
case OnNext:
underlying.onNext(not.getValue());
break;
case OnError:
underlying.onError(not.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + not);

}
}
});
}
}

});
};
// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
scheduler.schedule(this);
}

}
});
}
}

0 comments on commit dbd6a43

Please sign in to comment.