Skip to content

Commit

Permalink
wip: Implement cooldown controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ixti committed Nov 19, 2023
1 parent 84d8670 commit a80dcb6
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 3 deletions.
28 changes: 25 additions & 3 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

require "sidekiq"

require_relative "./throttled/version"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/middleware"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"

# @see https://github.com/mperham/sidekiq/
Expand Down Expand Up @@ -39,7 +41,27 @@ module Sidekiq
# end
# end
module Throttled
MUTEX = Mutex.new
private_constant :MUTEX

@config = Config.new.freeze
@cooldown = Cooldown[@config]

class << self
# @api internal
attr_reader :cooldown

def configure
MUTEX.synchronize do
config = @config.dup

yield config

@config = config.freeze
@cooldown = Cooldown[@config]
end
end

def setup!
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
Expand Down
43 changes: 43 additions & 0 deletions lib/sidekiq/throttled/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

module Sidekiq
module Throttled
class Config
# Period in seconds to exclude queue from polling in case it returned
# {#cooldown_threshold} throttled jobs in a row.
#
# Set this to `0.0` to disable cooldown completely.
#
# @return [Float]
attr_reader :cooldown_period

# Amount of throttled jobs returned from the same queue subsequently after
# which queue will be excluded from polling for the durations of
# {#cooldown_period}.
#
# @return [Integer]
attr_reader :cooldown_threshold

def initialize
@cooldown_period = 2.0
@cooldown_threshold = 1
end

# @!attribute [w] cooldown_period
def cooldown_period=(value)
raise ArgumentError, "cooldown period must be a non-negative Float" unless value.is_a?(Float) && 0.0 <= value

@cooldown_period = value
end

# @!attribute [w] cooldown_threshold
def cooldown_threshold=(value)
unless value.is_a?(Integer) && value.positive?
raise ArgumentError, "cooldown threshold must be a positive Integer"
end

@cooldown_threshold = value
end
end
end
end
40 changes: 40 additions & 0 deletions lib/sidekiq/throttled/cooldown.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

require "concurrent"

require_relative "./expirable_set"

module Sidekiq
module Throttled
class Cooldown
class << self
def [](config)
return unless config.cooldown_period.positive?

new(period: config.cooldown_period, threshold: config.cooldown_threshold)
end
end

def initialize(period:, threshold:)
@queues = ExpirableSet.new(period)
@threshold = threshold
@tracker = Concurrent::Map.new
end

def notify_throttled(queue)
@queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
end

def notify_admitted(queue)
@tracker.delete(queue)
end

# List of queues that should not be polled
#
# @return [Array<String>]
def queues
@queues.to_a
end
end
end
end
12 changes: 12 additions & 0 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ def retrieve_work
work = super

if work && Throttled.throttled?(work.job)
Throttled.cooldown&.notify_throttled(work.queue)
requeue_throttled(work)
return nil
end

Throttled.cooldown&.notify_admitted(work.queue) if work

work
end

Expand All @@ -33,6 +36,15 @@ def retrieve_work
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def queues_cmd
super - (Throttled.cooldown&.queues || [])
end
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ def perform(*); end
Sidekiq::BasicFetch.new(config.default_capsule)
end

before do
Sidekiq::Throttled.configure do |config|
config.cooldown_threshold = 2
end
end

after do
Sidekiq::Throttled.configure do |config|
config.cooldown_threshold = 1
end
end

describe "#retrieve_work" do
def enqueued_jobs(queue)
Sidekiq.redis do |conn|
Expand Down

0 comments on commit a80dcb6

Please sign in to comment.