Skip to content

Commit

Permalink
wip: Implement cooldown controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ixti committed Nov 19, 2023
1 parent 84d8670 commit 341faec
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 7 deletions.
19 changes: 19 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ end
----


=== Configuration

[source,ruby]
----
Sidekiq::Throttled.configure do |config|
# Period in seconds to exclude queue from polling in case it returned
# {config.cooldown_threshold} amount of throttled jobs in a row. Set
# this value to `nil` to disable cooldown manager completely.
# Default: 2.0
config.coolldown_period = 2.0
# Amount of throttled jobs returned from the queue subsequently after which
# queue will be excluded from polling.
# Default: 1 (cooldown after first throttled job)
config.cooldown_threshold = 1
end
----


=== Observer

You can specify an observer that will be called on throttling. To do so pass an
Expand Down
36 changes: 33 additions & 3 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

require "sidekiq"

require_relative "./throttled/version"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/middleware"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"

# @see https://github.com/mperham/sidekiq/
Expand Down Expand Up @@ -39,7 +41,35 @@ module Sidekiq
# end
# end
module Throttled
MUTEX = Mutex.new
private_constant :MUTEX

@config = Config.new.freeze
@cooldown = Cooldown[@config]

class << self
# @api internal
#
# @return [Cooldown, nil]
attr_reader :cooldown

# @example
# Sidekiq::Throttled.configure do |config|
# config.cooldown_period = nil # Disable queues cooldown manager
# end
#
# @yieldparam config [Config]
def configure
MUTEX.synchronize do
config = @config.dup

yield config

@config = config.freeze
@cooldown = Cooldown[@config]
end
end

def setup!
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
Expand Down
44 changes: 44 additions & 0 deletions lib/sidekiq/throttled/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

module Sidekiq
module Throttled
# Configuration object.
class Config
# Period in seconds to exclude queue from polling in case it returned
# {#cooldown_threshold} amount of throttled jobs in a row.
#
# Set this to `nil` to disable cooldown completely.
#
# @return [Float, nil]
attr_reader :cooldown_period

# Amount of throttled jobs returned from the same queue subsequently after
# which queue will be excluded from polling for the durations of
# {#cooldown_period}.
#
# @return [Integer]
attr_reader :cooldown_threshold

def initialize
@cooldown_period = 2.0
@cooldown_threshold = 1
end

# @!attribute [w] cooldown_period
def cooldown_period=(value)
raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float)
raise ArgumentError, "period must be positive" unless value.nil? || value.positive?

@cooldown_period = value
end

# @!attribute [w] cooldown_threshold
def cooldown_threshold=(value)
raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer)
raise ArgumentError, "threshold must be positive" unless value.positive?

@cooldown_threshold = value
end
end
end
end
55 changes: 55 additions & 0 deletions lib/sidekiq/throttled/cooldown.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require "concurrent"

require_relative "./expirable_set"

module Sidekiq
module Throttled
# @api internal
#
# Queues cooldown manager. Tracks list of queues that should be temporarily
# (for the duration of {Config#cooldown_period}) excluded from polling.
class Cooldown
class << self
# Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`.
#
# @param config [Config]
# @return [Cooldown, nil]
def [](config)
new(config) if config.cooldown_period
end
end

# @param config [Config]
def initialize(config)
@queues = ExpirableSet.new(config.cooldown_period)
@threshold = config.cooldown_threshold
@tracker = Concurrent::Map.new
end

# Notify that given queue returned job that was throttled.
#
# @param queue [String]
# @return [void]
def notify_throttled(queue)
@queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
end

# Notify that given queue returned job that was not throttled.
#
# @param queue [String]
# @return [void]
def notify_admitted(queue)
@tracker.delete(queue)
end

# List of queues that should not be polled
#
# @return [Array<String>]
def queues
@queues.to_a
end
end
end
end
2 changes: 2 additions & 0 deletions lib/sidekiq/throttled/expirable_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

module Sidekiq
module Throttled
# @api internal
#
# Set of elements with expirations.
#
# @example
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ def retrieve_work
work = super

