Skip to content

Commit

Permalink
feat!: Revive queues cooldown manager
Browse files Browse the repository at this point in the history
Brings back feature that was removed in:

  e5ac585

See README.adoc for possible configuration

Resolve: #160
Resolve: #157
Co-authored-by: Alexandr Elhovenko <freemanoid321@gmail.com>
Signed-off-by: Alexey Zapparov <alexey@zapparov.com>
  • Loading branch information
ixti and freemanoid committed Nov 20, 2023
1 parent 0b42a9e commit 736acc3
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 7 deletions.
19 changes: 19 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ end
----


=== Configuration

[source,ruby]
----
Sidekiq::Throttled.configure do |config|
# Period in seconds to exclude queue from polling in case it returned
# {config.cooldown_threshold} amount of throttled jobs in a row. Set
# this value to `nil` to disable cooldown manager completely.
# Default: 2.0
config.cooldown_period = 2.0
# Exclude queue from polling after it returned given amount of throttled
# jobs in a row.
# Default: 1 (cooldown after first throttled job)
config.cooldown_threshold = 1
end
----


=== Observer

You can specify an observer that will be called on throttling. To do so pass an
Expand Down
36 changes: 33 additions & 3 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

require "sidekiq"

require_relative "./throttled/version"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/middleware"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"

# @see https://github.com/mperham/sidekiq/
Expand Down Expand Up @@ -39,7 +41,35 @@ module Sidekiq
# end
# end
module Throttled
MUTEX = Mutex.new
private_constant :MUTEX

@config = Config.new.freeze
@cooldown = Cooldown[@config]

class << self
# @api internal
#
# @return [Cooldown, nil]
attr_reader :cooldown

# @example
# Sidekiq::Throttled.configure do |config|
# config.cooldown_period = nil # Disable queues cooldown manager
# end
#
# @yieldparam config [Config]
def configure
MUTEX.synchronize do
config = @config.dup

yield config

@config = config.freeze
@cooldown = Cooldown[@config]
end
end

def setup!
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
Expand Down
44 changes: 44 additions & 0 deletions lib/sidekiq/throttled/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

module Sidekiq
module Throttled
# Configuration object.
class Config
# Period in seconds to exclude queue from polling in case it returned
# {#cooldown_threshold} amount of throttled jobs in a row.
#
# Set this to `nil` to disable cooldown completely.
#
# @return [Float, nil]
attr_reader :cooldown_period

# Amount of throttled jobs returned from the queue subsequently after
# which queue will be excluded from polling for the durations of
# {#cooldown_period}.
#
# @return [Integer]
attr_reader :cooldown_threshold

def initialize
@cooldown_period = 2.0
@cooldown_threshold = 1
end

# @!attribute [w] cooldown_period
def cooldown_period=(value)
raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float)
raise ArgumentError, "period must be positive" unless value.nil? || value.positive?

@cooldown_period = value
end

# @!attribute [w] cooldown_threshold
def cooldown_threshold=(value)
raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer)
raise ArgumentError, "threshold must be positive" unless value.positive?

@cooldown_threshold = value
end
end
end
end
55 changes: 55 additions & 0 deletions lib/sidekiq/throttled/cooldown.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require "concurrent"

require_relative "./expirable_set"

module Sidekiq
module Throttled
# @api internal
#
# Queues cooldown manager. Tracks list of queues that should be temporarily
# (for the duration of {Config#cooldown_period}) excluded from polling.
class Cooldown
class << self
# Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`.
#
# @param config [Config]
# @return [Cooldown, nil]
def [](config)
new(config) if config.cooldown_period
end
end

# @param config [Config]
def initialize(config)
@queues = ExpirableSet.new(config.cooldown_period)
@threshold = config.cooldown_threshold
@tracker = Concurrent::Map.new
end

# Notify that given queue returned job that was throttled.
#
# @param queue [String]
# @return [void]
def notify_throttled(queue)
@queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
end

# Notify that given queue returned job that was not throttled.
#
# @param queue [String]
# @return [void]
def notify_admitted(queue)
@tracker.delete(queue)
end

