Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporarily skip queues on too many throttled jobs in a row #163

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ end
----


=== Configuration

[source,ruby]
----
Sidekiq::Throttled.configure do |config|
# Period in seconds to exclude queue from polling in case it returned
# {config.cooldown_threshold} amount of throttled jobs in a row. Set
# this value to `nil` to disable cooldown manager completely.
# Default: 2.0
config.cooldown_period = 2.0
# Exclude queue from polling after it returned given amount of throttled
# jobs in a row.
# Default: 1 (cooldown after first throttled job)
config.cooldown_threshold = 1
end
----


=== Observer

You can specify an observer that will be called on throttling. To do so pass an
Expand Down
36 changes: 33 additions & 3 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

require "sidekiq"

require_relative "./throttled/version"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/middleware"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"

# @see https://github.com/mperham/sidekiq/
Expand Down Expand Up @@ -39,7 +41,35 @@ module Sidekiq
# end
# end
module Throttled
MUTEX = Mutex.new
private_constant :MUTEX

@config = Config.new.freeze
@cooldown = Cooldown[@config]

class << self
# @api internal
#
# @return [Cooldown, nil]
attr_reader :cooldown

# @example
# Sidekiq::Throttled.configure do |config|
# config.cooldown_period = nil # Disable queues cooldown manager
# end
#
# @yieldparam config [Config]
def configure
MUTEX.synchronize do
config = @config.dup

yield config

@config = config.freeze
@cooldown = Cooldown[@config]
end
end

def setup!
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
Expand Down
44 changes: 44 additions & 0 deletions lib/sidekiq/throttled/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

module Sidekiq
module Throttled
# Configuration object.
class Config
# Period in seconds to exclude queue from polling in case it returned
# {#cooldown_threshold} amount of throttled jobs in a row.
#
# Set this to `nil` to disable cooldown completely.
#
# @return [Float, nil]
attr_reader :cooldown_period

# Amount of throttled jobs returned from the queue subsequently after
# which queue will be excluded from polling for the durations of
# {#cooldown_period}.
#
# @return [Integer]
attr_reader :cooldown_threshold

def initialize
@cooldown_period = 2.0
@cooldown_threshold = 1
end

# @!attribute [w] cooldown_period
def cooldown_period=(value)
raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float)
raise ArgumentError, "period must be positive" unless value.nil? || value.positive?

@cooldown_period = value
end

# @!attribute [w] cooldown_threshold
def cooldown_threshold=(value)
raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer)
raise ArgumentError, "threshold must be positive" unless value.positive?

@cooldown_threshold = value
end
end
end
end
55 changes: 55 additions & 0 deletions lib/sidekiq/throttled/cooldown.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require "concurrent"

require_relative "./expirable_set"

module Sidekiq
module Throttled
# @api internal
#
# Queues cooldown manager. Tracks list of queues that should be temporarily
# (for the duration of {Config#cooldown_period}) excluded from polling.
class Cooldown
class << self
# Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`.
#
# @param config [Config]
# @return [Cooldown, nil]
def [](config)
new(config) if config.cooldown_period
end
end

# @param config [Config]
def initialize(config)
@queues = ExpirableSet.new(config.cooldown_period)
@threshold = config.cooldown_threshold
@tracker = Concurrent::Map.new
end

# Notify that given queue returned job that was throttled.
#
# @param queue [String]
# @return [void]
def notify_throttled(queue)
@queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ)
end

# Notify that given queue returned job that was not throttled.
#
# @param queue [String]
# @return [void]
def notify_admitted(queue)
@tracker.delete(queue)
end

# List of queues that should not be polled
#
# @return [Array<String>]
def queues
@queues.to_a
end
end
end
end
70 changes: 70 additions & 0 deletions lib/sidekiq/throttled/expirable_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

require "concurrent"

