Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UntilExpired #278

Merged
merged 10 commits into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .simplecov
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ SimpleCov.start do
add_filter '/spec/'
add_filter '/bin/'
add_filter '/gemfiles/'
add_group 'Workers', 'examples/'
add_filter '/examples/'

add_group 'Client', 'lib/sidekiq_unique_jobs/client'
add_group 'Locks', 'lib/sidekiq_unique_jobs/lock'
add_group 'Server', 'lib/sidekiq_unique_jobs/server'
Expand Down
1 change: 1 addition & 0 deletions Guardfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ guard :rspec, cmd: "env COV=false bundle exec rspec" do
rspec = dsl.rspec
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/unit/#{m[1]}_spec.rb" }
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/integration/#{m[1]}_spec.rb" }
watch(%r{^examples/(.+)\.rb$}) { |m| "spec/examples/#{m[1]}_spec.rb" }
watch(rspec.spec_helper) { rspec.spec_dir }
watch(rspec.spec_support) { rspec.spec_dir }
watch(rspec.spec_files)
Expand Down
2 changes: 1 addition & 1 deletion examples/until_and_while_executing_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class UntilAndWhileExecutingJob
include Sidekiq::Worker

sidekiq_options queue: :working, unique: :until_and_while_executing, lock_timeout: 1
sidekiq_options queue: :working, unique: :until_and_while_executing, lock_timeout: 0

def perform(one)
[one]
Expand Down
2 changes: 1 addition & 1 deletion examples/until_expired_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class UntilExpiredJob
include Sidekiq::Worker
sidekiq_options unique: :until_expired, lock_expiration: 10 * 60
sidekiq_options unique: :until_expired, lock_expiration: 1, lock_timeout: 0

def perform(one)
TestClass.run(one)
Expand Down
4 changes: 0 additions & 4 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ def console
end

no_commands do
def logger
SidekiqUniqueJobs.logger
end

def console_class
require 'pry'
Pry
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def unlock
end

def execute(callback)
return unless locked?
yield if block_given?
callback.call
end
Expand Down
6 changes: 2 additions & 4 deletions lib/sidekiq_unique_jobs/unique_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,12 @@ def filtered_args(args)
when Symbol
filter_by_symbol(json_args)
else
log_debug { "#{__method__} arguments not filtered (using all arguments for uniqueness)" }
log_debug("#{__method__} arguments not filtered (using all arguments for uniqueness)")
json_args
end
end

def filter_by_proc(args)
return args if unique_args_method.nil?

unique_args_method.call(args)
end

Expand All @@ -95,7 +93,7 @@ def filter_by_symbol(args)

worker_class.send(unique_args_method, args)
rescue ArgumentError => ex
log_fatal ex
log_fatal(ex)
args
end

Expand Down
4 changes: 0 additions & 4 deletions lib/sidekiq_unique_jobs/unlockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,5 @@ def delete(item)
SidekiqUniqueJobs::UniqueArgs.digest(item)
SidekiqUniqueJobs::Locksmith.new(item).delete!
end

def logger
SidekiqUniqueJobs.logger
end
end
end
5 changes: 0 additions & 5 deletions lib/sidekiq_unique_jobs/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ def current_time
Time.now
end

def prefix_keys(keys)
keys = Array(keys).compact
keys.map { |key| prefix(key) }
end

def prefix(key)
return key if unique_prefix.nil?
return key if key.start_with?("#{unique_prefix}:")
Expand Down
3 changes: 2 additions & 1 deletion spec/examples/until_expired_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
it_behaves_like 'sidekiq with options' do
let(:options) do
{
'lock_expiration' => 600,
'lock_expiration' => 1,
'lock_timeout' => 0,
'retry' => true,
'unique' => :until_expired,
}
Expand Down
106 changes: 106 additions & 0 deletions spec/integration/sidekiq_unique_jobs/lock/until_expired_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe SidekiqUniqueJobs::Lock::UntilExpired, redis: :redis do
include SidekiqHelpers

let(:process_one) { described_class.new(item_one) }
let(:process_two) { described_class.new(item_two) }

