Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators Skip, SkipLast, Take with time #667

Merged
merged 1 commit into from
Dec 27, 2013

Conversation

akarnokd
Copy link
Member

Rebased version, without the drain scheduler variant.

@cloudbees-pull-request-builder

RxJava-pull-requests #601 FAILURE
Looks like there's a problem with this pull request

private static final class SourceObserver<T> implements Observer<T>, Action0 {
final Observer<? super T> observer;
final Subscription cancel;
final AtomicInteger state = new AtomicInteger();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need an atomic state machine in here when there is not going to be any concurrency when the Observer is invoked?

The concurrency will happen in a very controlled place when the timer fires and you emit whatever is queued and onCompleted, but the on* events will not be invoked concurrently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take doesn't queue anything. It relays events until the timer fires, which might run at the same time as a regular onNext event. The atomic is there to prevent this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a simple compareAndSet then on being finished is it not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I use the classic get/compareAndSet, the following case might happen:

T1: if (state.get()) succeeds
T2: if (compareAndSet(true, false)) succeeds
T1: observer.onNext() executing
T2: observer.onCompleted executing

One would need to mutually exclude observer.onXXX calls, which could be done via synchronization and some overhead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right ...

benjchristensen added a commit that referenced this pull request Dec 27, 2013
@benjchristensen benjchristensen merged commit eb29595 into ReactiveX:master Dec 27, 2013
@akarnokd akarnokd deleted the SkipTakeTimed2 branch January 13, 2014 09:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants