
reactivestreams StreamSubscriber blocks (creates thread) each time it is called by upstream #2628

Closed
bastewart opened this issue Sep 22, 2021 · 20 comments

@bastewart
Contributor

I have not had time to prove this via a minimisation, but it came up in a discussion on Discord and @djspiewak thinks it is correct.

StreamSubscriber, which implements the reactivestreams Subscriber, uses dispatcher.unsafeRunSync each time it is called by the upstream source. Under cats-effect 3 this leads to a new thread (a HelperThread) being created, as unsafeRunSync results in a scala.concurrent.blocking call.

This means at least 2 new threads for each stream instance created (onSubscribe and onComplete), and potentially one more for each of the n elements in the stream.

In real terms this leads to a large number of threads being created in applications which use StreamSubscriber to convert a reactive stream to an fs2.Stream. For example, the http4s JDK client will create 2+n threads for each response body it handles.
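The 2+n pattern described above can be sketched with a simplified, hypothetical stand-in (plain standard library, not fs2/cats-effect code): every upstream signal performs one synchronous wait, just as `dispatcher.unsafeRunSync` does, so a stream of n elements pays for 2+n blocking round-trips. The names `unsafeRunSyncLike` and `PerSignalBlocking` are illustrative only.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the pattern described above: every upstream
// signal (onSubscribe, n * onNext, onComplete) performs one synchronous
// wait, as dispatcher.unsafeRunSync does.
object PerSignalBlocking {
  val syncRuns = new AtomicInteger(0)

  // Stand-in for dispatcher.unsafeRunSync: submit the work and park the
  // calling thread until it finishes. Await.result goes through
  // scala.concurrent.blocking, which is what makes pools spawn threads.
  def unsafeRunSyncLike[A](fa: => A): A = {
    syncRuns.incrementAndGet()
    Await.result(Future(fa), 5.seconds)
  }

  def onSubscribe(): Unit = unsafeRunSyncLike(())
  def onNext(elem: Int): Unit = unsafeRunSyncLike(())
  def onComplete(): Unit = unsafeRunSyncLike(())

  def main(args: Array[String]): Unit = {
    onSubscribe()
    (1 to 10).foreach(onNext)
    onComplete()
    println(syncRuns.get()) // 12, i.e. 2 + n for n = 10
  }
}
```

Each of those waits is a separate chance for a blocking-aware pool to spawn a helper thread.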

This was identified on fs2 3.1.2, cats-effect 3.2.8, cats 2.6.1 and Scala 2.13.6.

@bastewart bastewart added the bug label Sep 22, 2021
@vasilmkd
Member

This is indeed correct and I'm working on resolving this. It is not exactly a bug, since not spawning threads can lead to a deadlock of the compute pool, but maybe it is worth being slightly less pedantic about it. For now, you can revert to cats-effect 3.2.7 which doesn't contain the change.

In terms of a better remedy, Cats Effect 3.3.0 will be the release with a comprehensive overhaul of the blocking support, but maybe I can backport some changes to 3.2.10 for example. I cannot promise anything though.

@bastewart
Contributor Author

Yeah, agree on it not being a bug per se; it's more a regression (I don't think I can choose the label though!).

Confirming what I said on Discord: CE 3.2.7 still shows the same high rate of thread creation, so this has presumably been around for longer than #2312.

There's no pressing need for any remedy from my end. It does seem to be something that's under consideration at the moment, so I'm happy to wait for CE 3.3.0!

@vasilmkd
Member

vasilmkd commented Sep 22, 2021

The PR that you linked is not related to Cats Effect 3.2.7; the PR predates the release. On the other hand, CE2 did not show this behavior because there was no effort to respect scala.concurrent.blocking in CE2 at all. If you want to restore that behavior immediately, you can create a FixedThreadPoolExecutor and use it as the compute pool.

https://github.com/typelevel/cats-effect/blob/series/3.x/benchmarks/src/main/scala/cats/effect/benchmarks/WorkStealingBenchmark.scala

That executor is used by CE2.

@bastewart
Contributor Author

bastewart commented Sep 22, 2021

Sorry, I linked the wrong PR. I forgot I wasn't in cats-effect, so I just linked the number, not the repo as well. I should have linked this, which was introduced in 3.2.8 but doesn't seem to change the behaviour I'm seeing: typelevel/cats-effect#2312

> If you want to restore that behavior immediately, you can create a FixedThreadPoolExecutor and use it as the compute pool.

Thanks, but I don't need to restore the old behaviour at all. I'm more worried that this is the cause of a significant performance hit in our application (50%-100% higher CPU use) since updating to CE3: it creates ~600 threads per second, which seems like the most likely cause [CE2 was < 10]. I have no need for a short-term fix though (higher CPU is fine, or we can roll back); I'm more concerned about the longer-term issues [that this feels like a regression and could be fixed in fs2 or CE] 👍

FWIW I just tested 3.2.3 and it shows the same behaviour as well (that's the version I happened to start looking into this on, before subsequently trying 3.2.8).

@bastewart
Contributor Author

bastewart commented Sep 22, 2021

Oh, I suppose I could try that executor to see if it restores performance to what it was before [as confirmation of the cause of the performance hit]. I'll try that tomorrow actually, thanks!

@bastewart
Contributor Author

@vasilmkd as discussed on discord I had a play around with the code as above, but replacing the compute pool with either the scala global or a Java fixed thread pool:

```scala
val compute = scala.concurrent.ExecutionContext.global
```

or

```scala
val compute =
  ExecutionContext.fromExecutorService(
    java.util.concurrent.Executors.newFixedThreadPool(
      Math.max(2, Runtime.getRuntime.availableProcessors())
    )
  )
```

Interestingly, the global pool did not show excess thread creation. I think you were expecting that it would create lots of threads? Unfortunately it didn't seem to help with the CE3 performance regression (which I've re-tested and which actually "only" seems to be 25-50%, not 50-100%). Potentially the threads aren't the cause of that, then.

The Java pool unfortunately led to a non-functional application. I wondered if it was because the app runs in Kubernetes, so I set the fixed pool to the exact CPU request (2), and it still didn't work. I didn't have time to dig into why, unfortunately!

@vasilmkd
Member

The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.
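A minimal illustration of that fixed-pool failure mode (assumed names, not fs2/cats-effect code): the pool's only thread parks inside `Await.result`, the inner task can never be scheduled on the same pool, and `blocking` cannot grow a fixed pool, so the wait times out instead of completing.

```scala
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Sketch: a fixed pool whose threads all block in a synchronous wait on work
// scheduled on that same pool makes no progress, because the pool cannot
// compensate by spawning extra threads.
object FixedPoolDeadlock {
  // Returns true if the inner wait timed out (i.e. the fixed pool stalled).
  def innerWaitTimesOut(): Boolean = {
    val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    val outer = Future {
      // This Await parks the single pool thread; Future(42) below needs that
      // same thread, so it never runs and the Await times out.
      Await.result(Future(42)(pool), 200.millis)
    }(pool)
    val timedOut =
      try { Await.result(outer, 5.seconds); false }
      catch { case _: TimeoutException => true }
    pool.shutdown()
    timedOut
  }

  def main(args: Array[String]): Unit =
    println(innerWaitTimesOut()) // true: a fixed pool cannot compensate
}
```

With more threads the deadlock just needs more concurrent subscribers; the failure mode is the same.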

All in all, the reactive streams interop will require changes. That's my opinion.

@bastewart
Contributor Author

> The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.

Ah, that makes sense, thanks.

> All in all, the reactive streams interop will require changes. That's my opinion.

@SystemFw wondered if it needed a rethink on discord as well (before you joined the conversation).

Thanks for your help!

@notxcain
Contributor

This should be fixed in #2632 on fs2 side.

@vasilmkd
Member

@notxcain @bastewart Would you like me to release a snapshot version with these changes so that you can test whether this resolves the issues discussed here?

@mpilquist
Member

> All in all, the reactive streams interop will require changes. That's my opinion.

@vasilmkd Can you elaborate on this? The rx-stream interop was changed to not use a dispatcher, but I'm concerned that folks will have this same issue in other places where dispatchers are needed -- like in the output stream interop.

@vasilmkd
Member

Dispatcher#unsafeRunSync() is implemented in terms of Await.result, which is wrapped in scala.concurrent.blocking. This signals any pool that respects it to spawn new threads in preparation, both the default CE3 pool and ExecutionContext.global. There's not much we can do here. The solution is to avoid unsafeRunSync() on the compute pool, which is the age-old recommendation. It solves both the safety and the performance issues.
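A small standalone experiment (not fs2/CE code; the object name is made up) confirming the claim above: install a `BlockContext` that counts entries into `scala.concurrent.blocking`, then call `Await.result`. The counter comes back non-zero because `Await.result` wraps its wait in `blocking`, which is exactly the signal that makes blocking-aware pools spawn threads.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, BlockContext, CanAwait, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CountBlocking {
  // Run Await.result under a BlockContext that counts blocking { ... } entries.
  def blockingEntriesDuringAwait(): Int = {
    val entries = new AtomicInteger(0)
    val counting = new BlockContext {
      def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
        entries.incrementAndGet() // every scala.concurrent.blocking call lands here
        thunk
      }
    }
    BlockContext.withBlockContext(counting) {
      Await.result(Future(1 + 1), 5.seconds)
    }
    entries.get()
  }

  def main(args: Array[String]): Unit =
    println(blockingEntriesDuringAwait() >= 1) // true: Await.result went through blocking
}
```

The same signal fires on every `unsafeRunSync()` call, which is why running it per element on the compute pool is so costly.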

@vasilmkd
Member

scala.concurrent.blocking works fine in these cases; it just seems that people think it somehow has no performance overhead, when it is in fact mainly a safety construct. Spawning threads, context switching and everything that entails are very expensive operations.

@bastewart
Contributor Author

> @notxcain @bastewart Would you like me to release a snapshot version with these changes so that you can test whether this resolves the issues discussed here?

If you don't mind that would be really helpful. If it's a pain I can probably build from source myself though. Thanks!

@vasilmkd
Member

It should be this one "co.fs2" %% "fs2-core" % "3.1-11-d8073c7".

@bastewart
Contributor Author

bastewart commented Sep 24, 2021

> It should be this one "co.fs2" %% "fs2-core" % "3.1-11-d8073c7".

@vasilmkd really good news! This completely fixes the CPU penalty we were seeing in our application, halving CPU use compared to the current release. Thank you!

In terms of the cats-effect 2 to cats-effect 3 comparison, CE3 now comes out needing only 75% of the CPU (a ~35% performance uplift) compared to CE2, so that's pretty nice! With the bug it was (as mentioned previously) 50%-100% worse, hah.

Edit: It should go without saying that it doesn't spawn so many threads anymore either.

@vasilmkd
Member

That is awesome news!

@vasilmkd
Member

Addressed in #2632.

@SystemFw
Collaborator

> The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.
>
> All in all, the reactive streams interop will require changes. That's my opinion.

First of all, awesome to hear that this is fixed :)
Now, I need to look at the reactive spec again, but IIRC without unsafeRunSync their test suite fails, since there are cases like cancel; complete which assert that complete shouldn't execute (or something along those lines), and if cancel isn't synchronous you can't guarantee that. It's been a few years since I've looked at it, though.

@vasilmkd
Member

I assume that since the PRs were merged for both 2.5.x and main, they passed the CI. 😅