let(:jid_one) { 'jid one' }
let(:jid_two) { 'jid two' }
let(:worker_class) { UntilExpiredJob }
let(:unique) { :until_expired }
let(:queue) { :rejecting }
let(:args) { %w[array of arguments] }
let(:callback) { -> {} }
let(:item_one) do
{ 'jid' => jid_one,
'class' => worker_class.to_s,
'queue' => queue,
'unique' => unique,
'args' => args }
end
let(:item_two) do
{ 'jid' => jid_two,
'class' => worker_class.to_s,
'queue' => queue,
'unique' => unique,
'args' => args }
end

before do
allow(callback).to receive(:call).and_call_original
end

describe '#execute' do
it 'process one can be locked' do
expect(process_one.lock).to eq(jid_one)
expect(process_one.locked?).to eq(true)
end

context 'when process one has locked the job' do
before do
process_one.lock
end

it 'process two cannot achieve a lock' do
expect(process_two.lock).to eq(nil)
end

it 'process two cannot execute the lock' do
unset = true
process_two.execute(callback) do
unset = false
end

expect(unset).to eq(true)
end

it 'process one can execute the job' do
set = false
process_one.execute(callback) do
set = true
end

expect(set).to eq(true)
end

it 'the job is still locked after executing' do
process_one.execute(callback) {}

expect(process_one.locked?).to eq(true)
end

it 'calls back' do
process_one.execute(callback) do
# NO OP
end

expect(callback).to have_received(:call)
end

it 'callbacks are only called once (for the locked process)' do
process_one.execute(callback) do
process_two.execute(callback) {}
end

expect(callback).to have_received(:call).once
end
end
end

describe '#unlock' do
context 'when lock is locked' do
before { process_one.lock }

it 'keeps the lock even when unlocking' do
expect(process_one.unlock).to eq(true)
expect(process_one.locked?).to eq(true)
end
end

it { expect(process_one.unlock).to eq(true) }
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,27 @@
end
end

it 'moves subsequent jobs to dead queue' do
process_one.execute(callback) do
expect(dead_count).to eq(0)
expect { process_two.execute(callback) {} }
.to change { dead_count }.from(0).to(1)
shared_examples 'rejects job to deadset' do
it 'moves subsequent jobs to dead queue' do
process_one.execute(callback) do
expect(dead_count).to eq(0)
expect { process_two.execute(callback) {} }
.to change { dead_count }.from(0).to(1)
end
end
end

context 'when Sidekiq::DeadSet respond to kill' do
it_behaves_like 'rejects job to deadset'
end

context 'when Sidekiq::DeadSet does not respond to kill' do
before do
allow(process_two).to receive(:deadset_kill?).and_return(false)
end

it_behaves_like 'rejects job to deadset'
end
end
end
end
75 changes: 75 additions & 0 deletions spec/integration/sidekiq_unique_jobs/lock/while_executing_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe SidekiqUniqueJobs::Lock::WhileExecuting, redis: :redis do
include SidekiqHelpers

let(:process_one) { described_class.new(item_one) }
let(:process_two) { described_class.new(item_two) }

let(:jid_one) { 'jid one' }
let(:jid_two) { 'jid two' }
let(:worker_class) { WhileExecutingJob }
let(:unique) { :while_executing }
let(:queue) { :while_executing }
let(:args) { %w[array of arguments] }
let(:callback) { -> {} }
let(:item_one) do
{ 'jid' => jid_one,
'class' => worker_class.to_s,
'queue' => queue,
'unique' => unique,
'args' => args }
end
let(:item_two) do
{ 'jid' => jid_two,
'class' => worker_class.to_s,
'queue' => queue,
'unique' => unique,
'args' => args }
end

before do
allow(callback).to receive(:call).and_call_original
end

describe '#execute' do
it 'does not lock jobs' do
expect(process_one.lock).to eq(true)
expect(process_one.locked?).to eq(false)

expect(process_two.lock).to eq(true)
expect(process_two.locked?).to eq(false)
end

context 'when job is executing' do
it 'locks the process' do
process_one.execute(callback) do
expect(process_one.locked?).to eq(true)
end
end

it 'calls back' do
process_one.execute(callback) do
# NO OP
end
expect(callback).to have_received(:call)
end

it 'prevents other processes from executing' do
process_one.execute(callback) do
expect(process_two.lock).to eq(true)
expect(process_two.locked?).to eq(false)
unset = true
process_two.execute(callback) do
unset = false
end
expect(unset).to eq(true)
end

expect(callback).to have_received(:call).once
end
end
end
end
Loading