Skip to content

Commit

Permalink
Adds coverage for job retries (#321)
Browse files Browse the repository at this point in the history
* Adds coverage for that retrying jobs is working

- Minor refactoring to simplify locksmith a little
- Adds TODO for moving the touching of the grabbed token to LUA

* Fix the retry spec and move it

* Fix deleting old keys w/coverage

* Rubo👮‍♀️
  • Loading branch information
mhenrixon committed Aug 9, 2018
1 parent 0ba100d commit 8402dbf
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 48 deletions.
17 changes: 4 additions & 13 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module SidekiqUniqueJobs
# Lock manager class that handles all the various locks
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class Locksmith # rubocop:disable ClassLength
class Locksmith
include SidekiqUniqueJobs::Connection

# @param [Hash] item a Sidekiq job hash
Expand All @@ -20,17 +20,6 @@ def initialize(item, redis_pool = nil)
@redis_pool = redis_pool
end

# Creates the necessary keys in redis to attempt a lock
# @return [String] the Sidekiq job_id
def create
Scripts.call(
:create,
redis_pool,
keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest],
argv: [jid, expiration],
)
end

# Checks if the exists key is created in redis
# @return [true, false]
def exists?
Expand Down Expand Up @@ -66,7 +55,9 @@ def delete!
# @yield the block to execute if a lock is successful
# @return the Sidekiq job_id (jid)
def lock(timeout = nil, &block)
create
Scripts.call(:lock, redis_pool,
keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest],
argv: [jid, expiration])

grab_token(timeout) do |token|
touch_grabbed_token(token)
Expand Down
7 changes: 6 additions & 1 deletion lib/sidekiq_unique_jobs/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def keys_with_ttl(pattern = SCAN_PATTERN, count = DEFAULT_COUNT)
# @return [Integer] the number of keys deleted
def del(pattern = SCAN_PATTERN, count = 0)
raise ArgumentError, 'Please provide a number of keys to delete greater than zero' if count.zero?
pattern = "#{pattern}:*" unless pattern.end_with?(':*')
pattern = suffix(pattern)

log_debug { "Deleting keys by: #{pattern}" }
keys, time = timed { keys(pattern, count) }
Expand Down Expand Up @@ -87,6 +87,11 @@ def prefix(key)
"#{unique_prefix}:#{key}"
end

def suffix(key)
return "#{key}*" unless key.end_with?(':*')
key
end

def unique_prefix
SidekiqUniqueJobs.config.unique_prefix
end
Expand Down
12 changes: 12 additions & 0 deletions redis/create.lua → redis/lock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ local unique_digest = KEYS[5]
local job_id = ARGV[1]
local expiration = tonumber(ARGV[2])

local function current_time()
local time = redis.call('time')
local s = time[1]
local ms = time[2]
local number = tonumber((s .. '.' .. ms))

return number
end

-- redis.log(redis.LOG_DEBUG, "create.lua - investigate possibility of locking jid: " .. job_id)

local stored_token = redis.call('GET', exists_key)
Expand All @@ -34,6 +43,9 @@ end

redis.call('SADD', unique_keys, unique_digest)
redis.call('DEL', grabbed_key)
-- TODO: Move this to LUA when redis 3.2 is the least supported
-- redis.call('HSET', grabbed_key, job_id, current_time())
---------------------------------------------------------------
redis.call('DEL', available_key)
redis.call('RPUSH', available_key, job_id)

Expand Down
52 changes: 52 additions & 0 deletions spec/integration/sidekiq/retry_set_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Sidekiq::RetrySet, redis: :redis do
let(:locksmith) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:args) { [1, 2] }
let(:worker_class) { MyUniqueJob }
let(:jid) { 'ajobid' }
let(:lock) { :until_executed }
let(:lock_expiration) { 7_200 }
let(:queue) { :customqueue }
let(:retry_at) { Time.now.to_f + 360 }
let(:unique_digest) { 'uniquejobs:9e9b5ce5d423d3ea470977004b50ff84' }
let(:item) do
{
'args' => args,
'class' => worker_class,
'failed_at' => Time.now.to_f,
'jid' => jid,
'lock' => lock,
'lock_expiration' => lock_expiration,
'queue' => queue,
'retry_at' => retry_at,
'retry_count' => 1,
'unique_digest' => unique_digest,
}
end

before do
zadd('retry', retry_at.to_s, Sidekiq.dump_json(item))
expect(retry_count).to eq(1)
end

