Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Stream from Cats Effect Queue performance #2521

Merged
merged 4 commits into from
Aug 4, 2021

Conversation

jordiolivares
Copy link
Contributor

I was interfacing with some external methods following the documentation example using cats.effect.std.Queue and was surprised at the extreme slowness of the code. It seems I accidentally came across a bottleneck in the current algorithm that makes it accidentally quadratic whenever the producer submits elements very fast.

In particular, the current code performs a Chunk concatenation via ++ in order to build a chunk as large as possible with the elements available in the queue. This seems to be a massive slowdown in case the chunk gets concatenated a large amount of times (as low as n >= 10). Mainly because all the Chunk methods are using the apply(Int) method to access the elements, which has complexity O(n) (n being the index) in the case of a queue due to its List-like structure in the case of Singletons or smaller chunks. This means methods become accidentally quadratic with this approach.

To mitigate this I made a specialization for the typical case of building from a queue of single elements which will build the chunk only once after building a backing Buffer with the same algorithm as before.

To test it out I made a benchmark and tested the previous and new approach. Speedups are quite impressive with larger values of chunks. The JMH parameters were jmh:run -i 3 -wi 3 -f3 -t3 .*StreamBenchmark.*queue.*

Before

[info] Benchmark               (n)   Mode  Cnt      Score     Error  Units
[info] StreamBenchmark.queue     1  thrpt    9  63732.239 ± 1684.040  ops/s
[info] StreamBenchmark.queue    10  thrpt    9  13753.944 ± 969.140  ops/s
[info] StreamBenchmark.queue   100  thrpt    9    170.208 ±   3.808  ops/s
[info] StreamBenchmark.queue  1000  thrpt    9      2.302 ±   0.067  ops/s

Note that the n = 10000 case was killed as it was being killed
by the timeout of JMH set at 10 minutes by default

After

[info] Benchmark                (n)   Mode  Cnt      Score     Error  Units
[info] StreamBenchmark.queue      1  thrpt    9  68063.002 ± 607.598  ops/s
[info] StreamBenchmark.queue     10  thrpt    9  23959.130 ± 327.189  ops/s
[info] StreamBenchmark.queue    100  thrpt    9   2839.898 ±  65.819  ops/s
[info] StreamBenchmark.queue   1000  thrpt    9    211.958 ±  11.139  ops/s
[info] StreamBenchmark.queue  10000  thrpt    9      7.345 ±   0.514  ops/s

Oddly enough this PR also would relax the implicit requirements on the Stream.fromQueueNoneTerminated method by not needing the Functor implicit, but due to binary compatibility I didn't dare to remove it

@jordiolivares jordiolivares changed the title Improve Stream from Cats Effect Queue Improve Stream from Cats Effect Queue performance Aug 1, 2021
@mpilquist
Copy link
Member

Awesome work, thanks!

@jordiolivares
Copy link
Contributor Author

Seems like I didn't think of Scala 2.12 when using the Builder, let me quickly fix it

Copy link
Collaborator

@SystemFw SystemFw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for this!

@SystemFw SystemFw merged commit cb074ba into typelevel:main Aug 4, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants