Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async dropping queue #4143

Merged
merged 1 commit into from
Oct 25, 2024

Conversation

iRevive
Copy link
Contributor

@iRevive iRevive commented Sep 25, 2024

Closes #4141.

Benchmarks:

Throughput              Async                       Concurrent
enqueueDequeueMany      281.571 ± 4.388 ops/s       144.713 ± 2.057 ops/s
enqueueDequeueOne       281.266 ± 4.599 ops/s       173.974 ± 3.129 ops/s

GC alloc rate           Async                       Concurrent
enqueueDequeueMany      3273.366 ± 51.235 MB/sec    5860.556 ±  82.691 MB/sec
enqueueDequeueOne       3058.866 ± 50.114 MB/sec    6784.491 ± 123.139 MB/sec

GC alloc rate norm      Async                       Concurrent
enqueueDequeueMany      12191147.081 ±  2.533 B/op  42468767.945 ± 65.343 B/op
enqueueDequeueOne       11404702.192 ± 11.867 B/op  40895828.996 ±  4.205 B/op

GC count                Async                       Concurrent
enqueueDequeueMany      898.000  counts             771.000  counts
enqueueDequeueOne       1030.000 counts             1315.000 counts

GC time                 Async                       Concurrent
enqueueDequeueMany      519.000 ms                  1130.000 ms
enqueueDequeueOne       578.000 ms                  771.000  ms

@@ -418,7 +418,11 @@ class UnboundedQueueSpec extends BaseSpec with QueueTests[Queue] {
class DroppingQueueSpec extends BaseSpec with QueueTests[Queue] {
sequential

"DroppingQueue" should {
"DroppingQueue (concurrent)" should {
droppingQueueTests(i => if (i < 1) Queue.dropping(i) else Queue.droppingForConcurrent(i))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

droppingForConcurrent doesn't enforce capacity constraints because it's used internally.

@iRevive iRevive force-pushed the async-dropping-queue branch from d7d2039 to 9a7fc5b Compare September 25, 2024 08:59
extends BaseBoundedAsyncQueue[F, A](capacity) {

def offer(a: A): F[Unit] =
F.uncancelable { _ =>
Copy link
Contributor Author

@iRevive iRevive Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fiber run loop handles the cancellation. The block has exactly one synchronous operation F.delay. The delay operation cannot be canceled in the middle, right?
So, is F.uncancelable necessary here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delay operation cannot be canceled in the middle, right?

That's right. uncancelable is not needed here :)

Copy link
Member

@djspiewak djspiewak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is excellent, thank you!

@djspiewak djspiewak merged commit 0600ea6 into typelevel:series/3.x Oct 25, 2024
29 of 33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Dropping queue high memory usage
3 participants