if work && Throttled.throttled?(work.job)
Throttled.cooldown&.notify_throttled(work.queue)
requeue_throttled(work)
return nil
end

Throttled.cooldown&.notify_admitted(work.queue) if work

work
end

Expand All @@ -33,6 +36,15 @@ def retrieve_work
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def queues_cmd
super - (Throttled.cooldown&.queues || [])
end
end
end
end
Expand Down
58 changes: 58 additions & 0 deletions spec/lib/sidekiq/throttled/config_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require "sidekiq/throttled/config"

RSpec.describe Sidekiq::Throttled::Config do
subject(:config) { described_class.new }

describe "#cooldown_period" do
subject { config.cooldown_period }

it { is_expected.to eq 2.0 }
end

describe "#cooldown_period=" do
it "updates #cooldown_period" do
expect { config.cooldown_period = 42.0 }
.to change { config.cooldown_period }.to(42.0)
end

it "allows setting value to `nil`" do
expect { config.cooldown_period = nil }
.to change { config.cooldown_period }.to(nil)
end

it "fails if given value is neither `NilClass` nor `Float`" do
expect { config.cooldown_period = 42 }
.to raise_error(TypeError, /unexpected type/)
end

it "fails if given value is not positive" do
expect { config.cooldown_period = 0.0 }
.to raise_error(ArgumentError, /must be positive/)
end
end

describe "#cooldown_threshold" do
subject { config.cooldown_threshold }

it { is_expected.to eq 1 }
end

describe "#cooldown_threshold=" do
it "updates #cooldown_threshold" do
expect { config.cooldown_threshold = 42 }
.to change { config.cooldown_threshold }.to(42)
end

it "fails if given value is not `Integer`" do
expect { config.cooldown_threshold = 42.0 }
.to raise_error(TypeError, /unexpected type/)
end

it "fails if given value is not positive" do
expect { config.cooldown_threshold = 0 }
.to raise_error(ArgumentError, /must be positive/)
end
end
end
83 changes: 83 additions & 0 deletions spec/lib/sidekiq/throttled/cooldown_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# frozen_string_literal: true

require "sidekiq/throttled/cooldown"

RSpec.describe Sidekiq::Throttled::Cooldown do
subject(:cooldown) { described_class.new(config) }

let(:config) { Sidekiq::Throttled::Config.new }

describe ".[]" do
subject { described_class[config] }

it { is_expected.to be_an_instance_of described_class }

context "when `cooldown_period` is nil" do
before { config.cooldown_period = nil }

it { is_expected.to be_nil }
end
end

describe "#notify_throttled" do
before do
config.cooldown_threshold = 5

(config.cooldown_threshold - 1).times do
cooldown.notify_throttled("queue:the_longest_line")
end
end

it "marks queue for exclusion once threshold is met" do
cooldown.notify_throttled("queue:the_longest_line")

expect(cooldown.queues).to contain_exactly("queue:the_longest_line")
end
end

describe "#notify_admitted" do
before do
config.cooldown_threshold = 5

(config.cooldown_threshold - 1).times do
cooldown.notify_throttled("queue:at_the_end_of")
cooldown.notify_throttled("queue:the_longest_line")
end
end

it "resets threshold counter" do
cooldown.notify_admitted("queue:at_the_end_of")

cooldown.notify_throttled("queue:at_the_end_of")
cooldown.notify_throttled("queue:the_longest_line")

expect(cooldown.queues).to contain_exactly("queue:the_longest_line")
end
end

describe "#queues" do
before do
config.cooldown_period = 1.0
config.cooldown_threshold = 1
end

it "keeps queue in the exclusion list for the duration of cooldown_period" do
monotonic_time = 0.0

allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time }

cooldown.notify_throttled("queue:at_the_end_of")

monotonic_time += 0.9
cooldown.notify_throttled("queue:the_longest_line")

expect(cooldown.queues).to contain_exactly("queue:at_the_end_of", "queue:the_longest_line")

monotonic_time += 0.1
expect(cooldown.queues).to contain_exactly("queue:the_longest_line")

monotonic_time += 1.0
expect(cooldown.queues).to be_empty
end
end
end
Loading

0 comments on commit 341faec

Please sign in to comment.