Skip to content

Commit

Permalink
fix: Adjust queue name handling in re_enqueue_throttled and reschedul…
Browse files Browse the repository at this point in the history
…e_throttled methods (#6)
  • Loading branch information
mnovelo authored Dec 17, 2024
1 parent 41da721 commit 1a19ac5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
14 changes: 9 additions & 5 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,34 +143,38 @@ def calc_target_queue(work) # rubocop:disable Metrics/MethodLength
when NilClass
work.queue
when String, Symbol
requeue_to.to_s
requeue_to
else
raise ArgumentError, "Invalid argument for `to`"
end

target = work.queue if target.nil? || target.empty?

target.to_s.gsub("queue:", "")
target.to_s
end

# Push the job back to the head of the queue.
# The queue name is expected to include the "queue:" prefix, so we add it if it's missing.
def re_enqueue_throttled(work, target_queue)
target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:")

case work.class.name
when "Sidekiq::Pro::SuperFetch::UnitOfWork"
# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the target queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
work.queue = target_queue if target_queue
work.queue = target_queue
work.requeue
else
# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
# The queue name is expected to include the "queue:" prefix, so we add it if it's missing.
target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:")
Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
end
end

# Reschedule the job to be executed later in the target queue.
# The queue name should NOT include the "queue:" prefix, so we remove it if it's present.
def reschedule_throttled(work, target_queue)
target_queue = target_queue.gsub(/^queue:/, "")
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]
Expand Down
24 changes: 22 additions & 2 deletions spec/lib/sidekiq/throttled/strategy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,17 @@ def scheduled_redis_item_and_score
it "returns false and does not reschedule the job" do
expect(Sidekiq::Client).not_to receive(:enqueue_to_in)
expect(work).not_to receive(:acknowledge)
expect(subject.send(:reschedule_throttled, work, requeue_to: "default")).to be_falsey
expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey
end
end

context "when target_queue has the 'queue:' prefix" do
let(:target_queue) { "queue:default" }

it "reschedules the job to the specified queue" do
expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything)
expect(work).to receive(:acknowledge)
subject.send(:reschedule_throttled, work, target_queue)
end
end
end
Expand Down Expand Up @@ -998,7 +1008,17 @@ def scheduled_redis_item_and_score
it "returns false and does not reschedule the job" do
expect(Sidekiq::Client).not_to receive(:enqueue_to_in)
expect(work).not_to receive(:acknowledge)
expect(subject.send(:reschedule_throttled, work, requeue_to: "default")).to be_falsey
expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey
end
end

context "when target_queue has the 'queue:' prefix" do
let(:target_queue) { "queue:default" }

it "reschedules the job to the specified queue" do
expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything)
expect(work).to receive(:acknowledge)
subject.send(:reschedule_throttled, work, target_queue)
end
end
end
Expand Down

0 comments on commit 1a19ac5

Please sign in to comment.