module Sidekiq
module Throttled
# @api internal
#
# Set of elements with expirations.
#
# @example
# set = ExpirableSet.new(10.0)
# set.add("a")
# sleep(5)
# set.add("b")
# set.to_a # => ["a", "b"]
# sleep(5)
# set.to_a # => ["b"]
class ExpirableSet
include Enumerable

# @param ttl [Float] expiration is seconds
# @raise [ArgumentError] if `ttl` is not positive Float
def initialize(ttl)
raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive?

@elements = Concurrent::Map.new
@ttl = ttl
end

# @param element [Object]
# @return [ExpirableSet] self
def add(element)
# cleanup expired elements to avoid mem-leak
horizon = now
expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) }
expired.each { |pair| @elements.delete_pair(*pair) }

# add new element
@elements[element] = now + @ttl

self
end

# @yield [Object] Gives each live (not expired) element to the block
def each
return to_enum __method__ unless block_given?

horizon = now

@elements.each_pair do |element, sunset|
yield element unless expired?(sunset, horizon)
end

self
end

private

# @return [Float]
def now
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end

def expired?(sunset, horizon)
sunset <= horizon
end
end
end
end
4 changes: 2 additions & 2 deletions lib/sidekiq/throttled/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ module Job
# in order to make API inline with `include Sidekiq::Job`.
#
# @private
def self.included(worker)
worker.send(:extend, ClassMethods)
def self.included(base)
base.extend(ClassMethods)
end

# Helper methods added to the singleton class of destination
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ def retrieve_work
work = super

if work && Throttled.throttled?(work.job)
Throttled.cooldown&.notify_throttled(work.queue)
requeue_throttled(work)
return nil
end

Throttled.cooldown&.notify_admitted(work.queue) if work

work
end

Expand All @@ -33,6 +36,15 @@ def retrieve_work
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def queues_cmd
super - (Throttled.cooldown&.queues || [])
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions rubocop/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ RSpec/ExampleLength:
Enabled: true
Max: 10

RSpec/ExpectChange:
Enabled: true
EnforcedStyle: block

RSpec/MultipleExpectations:
Enabled: false

Expand Down
1 change: 1 addition & 0 deletions sidekiq-throttled.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = ">= 3.0"

spec.add_runtime_dependency "concurrent-ruby", ">= 1.2.0"
spec.add_runtime_dependency "redis-prescription", "~> 2.2"
spec.add_runtime_dependency "sidekiq", ">= 7.0"
end
58 changes: 58 additions & 0 deletions spec/lib/sidekiq/throttled/config_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require "sidekiq/throttled/config"

RSpec.describe Sidekiq::Throttled::Config do
subject(:config) { described_class.new }

describe "#cooldown_period" do
subject { config.cooldown_period }

it { is_expected.to eq 2.0 }
end

describe "#cooldown_period=" do
it "updates #cooldown_period" do
expect { config.cooldown_period = 42.0 }
.to change { config.cooldown_period }.to(42.0)
end

it "allows setting value to `nil`" do
expect { config.cooldown_period = nil }
.to change { config.cooldown_period }.to(nil)
end

it "fails if given value is neither `NilClass` nor `Float`" do
expect { config.cooldown_period = 42 }
.to raise_error(TypeError, %r{unexpected type})
end

it "fails if given value is not positive" do
expect { config.cooldown_period = 0.0 }
.to raise_error(ArgumentError, %r{must be positive})
end
end

describe "#cooldown_threshold" do
subject { config.cooldown_threshold }

it { is_expected.to eq 1 }
end

describe "#cooldown_threshold=" do
it "updates #cooldown_threshold" do
expect { config.cooldown_threshold = 42 }
.to change { config.cooldown_threshold }.to(42)
end

it "fails if given value is not `Integer`" do
expect { config.cooldown_threshold = 42.0 }
.to raise_error(TypeError, %r{unexpected type})
end

it "fails if given value is not positive" do
expect { config.cooldown_threshold = 0 }
.to raise_error(ArgumentError, %r{must be positive})
end
end
end
Loading