Skip to content

Commit

Permalink
Fix concurrency bug in ScheduledObserver
Browse files Browse the repository at this point in the history
This is a followup to ReactiveX@1fa6ae3 that fixed one issue (concurrency) and created another (broke Rx contract by allowing concurrent execution of onNext).

I have reverted back to the previous implementatio and then attempted to fix the concurrency issue again.

I think it ended up being a simple fix … just re-ordering the `enqueue` method to remove the race-condition between the logic protected by the AtomicInteger and adding to the queue.

It's not an atomic operation (adding then processing) so we need to just add to the queue and treat it as an async data structure and keep the AtomicInteger portion to only protecting the "process or not process" logic.

```java
        // this must happen before 'counter' is used to provide synchronization between threads
        queue.offer(notification);
```

This may still have issues but it's now working in all of my concurrency tests (the ones that broken with the original and then my modified version). The tests are not easy to build unit tests out of as they require running for many seconds and non-deterministically causing a race condition so I have not yet spend the time to try and figure out a deterministic unit test hence them not being committed.
  • Loading branch information
benjchristensen committed May 11, 2013
1 parent 1bfe561 commit 94c950a
Showing 1 changed file with 43 additions and 26 deletions.
69 changes: 43 additions & 26 deletions rxjava-core/src/main/java/rx/operators/ScheduledObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Action0;

/* package */class ScheduledObserver<T> implements Observer<T> {
private final Observer<T> underlying;
private final Scheduler scheduler;

private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
private final AtomicInteger counter = new AtomicInteger(0);

public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
this.underlying = underlying;
this.scheduler = scheduler;
Expand All @@ -41,38 +46,50 @@ public void onError(final Exception e) {
}

@Override
public void onNext(final T v) {
enqueue(new Notification<T>(v));
public void onNext(final T args) {
enqueue(new Notification<T>(args));
}

private void enqueue(final Notification<T> notification) {
private void enqueue(Notification<T> notification) {
// this must happen before 'counter' is used to provide synchronization between threads
queue.offer(notification);

Schedulers.currentThread().schedule(new Action0() {
// we now use counter to atomically determine if we need to start processing or not
// it will be 0 if it's the first notification or the scheduler has finished processing work
// and we need to start doing it again
if (counter.getAndIncrement() == 0) {
processQueue();
}
}

private void processQueue() {
scheduler.schedule(new Action0() {
@Override
public void call() {
Notification<T> not = queue.poll();

scheduler.schedule(new Action0() {
@Override
public void call() {
switch (notification.getKind()) {
case OnNext:
underlying.onNext(notification.getValue());
break;
case OnError:
underlying.onError(notification.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + notification);
switch (not.getKind()) {
case OnNext:
underlying.onNext(not.getValue());
break;
case OnError:
underlying.onError(not.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + not);

}
}
});
}
}

});
};
// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
scheduler.schedule(this);
}

}
});
}
}

0 comments on commit 94c950a

Please sign in to comment.