-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
observeOn on Flowable.rangeLong does not emit values on provided scheduler #5676
Comments
Forgot to mention: it's about latest release 2.1.5 and I can't reproduce it without going parallel() |
Hi. On one hand, using On the other hand, preventing fusion via I'll post a fix to |
Aha, that makes sense, thanks for the clarification! |
See #5677 for the fix. |
Many thanks for your quick reaction! |
Yes it does but it doesnot say it cannot be called from different threads, so isn't it essential to add volatile keyword to it? |
The term "sequential manner" means that if multiple threads would call |
Well but Java memory model has quite specific memory visibility guarantees and without volatile keyword, instruction reordering and caching can lead to inconsistent state of index variable vidible to other thread calling it right after another. Yes with correct behavior of observeOn its not an issue at all bit in other rare cases it could be isn't it? In fact I am noy using Flowable.range in production code and used it to give an example and its the way I learn RxJava 2 after couple of years living with version 1 :) |
The parent queue interface was designed after the Single-Producer-Single-Consumer queues like those in JCTools. There, the requirement is that It works just the same as the non-fused On rare cases, such as the |
I do appreciate your detailed explanations. |
Operators fusion in RxJava 2 is a really nice feature but I have found a case when an implementation of Flowable.rangeLong (and I assume there is similar behavior in neighbor standard flowables) fused together with operator observeOn() breaks the semantics of the operator observeOn().
Basically, no emissions from FlowableRangeLong flowable is done on the scheduler provided in observeOn() but rather on the thread which calls request() in the chain.
Here is the code snipped which reproduces the issue.
Output is as follows:
Maybe I don't get something and have not read documentation carefully enough, but to me it looks like the fusion between operators breaks semantics of ObserveOn.
Moreover, while digging in the source code of related classes in RxJava, I have found suspicious code in the FlowableRangeLong.BaseRangeSubscription.poll() method: it's potentially can be called from different threads but field "index" is not guarded anyhow from contended access so can produce wrong values if instructions reordering or caching is in place on a processor.
The text was updated successfully, but these errors were encountered: