observeOn on Flowable.rangeLong does not emit values on provided scheduler #5676

Closed
sobersanta opened this issue Oct 17, 2017 · 11 comments

Comments

@sobersanta
Contributor

sobersanta commented Oct 17, 2017

Operator fusion in RxJava 2 is a really nice feature, but I have found a case where Flowable.rangeLong (and, I assume, similar standard sources), when fused with observeOn(), breaks the semantics of observeOn().
Basically, no emissions from FlowableRangeLong happen on the scheduler provided to observeOn(); they happen on whichever thread calls request() in the chain.
Here is a code snippet that reproduces the issue.

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscription;

public class RxJavaFuseTest {
    public static void main(String[] args) {
        FlowableSubscriber sequentialSubscriber = new FlowableSubscriber() {
            private Subscription s;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                this.s.request(1);
            }

            @Override
            public void onNext(Object x) {
                s.request(1);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                System.exit(0);
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
                System.exit(0);
            }
        };

        Flowable.rangeLong(0, 10)
                .observeOn(RxJavaPlugins.createSingleScheduler(r -> new Thread(r, "producer")), false, 1)
                .doOnNext(aLong -> {
                    System.out.println(aLong + " emitting on " + Thread.currentThread().getName());
                })
                .parallel(2, 1)
                .runOn(Schedulers.computation(), 1)
                .doOnNext(aLong -> System.out.println(aLong + " processing on " + Thread.currentThread().getName()))
                .sequential()
                .subscribe(sequentialSubscriber);
    }
}

Output is as follows:

0 emitting on main
0 processing on RxComputationThreadPool-1
1 emitting on main
2 emitting on main
1 processing on RxComputationThreadPool-2
2 processing on RxComputationThreadPool-1
3 emitting on main
4 emitting on main
3 processing on RxComputationThreadPool-2
5 emitting on RxComputationThreadPool-2
5 processing on RxComputationThreadPool-2
4 processing on RxComputationThreadPool-1
6 emitting on RxComputationThreadPool-2
7 emitting on RxComputationThreadPool-2
6 processing on RxComputationThreadPool-2
7 processing on RxComputationThreadPool-1
8 emitting on RxComputationThreadPool-2
9 emitting on RxComputationThreadPool-2
8 processing on RxComputationThreadPool-2
9 processing on RxComputationThreadPool-1
Completed

Maybe I am missing something and have not read the documentation carefully enough, but to me it looks like the fusion between operators breaks the semantics of observeOn.

Moreover, while digging through the source code of the related classes in RxJava, I found suspicious code in the FlowableRangeLong.BaseRangeSubscription.poll() method: it can potentially be called from different threads, but the field "index" is not guarded against contended access in any way, so it can produce wrong values if instruction reordering or caching is in play on the processor.

@akarnokd akarnokd added the 2.x label Oct 17, 2017
@sobersanta
Contributor Author

Forgot to mention: this is with the latest release, 2.1.5, and I can't reproduce it without going parallel().

@akarnokd
Member

akarnokd commented Oct 17, 2017

Hi.

On one hand, observeOn is not the right operator for pinning such a source to a particular thread. If you use subscribeOn, doOnNext sees the correct emission thread.

On the other hand, preventing fusion via hide() before parallel can get the desired thread confinement effect.

I'll post a fix to parallel indicating it is expected to be an async boundary so thread-sensitive operators such as doOnNext won't fuse.

@akarnokd akarnokd added the Bug label Oct 17, 2017
@sobersanta
Contributor Author

Aha, that makes sense, thanks for the clarification!

@akarnokd
Member

See #5677 for the fix.

@sobersanta
Contributor Author

Many thanks for your quick reaction!
Does what I wrote at the bottom of my original message make sense, about possible memory visibility issues with the FlowableRangeLong.BaseRangeSubscription.index field being modified in poll()? Or is poll() supposed to be called in sync mode only?

@akarnokd
Member

The QueueSubscription interface defines the access mode. poll is called in a sequential manner so index update is not a problem.

Closing via #5677

@sobersanta
Contributor Author

Yes, it does, but it does not say that it cannot be called from different threads, so isn't it essential to add the volatile keyword to it?

@akarnokd
Member

The term "sequential manner" means that if multiple threads call poll, they have to take turns and make those calls in a non-overlapping manner that ensures visibility of the changes poll made. In observeOn, poll is called from drain loops that are guaranteed to run on one thread at a time.

@sobersanta
Contributor Author

Well, but the Java memory model has quite specific memory visibility guarantees, and without the volatile keyword, instruction reordering and caching can lead to an inconsistent state of the index variable being visible to another thread that calls poll right after the first. Yes, with correct behavior of observeOn it's not an issue at all, but in other rare cases it could be, couldn't it? In fact, I am not using Flowable.range in production code; I used it to give an example, and this is how I am learning RxJava 2 after a couple of years living with version 1 :)

@akarnokd
Member

The parent queue interface was designed after the Single-Producer-Single-Consumer queues like those in JCTools.

There, the requirement is that poll is called from a single thread at a time and the fused "queues" must honor this requirement. Since index in fused mode is only updated from within poll and poll is serialized by a pair of full barrier atomic add operations, any change to index will be written back into memory before an atomic decrement and read from it after the atomic increment of the drain loop.

It works just the same as the non-fused request execution where the atomic transition from 0 to N is guaranteed to happen only to one thread and the atomic transition attempt from N back to 0 will enable a subsequent thread to do the 0 to N transition again. When doing the 0 to N transition, the next operation is to read the index and when doing N to 0, the prior operation is to write back the index. The full barriers of the atomics will make sure the change to index is visible to the next thread.

In rare cases, such as with TestSubscriber, poll is called from within onNext, and the Reactive-Streams spec requires onNext to be serialized in the very same manner that poll requires.
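The serialization described above can be illustrated with a minimal, stdlib-only sketch. This is not RxJava's actual code: the class `DrainLoopSketch`, its `poll()` stand-in, and the `wip` (work-in-progress) counter are hypothetical names chosen for illustration. It assumes each `drain()` call means "one more item wanted" and shows a plain, non-volatile `index` being updated safely from several threads because only the thread that moves `wip` from 0 to non-zero may poll.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DrainLoopSketch {
    private final long end;
    private long index; // plain field, deliberately not volatile
    private long sum;   // plain consumer-side state, also not volatile
    private final AtomicInteger wip = new AtomicInteger();

    DrainLoopSketch(long end) {
        this.end = end;
    }

    // Hypothetical stand-in for a fused poll(): next value, or -1 when exhausted.
    private long poll() {
        long i = index;
        if (i == end) {
            return -1;
        }
        index = i + 1;
        return i;
    }

    // Each call signals "one more item wanted"; callable from any thread.
    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will pick up this signal
        }
        int missed = 1;
        for (;;) {
            // Handle every signal counted so far, one poll per signal.
            for (int k = 0; k < missed; k++) {
                long v = poll();
                if (v >= 0) {
                    sum += v;
                }
            }
            // Atomic decrement: publishes index and sum before giving up the loop.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Runs several threads against one instance and returns the summed values.
    static long run(int threads, int callsPerThread) {
        DrainLoopSketch s = new DrainLoopSketch((long) threads * callsPerThread);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    s.drain();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.sum; // safe to read: join() orders this after all workers
    }

    public static void main(String[] args) {
        System.out.println(DrainLoopSketch.run(4, 1000));
    }
}
```

Whichever thread happens to win the 0 to N transition does the polling, which matches the thread hopping in the output above, yet every value is polled exactly once.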

@sobersanta
Contributor Author

I do appreciate your detailed explanations.
I think I need to study memory barrier behavior for Java volatiles and cache coherency protocols more deeply...
I was convinced that writing a volatile variable invalidates the corresponding individual cache lines and forbids certain instruction reordering, which does not necessarily mean that writes to other locally used non-volatile variables become visible, so each of them has to be marked volatile to ensure inter-thread visibility guarantees...
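
For reference, the Java Memory Model's happens-before rules do cover this case: a write to a volatile field also publishes every plain write that precedes it in program order, as long as the reader first reads that same volatile field. The following is a minimal sketch of that guarantee; the class `VolatilePiggybackSketch` and its fields are hypothetical names, not anything from RxJava.

```java
final class VolatilePiggybackSketch {
    private int data;               // plain field, not volatile
    private volatile boolean ready; // the only volatile field

    // Writes data, then ready; a reader that sees ready == true must see data == 42.
    static int publishAndRead() {
        VolatilePiggybackSketch s = new VolatilePiggybackSketch();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            while (!s.ready) {   // volatile read
                Thread.yield();
            }
            seen[0] = s.data;    // plain read, ordered after the volatile read
        });
        reader.start();
        s.data = 42;             // plain write
        s.ready = true;          // volatile write publishes the write above
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());
    }
}
```

The atomic increment and decrement in a drain loop play the role of `ready` here, which is why the plain `index` field does not need to be volatile itself.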
