reactivestreams StreamSubscriber blocks (creates thread) each time it is called by upstream #2628
Comments
This is indeed correct and I'm working on resolving this. It is not exactly a bug, since not spawning threads can lead to a deadlock of the compute pool, but maybe it is worth being slightly less pedantic about it. For now, you can revert to In terms of a better remedy, Cats Effect |
Yeah, agree on it not being a bug per se, it's more a regression (I don't think I can choose the label though!). Confirming what I said on discord but CE There's no pressing need for any remedy from my end. It does seem to be something that's in consideration at the moment so happy to wait for CE |
The PR that you linked is not related to Cats Effect 3.2.7 and the PR predates the release. Otoh, CE2 did not show this behavior because there was no effort to respect That executor is used by CE2. |
Sorry, I linked the wrong PR. I forgot I wasn't in
Thanks, but I don't need to restore behaviour at all. I'm more worried about this as I presume this is the cause of a significant performance hit in an application (50%-100% higher CPU use) since updating to CE3. It creates ~600 threads per second which seems like the most likely cause [CE2 was < 10]. I have no need for a short-term fix though (higher CPU is fine, or we can rollback); I'm more concerned about the longer-term issues [that this feels like a regression and could be fixed in fs2 or ce] 👍 FWIW I just tested out |
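A thread creation rate like the one quoted above can be checked from inside the JVM. The following is a minimal sketch using the standard `ThreadMXBean`; `ThreadStartMeter` and `startedDuring` are hypothetical names chosen for illustration and are not part of fs2 or cats-effect.

```scala
import java.lang.management.ManagementFactory

object ThreadStartMeter {
  private val threads = ManagementFactory.getThreadMXBean

  // Runs `action` and returns its result together with the number of
  // threads the whole JVM started while it ran (not only threads that
  // `action` itself created).
  def startedDuring[A](action: => A): (A, Long) = {
    val before = threads.getTotalStartedThreadCount
    val result = action
    (result, threads.getTotalStartedThreadCount - before)
  }
}
```

Dividing the returned count by the elapsed wall-clock time gives a rough threads-per-second figure. Because the counter is JVM-wide, unrelated threads started in the same window are included.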
Oh, I suppose I could try that executor to see if it restores the performance to as it was before [as a confirmation of the cause of performance hit]. I'll try that tomorrow actually, thanks! |
@vasilmkd as discussed on discord I had a play around with the code as above, but replacing the compute pool with either the scala global or a Java fixed thread pool:
    val compute = scala.concurrent.ExecutionContext.global
or
    val compute =
      ExecutionContext.fromExecutorService(
        java.util.concurrent.Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime.availableProcessors()))
      )
Interestingly the The Java pool unfortunately led to a non-functional application. I wondered if it was because the app runs in kubernetes so set the fixed pool to the exact request CPU ( |
The Java pool gets blocked because all threads are blocked on All in all, the reactive streams interop will require changes. That's my opinion. |
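The starvation described here can be reproduced with the standard library alone. The following is a sketch under stated assumptions, not the fs2 code path: every thread of a fixed pool blocks on a result that can only be produced by another task queued on the same pool. `FixedPoolStarvation`, `run` and the 500 ms timeout are made up for illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object FixedPoolStarvation {
  // Returns, for each outer task, whether its inner Future finished within the timeout.
  def run(poolSize: Int): Seq[Boolean] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    // Ensures every pool thread is occupied before any inner task is submitted.
    val allRunning = new CountDownLatch(poolSize)
    try {
      val outer = (1 to poolSize).map { i =>
        Future {
          allRunning.countDown()
          allRunning.await()
          // Blocks a pool thread on work that itself needs a free pool thread.
          Try(Await.result(Future(i), 500.millis)).isSuccess
        }
      }
      Await.result(Future.sequence(outer), 10.seconds)
    } finally pool.shutdownNow()
  }
}
```

With this setup every inner `Await.result` is expected to time out, because no pool thread is free to run the inner task until the outer tasks give up. A pool that adds threads when a task blocks avoids the stall, at the cost of the thread creation discussed in this issue.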
Ah, that makes sense, thanks.
@SystemFw wondered if it needed a rethink on discord as well (before you joined the conversation). Thanks for your help! |
This should be fixed in #2632 on fs2 side. |
@notxcain @bastewart Would you like me to release a snapshot version with these changes so that you can test whether this resolves the issues discussed here? |
@vasilmkd Can you elaborate on this? The rx-stream interop was changed to not use a dispatcher, but I'm concerned that folks will have this same issue in other places where dispatchers are needed -- like in the output stream interop. |
If you don't mind that would be really helpful. If it's a pain I can probably build from source myself though. Thanks! |
It should be this one |
@vasilmkd really good news! This completely fixes the CPU penalty we were seeing in our application, halving CPU use compared to the current release. Thank you! In terms of cats-effect 2 to cats-effect 3 comparison CE3 now comes out needing only 75% of the CPU (35% performance uplift) when compared to CE2, so that's pretty nice! With the bug it was (as mentioned previously) 50%-100% worse, hah. E: Should go without saying it doesn't spawn so many threads anymore as well. |
That is awesome news! |
Addressed in #2632. |
First of all, awesome to hear that this is fixed :) |
I assume since the PRs were merged for both 2.5.x and main, they passed the CI. 😅 |
I have not had time to prove this via a minimisation but this came from a discussion on discord and @djspiewak thinks this is correct.
`StreamSubscriber`, which implements reactivestreams `Subscriber`, uses `dispatcher.unsafeRunSync` each time it is called by the upstream source. Under cats-effect 3 this leads to a new thread (a `HelperThread`) being created as `unsafeRunSync` leads to a `scala.concurrent.blocking` call.
This means at least 2 new threads for each stream instance created (`onSubscribe` and `onComplete`) and potentially `n` for each element in the stream.
In real terms this leads to a large number of threads being created in applications which use `StreamSubscriber` to convert a reactivestream to an `fs2.Stream`. For example the http4s JDK client will create `2+n` threads for each response body it handles.
This was identified on fs2 `3.1.2`, cats-effect `3.2.8`, cats `2.6.1` and Scala `2.13.6`.
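The `2+n` figure can be illustrated with a minimal sketch. It assumes, as the issue describes, that every subscriber callback goes through one blocking call. `MiniSubscriber`, `CountingSubscriber` and `runSync` are hypothetical stand-ins, not the fs2 or reactive-streams types, and the counter only tallies calls rather than spawning threads.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Simplified stand-in for a reactive-streams subscriber
// (subscription and error handling are omitted).
trait MiniSubscriber[A] {
  def onSubscribe(): Unit
  def onNext(a: A): Unit
  def onComplete(): Unit
}

// Hypothetical subscriber that routes every callback through a single
// blocking entry point, the role dispatcher.unsafeRunSync plays in the issue.
final class CountingSubscriber[A] extends MiniSubscriber[A] {
  val blockingCalls = new AtomicInteger(0)

  // Assumption: each call here would correspond to one blocking region.
  private def runSync(action: => Unit): Unit = {
    blockingCalls.incrementAndGet()
    action
  }

  def onSubscribe(): Unit = runSync(())
  def onNext(a: A): Unit = runSync(())
  def onComplete(): Unit = runSync(())
}

object CallbackCount {
  // Drives one stream of n elements and reports how many blocking calls it made.
  def blockingCallsFor(n: Int): Int = {
    val sub = new CountingSubscriber[Int]
    sub.onSubscribe()
    (1 to n).foreach(sub.onNext)
    sub.onComplete()
    sub.blockingCalls.get()
  }
}
```

If each blocking call costs one helper thread, a response body delivered in `n` chunks accounts for `blockingCallsFor(n)` thread creations, which is why the count grows with the size of the stream.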