Skip to content

Commit

Permalink
Allow queue to be used for task finished. (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Jul 12, 2024
1 parent 3b2ccdd commit b150514
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 116 deletions.
99 changes: 0 additions & 99 deletions fixtures/a_condition.rb

This file was deleted.

103 changes: 103 additions & 0 deletions fixtures/async/a_condition.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2022, by Samuel Williams.

require 'async/variable'

module Async
ACondition = Sus::Shared("a condition") do
let(:condition) {subject.new}

it 'can signal waiting task' do
state = nil

reactor.async do
state = :waiting
condition.wait
state = :resumed
end

expect(state).to be == :waiting

condition.signal

reactor.yield

expect(state).to be == :resumed
end

it 'should be able to signal stopped task' do
expect(condition).to be(:empty?)
expect(condition).not.to be(:waiting?)

task = reactor.async do
condition.wait
end

expect(condition).not.to be(:empty?)
expect(condition).to be(:waiting?)

task.stop

condition.signal
end

it 'resumes tasks in order' do
order = []

5.times do |i|
task = reactor.async do
condition.wait
order << i
end
end

condition.signal

reactor.yield

expect(order).to be == [0, 1, 2, 3, 4]
end

with "timeout" do
let(:ready) {Async::Variable.new(condition)}
let(:waiting) {Async::Variable.new(subject.new)}

def before
@state = nil

@task = reactor.async do |task|
task.with_timeout(0.01) do
begin
@state = :waiting
waiting.resolve

ready.wait
@state = :signalled
rescue Async::TimeoutError
@state = :timeout
end
end
end

super
end

it 'can timeout while waiting' do
@task.wait

expect(@state).to be == :timeout
end

it 'can signal while waiting' do
waiting.wait
ready.resolve

@task.wait

expect(@state).to be == :signalled
end
end
end
end
8 changes: 6 additions & 2 deletions lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ def wait
end
end

# Is any fiber waiting on this notification?
# @returns [Boolean]
# @deprecated Replaced by {#waiting?}
def empty?
@waiting.empty?
end

# @returns [Boolean] Is any fiber waiting on this notification?
def waiting?
@waiting.size > 0
end

# Signal to a given task that it should resume operations.
# @parameter value [Object | Nil] The value to return to the waiting fibers.
def signal(value = nil)
Expand Down
35 changes: 24 additions & 11 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@

module Async
# A queue which allows items to be processed in order.
#
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
#
# @public Since `stable-v1`.
class Queue < Notification
class Queue
# Create a new queue.
#
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
def initialize(parent: nil)
super()

# @parameter available [Notification] The notification to use for signaling when items are available.
def initialize(parent: nil, available: Notification.new)
@items = []
@parent = parent
@available = available
end

# @attribute [Array] The items in the queue.
Expand All @@ -38,20 +41,20 @@ def empty?
def <<(item)
@items << item

self.signal unless self.empty?
@available.signal unless self.empty?
end

# Add multiple items to the queue.
def enqueue(*items)
@items.concat(items)

self.signal unless self.empty?
@available.signal unless self.empty?
end

# Remove and return the next item from the queue.
def dequeue
while @items.empty?
self.wait
@available.wait
end

@items.shift
Expand All @@ -77,6 +80,16 @@ def each
yield item
end
end

# Signal the queue with a value, the same as {#enqueue}.
def signal(value)
self.enqueue(value)
end

# Wait for an item to be available, the same as {#dequeue}.
def wait
self.dequeue
end
end

# A queue which limits the number of items that can be enqueued.
Expand All @@ -85,12 +98,12 @@ class LimitedQueue < Queue
# Create a new limited queue.
#
# @parameter limit [Integer] The maximum number of items that can be enqueued.
def initialize(limit = 1, **options)
# @parameter full [Notification] The notification to use for signaling when the queue is full.
def initialize(limit = 1, full: Notification.new, **options)
super(**options)

@limit = limit

@full = Notification.new
@full = full
end

# @attribute [Integer] The maximum number of items that can be enqueued.
Expand Down Expand Up @@ -128,7 +141,7 @@ def enqueue(*items)
available = @limit - @items.size
@items.concat(items.shift(available))

self.signal unless self.empty?
@available.signal unless self.empty?
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
require 'sus/fixtures/async'
require 'async/condition'

require 'a_condition'
require 'async/a_condition'

describe Async::Condition do
include Sus::Fixtures::Async::ReactorContext
Expand Down Expand Up @@ -52,5 +52,5 @@
expect(consumer.status).to be == :completed
end

it_behaves_like ACondition
it_behaves_like Async::ACondition
end
4 changes: 2 additions & 2 deletions test/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
require 'sus/fixtures/async'
require 'async/notification'

require 'a_condition'
require 'async/a_condition'

describe Async::Notification do
include Sus::Fixtures::Async::ReactorContext
Expand Down Expand Up @@ -47,5 +47,5 @@
]
end

it_behaves_like ACondition
it_behaves_like Async::ACondition
end
32 changes: 32 additions & 0 deletions test/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,44 @@
end
end

with '#signal' do
it 'can signal with an item' do
queue.signal(:item)
expect(queue.dequeue).to be == :item
end
end

with '#wait' do
it 'can wait for an item' do
reactor.async do |task|
queue.enqueue(:item)
end

expect(queue.wait).to be == :item
end
end

with 'an empty queue' do
it "is expected to be empty" do
expect(queue).to be(:empty?)
end
end

with 'task finishing queue' do
it 'can signal task completion' do
3.times do
Async(finished: queue) do
:result
end
end

3.times do
task = queue.dequeue
expect(task.wait).to be == :result
end
end
end

with 'semaphore' do
let(:capacity) {2}
let(:semaphore) {Async::Semaphore.new(capacity)}
Expand Down

0 comments on commit b150514

Please sign in to comment.