Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialization Behavior #998

Closed
benjchristensen opened this issue Mar 28, 2014 · 18 comments
Closed

Serialization Behavior #998

benjchristensen opened this issue Mar 28, 2014 · 18 comments

Comments

@benjchristensen
Copy link
Member

Opening this issue to capture, document and discuss how serialization (serialize(), merge(), flatMap(), mergeMap()) behaves and is implemented.

Prior to 0.17.1 all serialization was done via blocking synchronization. This is how Rx.Net does it so we adopted the same pattern. This however breaks the model of Rx which is supposed to be non-blocking and be usable in non-blocking environments (think NIO event loops like Netty, Vert.x and Node.js-style apps). Blocking a thread while merging can significantly impact throughput in an application based on event-loops.

The migration to doing "serialization" instead of "synchronization" came with trade-offs.

Back Pressure

To be non-blocking means it must becomes async and allow threads to deliver their notifications, queueing if necessary and return. This can result in buffer-bloat and typical back-pressure problems. Solutions to this are being explored internally and with other teams/companies and will result in changes in the future.

Concurrency

One way of solving the problem without blocking is similar to observeOn and everything gets dropped into a queue and another thread pulls off the queue.

This however means that we are injecting additional concurrency in places it is not expected and generally not desired.

The current implementation does not do this. It uses the threads that are pushing events through and "steals" a single thread at a time to push through whatever is in the queue and then itself and then return to do its own work.

Thread Starvation

Stealing threads opens up the possibility of thread starvation. If a thread loops continually to drain the queue and the queue always is getting filled by other threads it will never be released to do its own work. This would means that the events intended for it to deliver would never be delivered as it is always busy delivering events on behalf of other threads.

Delayed Delivery

To prevent thread starvation the current implementation only allows draining the queue once. This can be increased to multiple iterations, but at some point it stops draining and returns and allows another thread to "win" and start draining.

During the time gap between the draining thread finishing and and a new thread taking over there may be a delay where events stay in the queue. This can delay delivery of events.

In a fast moving stream this is not a problem as another thread immediately takes over. In an intermittent stream however this can possibly mean long, non-determistic delays.

Possible Improvements

There are a few ways to improve this situation without reverting back to blocking.

Metrics could be kept to know if a stream is fast moving and thus thread-starvation is an issue and draining should be handed off to another thread. If starvation is not an issue then the queue could be fully drained before returning. This is still not perfect and would still risk one of the two occurring, but could probably solve most cases. The difficult is doing so without significantly impacting normal performance.

Another option is conditionally scheduling delivery onto another Scheduler when starvation is happening. This would allow most cases to be done by stealing threads, but flip to an observeOn style model if contention and/or starvation is happening.

Next steps

If the current functionality is breaking your use cases, you may want to stay on 0.17.0 while working with us to improve. Most use cases have shown to work fine with 0.17.1 behavior and the non-blocking and deadlock-free characteristics are necessary.

I welcome discussion, unit tests, pull requests and assistance on this.

Existing performance tests (that need work):

Existing unit tests:

@Strilanc
Copy link

I recommend the following:

  • Steal the current thread, as currently happens, but continue draining even if work is queued while you're draining.
  • If some limit is exceeded, either in number of work items or time duration, spin off a thread or other async-execution-thingy to handle draining the queue from then on.

This guarantees items can't sit indefinitely in the queue, and will not live lock of callers indefinitely.

@benjchristensen
Copy link
Member Author

spin off a thread or other async-execution-thingy to handle draining the queue from then on.

So it's clear what this means, the Rx contract is to never inject concurrency unless a Scheduler overload is offered that controls what Scheduler is used.

This means every flatMap, mergeMap, merge and mergeDelayError method needs an overload with a Scheduler. This is around 30 methods.

@Strilanc
Copy link

The livelock is only an issue when there's already concurrency (watch out for re-entrancy though), since other threads have to be adding data... but I suppose if every one of them was in some context the async-thingy we spun off might violate that.

What about some sort of hand-off? After the limit is exceeded, the current consumer sets a flag that others check when they're producing. The first producer that comes along is then forced to block so the consumer can hand-off to them after it finishes the next job. So we double-pay the latency of a job, once on the current consumer and once on blocking the to-be consumer, but gain not blocking anything indefinitely.

@benjchristensen
Copy link
Member Author

The first producer that comes along is then forced to block so the consumer can hand-off to them after it finishes the next job.

The problem I see with this is that we have no idea how quickly the onNext will complete and thus can block for non-deterministic periods of time. On event-loops that's a real problem. We could argue that if anyone is doing something like that they are "doing the wrong thing" but I'm not sure we can.

@benjchristensen
Copy link
Member Author

