Improve Stream from Cats Effect Queue performance #2521
Merged
I was interfacing with some external methods, following the documentation example using `cats.effect.std.Queue`, and was surprised by the extreme slowness of the code. It turns out I had stumbled onto a bottleneck in the current algorithm that makes it accidentally quadratic whenever the producer submits elements very quickly.
In particular, the current code builds a chunk as large as possible from the elements available in the queue by repeated `Chunk` concatenation via `++`. This becomes a massive slowdown once the chunk has been concatenated a moderate number of times (as few as n >= 10), mainly because the `Chunk` methods access elements through `apply(Int)`, which is O(n) (n being the index) for these concatenated chunks: built from singletons or smaller chunks, they end up with a List-like structure. With this approach, those methods become accidentally quadratic.

To mitigate this, I added a specialization for the typical case of building from a queue of single elements: it accumulates the elements into a backing `Buffer` using the same algorithm as before and builds the chunk only once at the end.

To test it out I wrote a benchmark and measured both the previous and the new approach. The speedups are quite impressive for larger chunk sizes. The JMH parameters were (3 measurement iterations, 3 warmup iterations, 3 forks, 3 threads):
```
jmh:run -i 3 -wi 3 -f3 -t3 .*StreamBenchmark.*queue.*
```
**Before**

*(benchmark results omitted)*

**After**

*(benchmark results omitted)*
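The concat-vs-buffer tradeoff described above can be sketched without fs2 at all. The names below (`ChunkBuildDemo`, `viaConcat`, `viaBuffer`) are hypothetical and only mimic the access pattern, not fs2's actual `Chunk` internals: repeated `++` on a List-backed structure followed by `apply(i)` indexing costs O(n) per step and is quadratic overall, while accumulating into a mutable buffer and materializing once is linear.

```scala
import scala.collection.mutable.ArrayBuffer

object ChunkBuildDemo {
  // Quadratic pattern: repeated ++ over a List-backed accumulator,
  // then index-based access that costs O(i) per element on a List.
  def viaConcat(elems: Seq[Int]): Vector[Int] = {
    var acc: List[Int] = Nil
    for (e <- elems) acc = acc ++ List(e)      // O(|acc|) per append => O(n^2) total
    Vector.tabulate(acc.length)(i => acc(i))   // acc(i) is O(i) on a List
  }

  // Linear pattern (what the PR does conceptually): accumulate into a
  // mutable Buffer and build the final structure exactly once.
  def viaBuffer(elems: Seq[Int]): Vector[Int] = {
    val buf = new ArrayBuffer[Int](elems.size)
    for (e <- elems) buf += e                  // amortized O(1) per append
    buf.toVector                               // single O(n) build
  }
}
```

Both produce the same result; only the asymptotics differ, which is why the specialization pays off once the producer outruns the consumer and many elements pile up in the queue.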
Oddly enough, this PR would also relax the implicit requirements on the `Stream.fromQueueNoneTerminated` method, since the `Functor` instance is no longer needed, but for binary compatibility I didn't dare remove it.