Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Semaphore #4

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ GEM
daemons (1.1.8)
diff-lcs (1.1.3)
eventmachine (0.12.10)
json (1.7.3)
json (1.7.4)
rake (0.9.2.2)
rspec (2.10.0)
rspec-core (~> 2.10.0)
rspec-expectations (~> 2.10.0)
rspec-mocks (~> 2.10.0)
rspec-core (2.10.1)
rspec-expectations (2.10.0)
rspec (2.11.0)
rspec-core (~> 2.11.0)
rspec-expectations (~> 2.11.0)
rspec-mocks (~> 2.11.0)
rspec-core (2.11.1)
rspec-expectations (2.11.2)
diff-lcs (~> 1.1.3)
rspec-mocks (2.10.1)
rspec-mocks (2.11.1)

PLATFORMS
ruby
Expand Down
6 changes: 6 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ Options:
- :timeout => The number of seconds to wait for a lock to become available (default: wait forever).
- :queue_max => If the lock queue length is greater than :queue_max then don't wait for the lock (default: infinite).

#### How to use n Semaphore

To allow multiple clients to use the same lock you can use n Semaphore functionality.
Officer can use a n Semaphore in stead of a Mutex by adding #n to the lock name.

client.lock 'some_lock_name#2' # will create a 2 semaphore

### Unlock

Expand Down
1 change: 1 addition & 0 deletions lib/officer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
require 'officer/runner'
require 'officer/server'
require 'officer/client'
require 'officer/semaphore'
4 changes: 2 additions & 2 deletions lib/officer/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def unbind
end

module LockStoreCallbacks
def acquired name
def acquired name, lock_id
@timers.delete(name).cancel if @timers[name]

send_result 'acquired', :name => name
send_result 'acquired', :name => name, :id => lock_id.to_s
end

def already_acquired name
Expand Down
95 changes: 73 additions & 22 deletions lib/officer/lock_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ def to_host_a

class Lock
attr_reader :name
attr_reader :size
attr_reader :queue
attr_reader :lock_ids

def initialize name
def initialize name, size = 1
@name = name
@size = size.to_i
@lock_ids = (0..@size-1).to_a
@queue = LockQueue.new
end
end
Expand All @@ -22,7 +26,9 @@ class LockStore
def initialize
@locks = {} # name => Lock
@connections = {} # Connection => Set(name, ...)
@locked_connections = {} # {lock_id => Connection}
@acquire_counter = 0
@mutex = Mutex.new
end

def log_state
Expand Down Expand Up @@ -52,6 +58,8 @@ def log_state
end

def acquire name, connection, options={}
name, size = split_name(name)

if options[:queue_max]
lock = @locks[name]

Expand All @@ -63,57 +71,68 @@ def acquire name, connection, options={}

@acquire_counter += 1

lock = @locks[name] ||= Lock.new(name)
lock = @locks[name] ||= Lock.new(name, size)

if lock.queue.include? connection
lock.queue.first == connection ? connection.already_acquired(name) : connection.queued(name, options)
if lock.queue[0..lock.size-1].include?(connection)
return connection.already_acquired(name)
end

if lock.queue.count < lock.size
lock.queue << connection
lock_id = lock_connection(connection, lock)
(@connections[connection] ||= Set.new) << name

connection.acquired name, lock_id
else
lock.queue << connection
(@connections[connection] ||= Set.new) << name

lock.queue.count == 1 ? connection.acquired(name) : connection.queued(name, options)
connection.queued(name, options)
end
end

def release name, connection, options={}
name, size = split_name(name)

if options[:callback].nil?
options[:callback] = true
end

lock = @locks[name]
names = @connections[connection]

# Client should only be able to release a lock that
# exists and that it has previously queued.
if lock.nil? || !names.include?(name)
if lock.nil? || names.nil? || !names.include?(name)
connection.release_failed(name) if options[:callback]
return
end

# If connecton has the lock, release it and let the next
# connection know that it has acquired the lock.
if lock.queue.first == connection
lock.queue.shift
connection.released name if options[:callback]
if index = lock.queue.index(connection)
if index < lock.size
lock.queue.delete_at(index)
release_connection(connection, lock)

connection.released name if options[:callback]

if next_connection = lock.queue[lock.size-1]
lock_id = lock_connection(next_connection, lock)
next_connection.acquired name, lock_id
end
@locks.delete name if lock.queue.count == 0

if next_connection = lock.queue.first
next_connection.acquired name
# If the connection is queued and doesn't have the lock,
# dequeue it and leave the other connections alone.
else
@locks.delete name
lock.queue.delete connection
connection.released name
end