The backpressure (#1000) solution will affect the design of this. In particular it will mean the buffer becomes bounded and may naturally prevent thread starvation as the fast producers will park themselves. The addition of backpressure machinery may or may not necessitate the additional concurrent consumer.

@benjchristensen
Copy link
Member Author

I suggest changing the current implementation to tradeoff for risking starvation rather than delaying delivery. The starvation problem seems less likely to occur and a symptom of other problems (lack of backpressure?).

Or should we leave this as is until we get the backpressure stuff in place?

I'm not all that comfortable yet adding more threads to deal with serializing other threads. It feels wrong and makes for a confusing API (particularly on flatMap) since a Scheduler would exist but not be used for real work, just sometimes pushing notifications.

@headinthebox
Copy link
Contributor

suggest changing the current implementation to tradeoff for risking starvation

I would vote for that as a temporary fix until we have the back pressure stuff in place.

@Strilanc
Copy link

Sounds acceptable to me.

@davidmoten
Copy link
Collaborator

And me. Interesting discussion.

@benjchristensen
Copy link
Member Author

Please review changes on this pull request: #999

In particular this commit: benjchristensen@5b317ad

@petermd
Copy link
Contributor

petermd commented Apr 1, 2014

(moving comment from pull-request)

apologies for joining this a little late, but had a couple of observations/questions

  • in theory this wasn't a problem for Vert.x because as long as all the events that trigger the callbacks come from the Vert.x event loop those calls are already serialized (so there is no contention)
  • it does imply that all code using RxJava in Vert.x does not use any other threads/schedulers that might cause it to block or switch threads. in the Vert.x model the design requires you to delegate slow/blocking operations to separate dedicated worker-verticles that use their own thread-pool (and communication with those threads is via an async event bus mechanism)
  • this means that there is no "slow operation" that can happen via onNext, each processing pipeline should complete quickly (otherwise the event loop will starve other Verticles tied to the same Thread)

From what I understand of the current solution, this means that in Vert.x the short-circuit where no queue is created will always be executed and there will be minimal impact.

It also means that Vert.x is relying on not introducing Concurrency without an explicitly provided Scheduler (aside: I assume naively this never happens?) as the user would also need a matching observeOn() with a customer Scheduler to ensure the thread isolation is preserved.

@benjchristensen
Copy link
Member Author

Hi Peter,

as long as all the events that trigger the callbacks come from the Vert.x event loop those calls are already serialized

This assumes that multiple network requests on different event loops are never happening. Perhaps in Vert.x this is prevented, but it is definitely not the case in many apps using NIO/Netty where multiple network calls on separate event loops can all be merged together.

it does imply that all code using RxJava in Vert.x does not use any other threads/schedulers that might cause it to block or switch threads.

I don't understand how synchronization/serialization relates to this comment, can you please elaborate?

this means that there is no "slow operation" that can happen via onNext, each processing pipeline should complete quickly (otherwise the event loop will starve other Verticles tied to the same Thread)

A pipeline processing quickly is not sufficient since Rx supports streams, not just scalar responses, thus a stream from one thread can starve out another stream on a different thread if merged together. Each onNext may be fast, but when merging 2 or more threads together, one thread that is emitting many onNext notifications rapidly (such as in a tight-loop) can result in other threads being blocked. Even if each onNext is fast, all other event loops can end up waiting for a non-deterministic time until that thread yields or is parked by the OS and lets other threads work.

We are seeking a solution where instead of the other thread(s) being blocked, they can asynchronously deliver their results and move on to continue processing other work (if they are event loops). To complete this solution we need back pressure (#1000) so as to park/pause streams that are too fast for the consumer (which in this case could just be the merge bottleneck).

this means that in Vert.x the short-circuit where no queue is created will always be executed and there will be minimal impact

If everything is always done on the same event-loop, yes, you would have the best-case scenario and should see little-to-no change.

It also means that Vert.x is relying on not introducing Concurrency without an explicitly provided Scheduler

This isn't quite right. Rx never injects concurrency without an operator asking for it, and in all those cases where it is required there is an overload that accepts a Scheduler to change the default. If an operator is used that requires concurrency though it uses a default Scheduler (such as Schedulers.computation()).

This is why I'm against using a separate thread in merge/serialize to drain, as it injects concurrency where it's not expected. Operators that do inject concurrency because it's needed by definition are things like observeOn, 'interval', 'timer', 'buffer' and window involving time, timeout, throttle, sample and surely others. All of these have a Scheduler scheduler overload.

If you want to force all of these to always run on the Vert.x event-loop(s) then you may want to leverage the RxJavaDefaultSchedulers plugin (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java#L187) to override the system defaults and make everything always run on a Vert.x event loop.

@petermd
Copy link
Contributor

petermd commented Apr 1, 2014

Hi Ben,

(aware this might be too Vert.x specific for this issue so if you'd prefer to move to a different issue / group thread just lmk)

This assumes that multiple network requests on different event loops are never happening. Perhaps in Vert.x this is prevented, but it is definitely not the case in many apps using NIO/Netty where multiple network calls on separate event loops can all be merged together.

Right. This is particular to Vert.x where all callbacks to requests from a Verticle instance are serialized back to that instance on the same event loop thread.

it does imply that all code using RxJava in Vert.x does not use any other threads/schedulers that might cause it to block or switch threads.

I don't understand how synchronization/serialization relates to this comment, can you please elaborate?

It doesn't directly relate, other than that both thread-stealing and injecting concurrency would most likely break in Vert.x. The current solution only works because the existing serialization prevents the problem case happening (something I wasn't 100% on and wanted to confirm).

We are seeking a solution where instead of the other thread(s) being blocked, they can asynchronously deliver their results and move on to continue processing other work (if they are event loops). To complete this solution we need back pressure (#1000) so as to park/pause streams that are too fast for the consumer (which in this case could just be the merge bottleneck).

Understood - and back pressure/throttling is the only real solution.

If everything is always done on the same event-loop, yes, you would have the best-case scenario and should see little-to-no change.

Yep. Just wanted to confirm that.

This isn't quite right. Rx never injects concurrency without an operator asking for it

That was my understanding, but my early reading of the original request was that it was being proposed as a solution - glad you've decided against it.

If you want to force all of these to always run on the Vert.x event-loop(s) then you may want to leverage the RxJavaDefaultSchedulers plugin (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java#L187) to override the system defaults and make everything always run on a Vert.x event loop.

The problem is that the blocking operations cannot run on the event loop thread, but the subscriptions would have to - and more specifically run on the same thread that triggered the operator (not any event loop thread).

I think that might require a custom scheduler that has to be passed into Operators and/or observeOn() (might use registerDefaultSchedulers() to block any code from using a default scheduler as a developer aid though)

@benjchristensen
Copy link
Member Author

The current solution only works because the existing serialization prevents the problem case happening (something I wasn't 100% on and wanted to confirm).

How is this different than if synchronization was used? In that case if multiple threads existed, each would push their notifications through.

In short, if inside a Vert.x vertical there is ever a case when multiple threads are being merged together, both serialize and synchronize would be problematic to Vert.x.

The problem is that the blocking operations cannot run on the event loop thread

What blocking operations are you referring to? The only blocking operations I'm aware of (after eliminating synchronize) are those inside BlockingObservable (which if anyone uses they get what they ask for).

@petermd
Copy link
Contributor

petermd commented Apr 1, 2014

How is this different than if synchronization was used? In that case if multiple threads existed, each would push their notifications through.
In short, if inside a Vert.x vertical there is ever a case when multiple threads are being merged together, both serialize and synchronize would be problematic to Vert.x.

The only case was where serialization always deferred to a seperate thread for delivery.

What blocking operations are you referring to? The only blocking operations I'm aware of (after eliminating synchronize) are those inside BlockingObservable (which if anyone uses they get what they ask for).

Indeed. Even a tight-loop or CPU intensive code would be problematic as the threading model means that occupying a thread will starve other operations that are pinned to the same thread (even if there are other cores available)

@benjchristensen
Copy link
Member Author

Even a tight-loop or CPU intensive code would be problematic

Yes, but this is typical of any event-loop system. The goal is for RxJava to have nothing that would block a thread (park/sleep/wait) but that does not prevent a developer from writing computationally intensive code that saturates a thread/loop.

If someone is going to do computationally intensive work they would need to use subscribeOn/observeOn to move it off then back on the event-loop, just as they would need to do background processing when interacting with Vert.x without Rx, correct?

@petermd
Copy link
Contributor

petermd commented Apr 1, 2014

Yes, but this is typical of any event-loop system. The goal is for RxJava to have nothing that would block a thread (park/sleep/wait) but that does not prevent a developer from writing computationally intensive code that saturates a thread/loop.

Splitting hairs at this stage - but yes. A system that pins work to the same thread is just more susceptible to becoming unbalanced vs one that runs events on a pool of threads.

If someone is going to do computationally intensive work they would need to use subscribeOn/observeOn to move it off then back on the event-loop, just as they would need to do background processing when interacting with Vert.x without Rx, correct?

Vert.x has a separate model for this - a WorkerVerticle. it uses a separate thread-pool and interfaces via the async event bus (so no additional thread synchronization is needed)

That said, it is quite coarse grained, so providing support for subscribeOn()/observeOn() might be worthwhile.

@benjchristensen
Copy link
Member Author

We have removed SynchronizedObserver and have been using SerializedObserver for several releases now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants