Tusk is a minimal pub / sub system with multiple observer strategies. Tusk builds upon the Observer API from stdlib in order to provide a mostly consistent API for building cross thread or process pub / sub systems.
Currently, Tusk supports Redis and PostgreSQL as message bus back ends.
- Send message across processes
- Supports Redis as a message bus
- Supports PostgreSQL as a message bus
- Supports DRb as a message bus
Here is an in-memory observer example:
require 'observer'
class Timer
include Observable
def tick
changed
notify_observers
end
end
class Listener
def update; puts "got update"; end
end
timer = Timer.new
timer.add_observer Listener.new
loop { timer.tick; sleep 1; }
The down side of this example is that the Listener cannot be in a different process. We can move the Listener to a different process by using the Redis observable mixin and providing a redis connection:
require 'tusk/observable/redis'
require 'redis'
class Timer
include Tusk::Observable::Redis
def tick
changed
notify_observers
end
def connection
Thread.current[:redis] ||= ::Redis.new
end
end
class Listener
def update; puts "got update PID: #{$$}"; end
end
timer = Timer.new
fork {
timer.add_observer Listener.new
sleep
}
loop { timer.tick; sleep 1; }
PostgreSQL can also be used as the message bus:
require 'tusk/observable/pg'
require 'pg'
class Timer
include Tusk::Observable::PG
def tick
changed
notify_observers
end
def connection
Thread.current[:pg] ||= ::PG::Connection.new :dbname => 'postgres'
end
end
class Listener
def update; puts "got update PID: #{$$}"; end
end
timer = Timer.new
fork {
timer.add_observer Listener.new
sleep
}
loop { timer.tick; sleep 1; }
We can easily integrate Tusk with Active Record. Here is a User model that sends notifications when a user is created:
require 'tusk/observable/pg'
class User < ActiveRecord::Base
attr_accessible :name
extend Tusk::Observable::PG
# After users are created, notify the message bus
after_create :notify_observers
# Listeners will use the table name as the bus channel
def self.channel
table_name
end
private
def notify_observers
self.class.changed
self.class.notify_observers
end
end
The table name is used as the channel name where objects will listen. Here is a producer script:
require 'user'
loop do
User.create!(:name => 'testing')
sleep 1
end
Our consumer adds an observer to the User class:
require 'user'
class UserListener
def initialize
super
@last_id = 0
end
def update
users = User.where('id > ?', @last_id).sort_by(&:id)
@last_id = users.last.id
users.each { |u| p "user created: #{u.id}" }
end
end
User.add_observer UserListener.new
# Put the main thread to sleep
sleep
Whenever a user gets created, our consumer listener will be notified.
- PostgreSQL or Redis
- gem install tusk
(The MIT License)
Copyright (c) 2012 Aaron Patterson
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.