Skip to content

Commit

Permalink
Additional thread-safety spec
Browse files Browse the repository at this point in the history
  • Loading branch information
kukicola committed May 1, 2024
1 parent 90321d7 commit 0473cf1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
6 changes: 4 additions & 2 deletions lib/sidekiq/debouncer/enq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Debouncer
class Enq < ::Sidekiq::Scheduled::Enq
extend LuaCommands

SET = 'debouncer'
SET = "debouncer"
LUA_ZPOPBYSCORE_WITHSCORE = File.read(File.expand_path("../lua/zpopbyscore_withscore.lua", __FILE__))
LUA_ZPOPBYSCORE_MULTI = File.read(File.expand_path("../lua/zpopbyscore_multi.lua", __FILE__))

Expand All @@ -17,9 +17,11 @@ def initialize(config)
super()
@client = Sidekiq::Client
@redis = Sidekiq.method(:redis)
@logger = Sidekiq.logger
else
super(config)
@redis = config.method(:redis)
@logger = config.logger
end
end

Expand All @@ -34,7 +36,7 @@ def enqueue_jobs

@client.push({"args" => final_args, "class" => klass, "debounce_key" => job})

logger.debug { "enqueued #{SET}: #{job}" }
@logger.debug { "enqueued #{SET}: #{job}" }
end
end
end
Expand Down
2 changes: 0 additions & 2 deletions spec/sidekiq/debouncer/middleware/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@

expect(queue.size).to eq(0)
end

# TODO: add a spec to check if enquing new job while pulling is going to work correctly
end

context "sidekiq testing fake mode" do
Expand Down
23 changes: 23 additions & 0 deletions spec/sidekiq/debouncer/middleware/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,27 @@
processor.process_one
end
end

context "multiprocess safety" do
it "is safe" do
Parallel.each((1..100).to_a, in_processes: 10) do |i|
TestWorker.perform_async("A", i)
end

expect(schedule_set.size).to eq(1)

set_item = schedule_set.first
expect(set_item.value).to eq("debounce/TestWorker/A")

expect(puller.instance_variable_get(:@enq)).to receive(:zpopbyscore_withscore).twice.and_wrap_original do |original_method, *args, **kwargs|
original_method.call(*args, **kwargs).tap { TestWorker.perform_async("A", 101) }
end

Timecop.freeze(time_start + 10 * 60)
puller.enqueue

expect(queue.size).to eq(1)
expect(queue.first.args.map { _1[1] }).to match_array((1..100).to_a)
end
end
end
1 change: 1 addition & 0 deletions spec/support/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@
connection.call("FLUSHDB")
connection.call("SCRIPT", "FLUSH")
end
sidekiq_config[:error_handlers] << ->(ex, _) { raise ex }
end
end

0 comments on commit 0473cf1

Please sign in to comment.