context 'when a job is locked' do
before do
expect(locksmith.lock).to eq(jid)
expect(unique_keys).to match_array(%W[
#{unique_digest}:EXISTS
#{unique_digest}:GRABBED
])
expect(ttl("#{unique_digest}:EXISTS")).to eq(lock_expiration)
expect(ttl("#{unique_digest}:GRABBED")).to eq(-1)
end

it 'can be put back on queue' do
expect { described_class.new.retry_all }
.to change { queue_count(queue) }
.from(0).to(1)
end
end
end
6 changes: 3 additions & 3 deletions spec/integration/sidekiq_unique_jobs/locksmith_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
it 'disappears without a trace when calling `delete!`' do
original_key_size = keys.size

locksmith_one.create
locksmith_one.lock
locksmith_one.delete!

expect(keys.size).to eq(original_key_size)
Expand Down Expand Up @@ -162,7 +162,7 @@

it 'expires keys' do
Sidekiq.redis(&:flushdb)
locksmith_one.create
locksmith_one.lock
keys = unique_keys
expect(unique_keys).not_to include(keys)
end
Expand All @@ -181,7 +181,7 @@
# it_behaves_like 'a lock'

# it 'can dynamically add resources' do
# locksmith_one.create
# locksmith_one.lock

# 3.times do
# locksmith_one.unlock
Expand Down
4 changes: 4 additions & 0 deletions spec/support/sidekiq_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def unique_keys
keys('uniquejobs:*')
end

def zadd(queue, timestamp, item)
redis { |conn| conn.zadd(queue, timestamp, item) }
end

def zcard(queue)
redis { |conn| conn.zcard(queue) }
end
Expand Down
106 changes: 75 additions & 31 deletions spec/unit/sidekiq_unique_jobs/util_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
RSpec.describe SidekiqUniqueJobs::Util, redis: :redis do
let(:item_hash) do
{
'class' => 'MyUniqueJob',
'args' => [[1, 2]],
'at' => 1_492_341_850.358196,
'retry' => true,
'queue' => 'customqueue',
'unique' => :until_executed,
'expiration' => 7200,
'retry_count' => 10,
'jid' => jid,
'created_at' => 1_492_341_790.358217,
'class' => 'MyUniqueJob',
'args' => [[1, 2]],
'at' => 1_492_341_850.358196,
'retry' => true,
'queue' => 'customqueue',
'lock' => :until_executed,
'lock_expiration' => 7200,
'retry_count' => 10,
'jid' => jid,
'created_at' => 1_492_341_790.358217,
}
end

Expand All @@ -24,49 +24,93 @@
my_item
end

let(:unique_key) { item['unique_digest'] }
let(:jid) { 'e3049b05b0bd9c809182bbe0' }
let(:lock) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:unique_digest) { item['unique_digest'] }
let(:jid) { 'e3049b05b0bd9c809182bbe0' }
let(:lock) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:expected_keys) do
%W[
#{unique_key}:EXISTS
#{unique_key}:GRABBED
#{unique_digest}:EXISTS
#{unique_digest}:GRABBED
]
end

shared_context 'with an old lock' do
before do
result = SidekiqUniqueJobs::Scripts.call(
:acquire_lock,
nil,
keys: [unique_digest],
argv: [jid, 7200],
)
expect(result).to eq(1)
expect(described_class.keys).to include(unique_digest)
end
end

describe '.keys' do
subject(:keys) { described_class.keys }

before do
lock.lock(0)
context 'when old lock exists' do
include_context 'with an old lock'

it { is_expected.to match_array([unique_digest]) }
end

it { is_expected.to match_array(expected_keys) }
context 'when new lock exists' do
before do
lock.lock(0)
end

it { is_expected.to match_array(expected_keys) }
end
end

describe '.del' do
subject(:del) { described_class.del(pattern, 100) }

before do
lock.lock(0)
end
context 'when an old lock exists' do
include_context 'with an old lock'

it { expect(described_class.keys).to match_array(expected_keys) }
it { expect(described_class.keys).to match_array([unique_digest]) }

context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }
context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }

it { is_expected.to eq(2) }
end
it { is_expected.to eq(1) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

context 'when pattern is a specific key' do
let(:pattern) { unique_key }
context 'when pattern is a specific key' do
let(:pattern) { unique_digest }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
it { is_expected.to eq(1) }
it { expect { del }.to change(described_class, :keys).to([]) }
end
end

after { lock.delete }
context 'when a new lock exists' do
before do
lock.lock(0)
end

it { expect(described_class.keys).to match_array(expected_keys) }

context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

context 'when pattern is a specific key' do
let(:pattern) { unique_digest }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

after { lock.delete }
end
end

describe '.prefix' do
Expand Down

0 comments on commit 8402dbf

Please sign in to comment.