diff --git a/fixtures/a_condition.rb b/fixtures/a_condition.rb deleted file mode 100644 index fa090660..00000000 --- a/fixtures/a_condition.rb +++ /dev/null @@ -1,99 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2018-2022, by Samuel Williams. - -require 'async/variable' - -ACondition = Sus::Shared("a condition") do - let(:condition) {subject.new} - - it 'can signal waiting task' do - state = nil - - reactor.async do - state = :waiting - condition.wait - state = :resumed - end - - expect(state).to be == :waiting - - condition.signal - - reactor.yield - - expect(state).to be == :resumed - end - - it 'should be able to signal stopped task' do - expect(condition).to be(:empty?) - - task = reactor.async do - condition.wait - end - - expect(condition).not.to be(:empty?) - - task.stop - - condition.signal - end - - it 'resumes tasks in order' do - order = [] - - 5.times do |i| - task = reactor.async do - condition.wait - order << i - end - end - - condition.signal - - reactor.yield - - expect(order).to be == [0, 1, 2, 3, 4] - end - - with "timeout" do - let(:ready) {Async::Variable.new(condition)} - let(:waiting) {Async::Variable.new(subject.new)} - - def before - @state = nil - - @task = reactor.async do |task| - task.with_timeout(0.01) do - begin - @state = :waiting - waiting.resolve - - ready.wait - @state = :signalled - rescue Async::TimeoutError - @state = :timeout - end - end - end - - super - end - - it 'can timeout while waiting' do - @task.wait - - expect(@state).to be == :timeout - end - - it 'can signal while waiting' do - waiting.wait - ready.resolve - - @task.wait - - expect(@state).to be == :signalled - end - end -end diff --git a/fixtures/async/a_condition.rb b/fixtures/async/a_condition.rb new file mode 100644 index 00000000..cdff9a91 --- /dev/null +++ b/fixtures/async/a_condition.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2022, by Samuel Williams. + +require 'async/variable' + +module Async + ACondition = Sus::Shared("a condition") do + let(:condition) {subject.new} + + it 'can signal waiting task' do + state = nil + + reactor.async do + state = :waiting + condition.wait + state = :resumed + end + + expect(state).to be == :waiting + + condition.signal + + reactor.yield + + expect(state).to be == :resumed + end + + it 'should be able to signal stopped task' do + expect(condition).to be(:empty?) + expect(condition).not.to be(:waiting?) + + task = reactor.async do + condition.wait + end + + expect(condition).not.to be(:empty?) + expect(condition).to be(:waiting?) + + task.stop + + condition.signal + end + + it 'resumes tasks in order' do + order = [] + + 5.times do |i| + task = reactor.async do + condition.wait + order << i + end + end + + condition.signal + + reactor.yield + + expect(order).to be == [0, 1, 2, 3, 4] + end + + with "timeout" do + let(:ready) {Async::Variable.new(condition)} + let(:waiting) {Async::Variable.new(subject.new)} + + def before + @state = nil + + @task = reactor.async do |task| + task.with_timeout(0.01) do + begin + @state = :waiting + waiting.resolve + + ready.wait + @state = :signalled + rescue Async::TimeoutError + @state = :timeout + end + end + end + + super + end + + it 'can timeout while waiting' do + @task.wait + + expect(@state).to be == :timeout + end + + it 'can signal while waiting' do + waiting.wait + ready.resolve + + @task.wait + + expect(@state).to be == :signalled + end + end + end +end diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 703e8be4..c3577ff8 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -40,12 +40,16 @@ def wait end end - # Is any fiber waiting on this notification? - # @returns [Boolean] + # @deprecated Replaced by {#waiting?} def empty? @waiting.empty? end + # @returns [Boolean] Is any fiber waiting on this notification? + def waiting? + @waiting.size > 0 + end + # Signal to a given task that it should resume operations. # @parameter value [Object | Nil] The value to return to the waiting fibers. def signal(value = nil) diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 77a8f9ad..439143f8 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -9,16 +9,19 @@ module Async # A queue which allows items to be processed in order. + # + # It has a compatible interface with {Notification} and {Condition}, except that it's multi-value. + # # @public Since `stable-v1`. - class Queue < Notification + class Queue # Create a new queue. # # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. - def initialize(parent: nil) - super() - + # @parameter available [Notification] The notification to use for signaling when items are available. + def initialize(parent: nil, available: Notification.new) @items = [] @parent = parent + @available = available end # @attribute [Array] The items in the queue. @@ -38,20 +41,20 @@ def empty? def <<(item) @items << item - self.signal unless self.empty? + @available.signal unless self.empty? end # Add multiple items to the queue. def enqueue(*items) @items.concat(items) - self.signal unless self.empty? + @available.signal unless self.empty? end # Remove and return the next item from the queue. def dequeue while @items.empty? - self.wait + @available.wait end @items.shift @@ -77,6 +80,16 @@ def each yield item end end + + # Signal the queue with a value, the same as {#enqueue}. + def signal(value) + self.enqueue(value) + end + + # Wait for an item to be available, the same as {#dequeue}. + def wait + self.dequeue + end end # A queue which limits the number of items that can be enqueued. @@ -85,12 +98,12 @@ class LimitedQueue < Queue # Create a new limited queue. # # @parameter limit [Integer] The maximum number of items that can be enqueued. - def initialize(limit = 1, **options) + # @parameter full [Notification] The notification to use for signaling when the queue is full. + def initialize(limit = 1, full: Notification.new, **options) super(**options) @limit = limit - - @full = Notification.new + @full = full end # @attribute [Integer] The maximum number of items that can be enqueued. @@ -128,7 +141,7 @@ def enqueue(*items) available = @limit - @items.size @items.concat(items.shift(available)) - self.signal unless self.empty? + @available.signal unless self.empty? end end diff --git a/test/async/condition.rb b/test/async/condition.rb index 1e7a5401..cba08188 100644 --- a/test/async/condition.rb +++ b/test/async/condition.rb @@ -7,7 +7,7 @@ require 'sus/fixtures/async' require 'async/condition' -require 'a_condition' +require 'async/a_condition' describe Async::Condition do include Sus::Fixtures::Async::ReactorContext @@ -52,5 +52,5 @@ expect(consumer.status).to be == :completed end - it_behaves_like ACondition + it_behaves_like Async::ACondition end diff --git a/test/async/notification.rb b/test/async/notification.rb index 846d853b..2cb24069 100644 --- a/test/async/notification.rb +++ b/test/async/notification.rb @@ -6,7 +6,7 @@ require 'sus/fixtures/async' require 'async/notification' -require 'a_condition' +require 'async/a_condition' describe Async::Notification do include Sus::Fixtures::Async::ReactorContext @@ -47,5 +47,5 @@ ] end - it_behaves_like ACondition + it_behaves_like Async::ACondition end diff --git a/test/async/queue.rb b/test/async/queue.rb index a8fb82e6..a2a4d204 100644 --- a/test/async/queue.rb +++ b/test/async/queue.rb @@ -87,12 +87,44 @@ end end + with '#signal' do + it 'can signal with an item' do + queue.signal(:item) + expect(queue.dequeue).to be == :item + end + end + + with '#wait' do + it 'can wait for an item' do + reactor.async do |task| + queue.enqueue(:item) + end + + expect(queue.wait).to be == :item + end + end + with 'an empty queue' do it "is expected to be empty" do expect(queue).to be(:empty?) end end + with 'task finishing queue' do + it 'can signal task completion' do + 3.times do + Async(finished: queue) do + :result + end + end + + 3.times do + task = queue.dequeue + expect(task.wait).to be == :result + end + end + end + with 'semaphore' do let(:capacity) {2} let(:semaphore) {Async::Semaphore.new(capacity)}