# List of queues that should not be polled
#
# @return [Array<String>]
def queues
@queues.to_a
end
end
end
end
70 changes: 70 additions & 0 deletions lib/sidekiq/throttled/expirable_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

require "concurrent"

module Sidekiq
module Throttled
# @api internal
#
# Set of elements with expirations.
#
# @example
# set = ExpirableSet.new(10.0)
# set.add("a")
# sleep(5)
# set.add("b")
# set.to_a # => ["a", "b"]
# sleep(5)
# set.to_a # => ["b"]
class ExpirableSet
include Enumerable

# @param ttl [Float] expiration is seconds
# @raise [ArgumentError] if `ttl` is not positive Float
def initialize(ttl)
raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive?

@elements = Concurrent::Map.new
@ttl = ttl
end

# @param element [Object]
# @return [ExpirableSet] self
def add(element)
# cleanup expired elements to avoid mem-leak
horizon = now
expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) }
expired.each { |pair| @elements.delete_pair(*pair) }

# add new element
@elements[element] = now + @ttl

self
end

# @yield [Object] Gives each live (not expired) element to the block
def each
return to_enum __method__ unless block_given?

horizon = now

@elements.each_pair do |element, sunset|
yield element unless expired?(sunset, horizon)
end

self
end

private

# @return [Float]
def now
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end

def expired?(sunset, horizon)
sunset <= horizon
end
end
end
end
12 changes: 12 additions & 0 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ def retrieve_work
work = super

if work && Throttled.throttled?(work.job)
Throttled.cooldown&.notify_throttled(work.queue)
requeue_throttled(work)
return nil
end

Throttled.cooldown&.notify_admitted(work.queue) if work

work
end

Expand All @@ -33,6 +36,15 @@ def retrieve_work
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def queues_cmd
super - (Throttled.cooldown&.queues || [])
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions rubocop/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ RSpec/ExampleLength:
Enabled: true
Max: 10

RSpec/ExpectChange:
Enabled: true
EnforcedStyle: block

RSpec/MultipleExpectations:
Enabled: false

Expand Down
1 change: 1 addition & 0 deletions sidekiq-throttled.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = ">= 3.0"

spec.add_runtime_dependency "concurrent-ruby", ">= 1.2.0"
spec.add_runtime_dependency "redis-prescription", "~> 2.2"
spec.add_runtime_dependency "sidekiq", ">= 7.0"
end
58 changes: 58 additions & 0 deletions spec/lib/sidekiq/throttled/config_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require "sidekiq/throttled/config"

RSpec.describe Sidekiq::Throttled::Config do
subject(:config) { described_class.new }

describe "#cooldown_period" do
subject { config.cooldown_period }

it { is_expected.to eq 2.0 }
end

describe "#cooldown_period=" do
it "updates #cooldown_period" do
expect { config.cooldown_period = 42.0 }
.to change { config.cooldown_period }.to(42.0)
end

it "allows setting value to `nil`" do
expect { config.cooldown_period = nil }
.to change { config.cooldown_period }.to(nil)
end

it "fails if given value is neither `NilClass` nor `Float`" do
expect { config.cooldown_period = 42 }
.to raise_error(TypeError, %r{unexpected type})
end

it "fails if given value is not positive" do
expect { config.cooldown_period = 0.0 }
.to raise_error(ArgumentError, %r{must be positive})
end
end

describe "#cooldown_threshold" do
subject { config.cooldown_threshold }

it { is_expected.to eq 1 }
end

describe "#cooldown_threshold=" do
it "updates #cooldown_threshold" do
expect { config.cooldown_threshold = 42 }
.to change { config.cooldown_threshold }.to(42)
end

it "fails if given value is not `Integer`" do
expect { config.cooldown_threshold = 42.0 }
.to raise_error(TypeError, %r{unexpected type})
end

it "fails if given value is not positive" do
expect { config.cooldown_threshold = 0 }
.to raise_error(ArgumentError, %r{must be positive})
end
end
end
Loading

0 comments on commit 736acc3

Please sign in to comment.