diff --git a/README.adoc b/README.adoc index e704c168..0250dbe3 100644 --- a/README.adoc +++ b/README.adoc @@ -89,6 +89,25 @@ end ---- +=== Configuration + +[source,ruby] +---- +Sidekiq::Throttled.configure do |config| + # Period in seconds to exclude queue from polling in case it returned + # {config.cooldown_threshold} amount of throttled jobs in a row. Set + # this value to `nil` to disable cooldown manager completely. + # Default: 2.0 + config.cooldown_period = 2.0 + + # Exclude queue from polling after it returned given amount of throttled + # jobs in a row. + # Default: 1 (cooldown after first throttled job) + config.cooldown_threshold = 1 +end +---- + + === Observer You can specify an observer that will be called on throttling. To do so pass an diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 5bac71d8..e3e1c76d 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -2,11 +2,13 @@ require "sidekiq" -require_relative "./throttled/version" -require_relative "./throttled/patches/basic_fetch" -require_relative "./throttled/registry" +require_relative "./throttled/config" +require_relative "./throttled/cooldown" require_relative "./throttled/job" require_relative "./throttled/middleware" +require_relative "./throttled/patches/basic_fetch" +require_relative "./throttled/registry" +require_relative "./throttled/version" require_relative "./throttled/worker" # @see https://github.com/mperham/sidekiq/ @@ -39,7 +41,35 @@ module Sidekiq # end # end module Throttled + MUTEX = Mutex.new + private_constant :MUTEX + + @config = Config.new.freeze + @cooldown = Cooldown[@config] + class << self + # @api internal + # + # @return [Cooldown, nil] + attr_reader :cooldown + + # @example + # Sidekiq::Throttled.configure do |config| + # config.cooldown_period = nil # Disable queues cooldown manager + # end + # + # @yieldparam config [Config] + def configure + MUTEX.synchronize do + config = @config.dup + + yield config + + @config = config.freeze + @cooldown = Cooldown[@config] + end + end + def setup! Sidekiq.configure_server do |config| config.server_middleware do |chain| diff --git a/lib/sidekiq/throttled/config.rb b/lib/sidekiq/throttled/config.rb new file mode 100644 index 00000000..d75a1195 --- /dev/null +++ b/lib/sidekiq/throttled/config.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + # Configuration object. + class Config + # Period in seconds to exclude queue from polling in case it returned + # {#cooldown_threshold} amount of throttled jobs in a row. + # + # Set this to `nil` to disable cooldown completely. + # + # @return [Float, nil] + attr_reader :cooldown_period + + # Amount of throttled jobs returned from the queue subsequently after + # which queue will be excluded from polling for the durations of + # {#cooldown_period}. + # + # @return [Integer] + attr_reader :cooldown_threshold + + def initialize + @cooldown_period = 2.0 + @cooldown_threshold = 1 + end + + # @!attribute [w] cooldown_period + def cooldown_period=(value) + raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float) + raise ArgumentError, "period must be positive" unless value.nil? || value.positive? + + @cooldown_period = value + end + + # @!attribute [w] cooldown_threshold + def cooldown_threshold=(value) + raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer) + raise ArgumentError, "threshold must be positive" unless value.positive? + + @cooldown_threshold = value + end + end + end +end diff --git a/lib/sidekiq/throttled/cooldown.rb b/lib/sidekiq/throttled/cooldown.rb new file mode 100644 index 00000000..11f38bda --- /dev/null +++ b/lib/sidekiq/throttled/cooldown.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require "concurrent" + +require_relative "./expirable_set" + +module Sidekiq + module Throttled + # @api internal + # + # Queues cooldown manager. Tracks list of queues that should be temporarily + # (for the duration of {Config#cooldown_period}) excluded from polling. + class Cooldown + class << self + # Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`. + # + # @param config [Config] + # @return [Cooldown, nil] + def [](config) + new(config) if config.cooldown_period + end + end + + # @param config [Config] + def initialize(config) + @queues = ExpirableSet.new(config.cooldown_period) + @threshold = config.cooldown_threshold + @tracker = Concurrent::Map.new + end + + # Notify that given queue returned job that was throttled. + # + # @param queue [String] + # @return [void] + def notify_throttled(queue) + @queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ) + end + + # Notify that given queue returned job that was not throttled. + # + # @param queue [String] + # @return [void] + def notify_admitted(queue) + @tracker.delete(queue) + end + + # List of queues that should not be polled + # + # @return [Array] + def queues + @queues.to_a + end + end + end +end diff --git a/lib/sidekiq/throttled/expirable_set.rb b/lib/sidekiq/throttled/expirable_set.rb new file mode 100644 index 00000000..4242bf0d --- /dev/null +++ b/lib/sidekiq/throttled/expirable_set.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +require "concurrent" + +module Sidekiq + module Throttled + # @api internal + # + # Set of elements with expirations. + # + # @example + # set = ExpirableSet.new(10.0) + # set.add("a") + # sleep(5) + # set.add("b") + # set.to_a # => ["a", "b"] + # sleep(5) + # set.to_a # => ["b"] + class ExpirableSet + include Enumerable + + # @param ttl [Float] expiration is seconds + # @raise [ArgumentError] if `ttl` is not positive Float + def initialize(ttl) + raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive? + + @elements = Concurrent::Map.new + @ttl = ttl + end + + # @param element [Object] + # @return [ExpirableSet] self + def add(element) + # cleanup expired elements to avoid mem-leak + horizon = now + expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) } + expired.each { |pair| @elements.delete_pair(*pair) } + + # add new element + @elements[element] = now + @ttl + + self + end + + # @yield [Object] Gives each live (not expired) element to the block + def each + return to_enum __method__ unless block_given? + + horizon = now + + @elements.each_pair do |element, sunset| + yield element unless expired?(sunset, horizon) + end + + self + end + + private + + # @return [Float] + def now + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end + + def expired?(sunset, horizon) + sunset <= horizon + end + end + end +end diff --git a/lib/sidekiq/throttled/job.rb b/lib/sidekiq/throttled/job.rb index d7401333..dca50d72 100644 --- a/lib/sidekiq/throttled/job.rb +++ b/lib/sidekiq/throttled/job.rb @@ -29,8 +29,8 @@ module Job # in order to make API inline with `include Sidekiq::Job`. # # @private - def self.included(worker) - worker.send(:extend, ClassMethods) + def self.included(base) + base.extend(ClassMethods) end # Helper methods added to the singleton class of destination diff --git a/lib/sidekiq/throttled/patches/basic_fetch.rb b/lib/sidekiq/throttled/patches/basic_fetch.rb index 603db222..0f933dc8 100644 --- a/lib/sidekiq/throttled/patches/basic_fetch.rb +++ b/lib/sidekiq/throttled/patches/basic_fetch.rb @@ -14,10 +14,13 @@ def retrieve_work work = super if work && Throttled.throttled?(work.job) + Throttled.cooldown&.notify_throttled(work.queue) requeue_throttled(work) return nil end + Throttled.cooldown&.notify_admitted(work.queue) if work + work end @@ -33,6 +36,15 @@ def retrieve_work def requeue_throttled(work) redis { |conn| conn.lpush(work.queue, work.job) } end + + # Returns list of queues to try to fetch jobs from. + # + # @note It may return an empty array. + # @param [Array] queues + # @return [Array] + def queues_cmd + super - (Throttled.cooldown&.queues || []) + end end end end diff --git a/rubocop/rspec.yml b/rubocop/rspec.yml index c129a663..8ca037ed 100644 --- a/rubocop/rspec.yml +++ b/rubocop/rspec.yml @@ -2,6 +2,10 @@ RSpec/ExampleLength: Enabled: true Max: 10 +RSpec/ExpectChange: + Enabled: true + EnforcedStyle: block + RSpec/MultipleExpectations: Enabled: false diff --git a/sidekiq-throttled.gemspec b/sidekiq-throttled.gemspec index 13159a5c..467217f3 100644 --- a/sidekiq-throttled.gemspec +++ b/sidekiq-throttled.gemspec @@ -32,6 +32,7 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 3.0" + spec.add_runtime_dependency "concurrent-ruby", ">= 1.2.0" spec.add_runtime_dependency "redis-prescription", "~> 2.2" spec.add_runtime_dependency "sidekiq", ">= 7.0" end diff --git a/spec/lib/sidekiq/throttled/config_spec.rb b/spec/lib/sidekiq/throttled/config_spec.rb new file mode 100644 index 00000000..6503f5f7 --- /dev/null +++ b/spec/lib/sidekiq/throttled/config_spec.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/config" + +RSpec.describe Sidekiq::Throttled::Config do + subject(:config) { described_class.new } + + describe "#cooldown_period" do + subject { config.cooldown_period } + + it { is_expected.to eq 2.0 } + end + + describe "#cooldown_period=" do + it "updates #cooldown_period" do + expect { config.cooldown_period = 42.0 } + .to change { config.cooldown_period }.to(42.0) + end + + it "allows setting value to `nil`" do + expect { config.cooldown_period = nil } + .to change { config.cooldown_period }.to(nil) + end + + it "fails if given value is neither `NilClass` nor `Float`" do + expect { config.cooldown_period = 42 } + .to raise_error(TypeError, %r{unexpected type}) + end + + it "fails if given value is not positive" do + expect { config.cooldown_period = 0.0 } + .to raise_error(ArgumentError, %r{must be positive}) + end + end + + describe "#cooldown_threshold" do + subject { config.cooldown_threshold } + + it { is_expected.to eq 1 } + end + + describe "#cooldown_threshold=" do + it "updates #cooldown_threshold" do + expect { config.cooldown_threshold = 42 } + .to change { config.cooldown_threshold }.to(42) + end + + it "fails if given value is not `Integer`" do + expect { config.cooldown_threshold = 42.0 } + .to raise_error(TypeError, %r{unexpected type}) + end + + it "fails if given value is not positive" do + expect { config.cooldown_threshold = 0 } + .to raise_error(ArgumentError, %r{must be positive}) + end + end +end diff --git a/spec/lib/sidekiq/throttled/cooldown_spec.rb b/spec/lib/sidekiq/throttled/cooldown_spec.rb new file mode 100644 index 00000000..9472259b --- /dev/null +++ b/spec/lib/sidekiq/throttled/cooldown_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/cooldown" + +RSpec.describe Sidekiq::Throttled::Cooldown do + subject(:cooldown) { described_class.new(config) } + + let(:config) { Sidekiq::Throttled::Config.new } + + describe ".[]" do + subject { described_class[config] } + + it { is_expected.to be_an_instance_of described_class } + + context "when `cooldown_period` is nil" do + before { config.cooldown_period = nil } + + it { is_expected.to be_nil } + end + end + + describe "#notify_throttled" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "marks queue for exclusion once threshold is met" do + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#notify_admitted" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "resets threshold counter" do + cooldown.notify_admitted("queue:at_the_end_of") + + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#queues" do + before do + config.cooldown_period = 1.0 + config.cooldown_threshold = 1 + end + + it "keeps queue in the exclusion list for the duration of cooldown_period" do + monotonic_time = 0.0 + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + cooldown.notify_throttled("queue:at_the_end_of") + + monotonic_time += 0.9 + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:at_the_end_of", "queue:the_longest_line") + + monotonic_time += 0.1 + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + + monotonic_time += 1.0 + expect(cooldown.queues).to be_empty + end + end +end diff --git a/spec/lib/sidekiq/throttled/expirable_set_spec.rb b/spec/lib/sidekiq/throttled/expirable_set_spec.rb new file mode 100644 index 00000000..465c7ba9 --- /dev/null +++ b/spec/lib/sidekiq/throttled/expirable_set_spec.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/expirable_set" + +RSpec.describe Sidekiq::Throttled::ExpirableSet do + subject(:expirable_set) { described_class.new(2.0) } + + it { is_expected.to be_an Enumerable } + + describe ".new" do + it "raises ArgumentError if given TTL is not Float" do + expect { described_class.new(42) }.to raise_error(ArgumentError) + end + + it "raises ArgumentError if given TTL is not positive" do + expect { described_class.new(0.0) }.to raise_error(ArgumentError) + end + end + + describe "#add" do + it "returns self" do + expect(expirable_set.add("a")).to be expirable_set + end + + it "adds uniq elements to the set" do + expirable_set.add("a").add("b").add("b").add("a") + + expect(expirable_set).to contain_exactly("a", "b") + end + end + + describe "#each" do + subject { expirable_set.each } + + before do + monotonic_time = 0.0 + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + expirable_set.add("lorem") + expirable_set.add("ipsum") + + monotonic_time += 1 + + expirable_set.add("ipsum") + + monotonic_time += 1 + + expirable_set.add("dolor") + end + + it { is_expected.to be_an(Enumerator) } + it { is_expected.to contain_exactly("ipsum", "dolor") } + + context "with block given" do + it "yields each paused queue and returns self" do + yielded_elements = [] + + expect(expirable_set.each { |element| yielded_elements << element }).to be expirable_set + expect(yielded_elements).to contain_exactly("ipsum", "dolor") + end + end + end +end diff --git a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb index a814ecfc..84b10fae 100644 --- a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb @@ -2,75 +2,110 @@ require "sidekiq/throttled/patches/basic_fetch" -class ThrottledTestJob - include Sidekiq::Job - include Sidekiq::Throttled::Job +RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do + def stub_job_class(name, &block) + klass = stub_const(name, Class.new) - def perform(*); end -end + klass.include(Sidekiq::Job) + klass.include(Sidekiq::Throttled::Job) -RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do - subject(:fetch) do - config = Sidekiq::Config.new - config.queues = %w[default] - Sidekiq::BasicFetch.new(config.default_capsule) + klass.instance_exec do + def perform(*); end + end + + klass.instance_exec(&block) if block end - describe "#retrieve_work" do - def enqueued_jobs(queue) - Sidekiq.redis do |conn| - conn.lrange("queue:#{queue}", 0, -1).map do |job| - JSON.parse(job).then do |payload| - [payload["class"], payload["args"]] - end + def enqueued_jobs(queue) + Sidekiq.redis do |conn| + conn.lrange("queue:#{queue}", 0, -1).map do |job| + JSON.parse(job).then do |payload| + [payload["class"], *payload["args"]] end end end + end - before do - # Sidekiq is FIFO queue, with head on right side of the list, - # meaning jobs below will be stored in 3, 2, 1 order. - ThrottledTestJob.perform_bulk([[1], [2], [3]]) - end + let(:fetch) do + config = Sidekiq::Config.new + config.queues = %w[default critical] + Sidekiq::BasicFetch.new(config.default_capsule) + end + + before do + Sidekiq::Throttled.configure { |config| config.cooldown_period = nil } + + stub_job_class("TestJob") + stub_job_class("AnotherTestJob") { sidekiq_options(queue: :critical) } + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC).and_return(0.0) + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + TestJob.perform_async(1) + TestJob.perform_async(2) + TestJob.perform_async(3) + AnotherTestJob.perform_async(4) + end + describe "#retrieve_work" do context "when job is not throttled" do it "returns unit of work" do - expect(fetch.retrieve_work).to be_an_instance_of(Sidekiq::BasicFetch::UnitOfWork) + expect(Array.new(4) { fetch.retrieve_work }).to all be_an_instance_of(Sidekiq::BasicFetch::UnitOfWork) end end - context "when job was throttled due to concurrency" do - before do - ThrottledTestJob.sidekiq_throttle(concurrency: { limit: 1 }) + shared_examples "requeues throttled job" do + it "returns nothing" do fetch.retrieve_work - end - it "returns nothing" do expect(fetch.retrieve_work).to be_nil end it "pushes job back to the head of the queue" do + fetch.retrieve_work + expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + .to change { enqueued_jobs("default") }.to([["TestJob", 2], ["TestJob", 3]]) + .and(keep_unchanged { enqueued_jobs("critical") }) end - end - context "when job was throttled due to threshold" do - before do - ThrottledTestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) - fetch.retrieve_work - end + context "when queue cooldown kicks in" do + before do + Sidekiq::Throttled.configure do |config| + config.cooldown_period = 2.0 + config.cooldown_threshold = 1 + end - it "returns nothing" do - expect(fetch.retrieve_work).to be_nil - end + fetch.retrieve_work + end - it "pushes job back to the head of the queue" do - expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + it "updates cooldown queues" do + expect { fetch.retrieve_work } + .to change { enqueued_jobs("default") }.to([["TestJob", 2], ["TestJob", 3]]) + .and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:default"])) + end + + it "excludes the queue from polling" do + fetch.retrieve_work + + expect { fetch.retrieve_work } + .to change { enqueued_jobs("critical") }.to([]) + .and(keep_unchanged { enqueued_jobs("default") }) + end end end + + context "when job was throttled due to concurrency" do + before { TestJob.sidekiq_throttle(concurrency: { limit: 1 }) } + + include_examples "requeues throttled job" + end + + context "when job was throttled due to threshold" do + before { TestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) } + + include_examples "requeues throttled job" + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ce733462..80c1093f 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -15,6 +15,14 @@ @strategies.clear @aliases.clear end + + # Reset config + Sidekiq::Throttled.configure do |throttled_config| + defaults = Sidekiq::Throttled::Config.new + + throttled_config.cooldown_period = defaults.cooldown_period + throttled_config.cooldown_threshold = defaults.cooldown_threshold + end end # rspec-expectations config goes here. You can use an alternate