# If the connection is queued and doesn't have the lock,
# dequeue it and leave the other connections alone.
else
lock.queue.delete connection
connection.released name
names.delete name
end

names.delete name
end

def reset connection
# names = @connections[connection] || []
names = @connections[connection] || Set.new

names.each do |name|
Expand All @@ -125,6 +144,8 @@ def reset connection
end

def timeout name, connection
name, size = split_name(name)

lock = @locks[name]
names = @connections[connection]

Expand Down Expand Up @@ -169,6 +190,36 @@ def close_idle_connections(max_idle)
end
end
end

protected
def split_name(name)
if name.include?("#")
name_array = name.split("#")
size = (name_array.last.to_i > 0 && name_array.size > 1) ? name_array.last.to_i : 1
else
size = 1
end
[name, size]
end

def lock_connection(connection, lock)
@mutex.synchronize do
@locked_connections[lock.lock_ids.first] = connection
lock_id = lock.lock_ids.shift
lock.lock_ids.push(lock_id)
lock_id
end
end

def release_connection(connection, lock)
@mutex.synchronize do
if lock_id = @locked_connections.key(connection)
lock.lock_ids.delete(lock_id)
lock.lock_ids.insert(0, lock_id)
lock_id
end
end
end
end

end
27 changes: 27 additions & 0 deletions lib/officer/semaphore.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Officer
class Semaphore
attr_accessor :size

def initialize(host, port, name, size)
size ||= 1

@client = Officer::Client.new(:host => host, :port => port)
@name = "#{name}##{size}"
@size = size
end

def lock
@client.lock(@name)
end

def unlock
@client.unlock(@name)
end

def synchronize
@client.with_lock @name do
yield
end
end
end
end
2 changes: 1 addition & 1 deletion lib/officer/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Officer
VERSION = "0.10.1"
VERSION = "0.10.2"
end
65 changes: 63 additions & 2 deletions spec/integration/officer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,14 @@
end

it "should allow a client to request and release a lock" do
@client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock"})
@client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock", "id" => "0"})
@client.my_locks.should eq({"value"=>["testlock"], "result"=>"my_locks"})
@client.unlock("testlock")
@client.my_locks.should eq({"value"=>[], "result"=>"my_locks"})
end

it "should inform the client they already have a lock if they previously locked it" do
@client.lock("testlock")
@client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock", "id" => "0"})
@client.lock("testlock").should eq({"result" => "already_acquired", "name" => "testlock"})
end

Expand Down Expand Up @@ -309,5 +309,66 @@
JSON.parse(@socket.gets("\n").chomp).should eq({"result" => "released", "name" => "testlock"})
end
end

describe "NEW: server support for multiple locks" do
before do
@client1 = Officer::Client.new
@client2 = Officer::Client.new
@client3 = Officer::Client.new

@client1_src_port = @client1.instance_variable_get('@socket').addr[1]
@client2_src_port = @client2.instance_variable_get('@socket').addr[1]
@client3_src_port = @client2.instance_variable_get('@socket').addr[1]
end

after do
@client1.send("disconnect")
@client1 = nil
@client2.send("disconnect")
@client2 = nil
@client3.send("disconnect")
@client3 = nil
end

it "should allow a client to obtaining and releasing the number of allowed locks" do
@client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"})
@client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"})
@client1.locks.should eq({"value"=>{"testlock#2"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]}, "result"=>"locks"})
@client2.locks.should eq({"value"=>{"testlock#2"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]}, "result"=>"locks"})
@client1.my_locks.should eq({"value"=>["testlock#2"], "result"=>"my_locks"})
@client2.my_locks.should eq({"value"=>["testlock#2"], "result"=>"my_locks"})
@client1.unlock("testlock#2")
@client2.unlock("testlock#2")
@client1.locks.should eq({"value"=>{}, "result"=>"locks"})
end

it "should not allow a client to request a lock that is already acquired" do
@client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"})
@client1.lock("testlock#2").should eq({"result" => "already_acquired", "name" => "testlock#2"})
@client2.unlock("testlock#2")
end

it "should allow timeout if number of allowed connections is reached" do
@client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"})
@client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"})
@client3.lock("testlock#2", :timeout => 0).should eq(
{"result"=>"timed_out", "name"=>"testlock#2", "queue"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]}
)
end

it "should queue a connection if number of allowed connections is reached and allow connection if a lock is released" do
t = Thread.new do
@client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"})
@client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"})
@client3.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"})
end

t2 = Thread.new do
sleep 2
@client2.unlock("testlock#2")
end
t.join
end
end
end
end