Skip to content

Commit

Permalink
Refactor the reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdanRada committed Mar 26, 2021
1 parent ff290a3 commit 74da290
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ rvm:
- 2.5.3
- 2.6.5
- 2.7.1
- 3.0.0
# - 3.0.0 - celluloid/io doesn't work
- jruby-19mode
- jruby-head

Expand Down
4 changes: 2 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ task :all do |_t|
# require 'json'
# puts JSON.pretty_generate(ENV.to_hash)
if ENV['BUNDLE_GEMFILE'] =~ /gemfiles/
appraisal_name = ENV['BUNDLE_GEMFILE'].scan(/rails\_(.*)\.gemfile/).flatten.first
command_prefix = "appraisal rails-#{appraisal_name}"
appraisal_name = ENV['BUNDLE_GEMFILE'].scan(/celluloid\_(.*)\.gemfile/).flatten.first
command_prefix = "appraisal celluloid-#{appraisal_name}"
exec ("#{command_prefix} bundle install && #{command_prefix} bundle exec rspec && bundle exec rake coveralls:push ")
else
exec(' bundle exec appraisal install && bundle exec rake appraisal spec && bundle exec rake coveralls:push')
Expand Down
1 change: 1 addition & 0 deletions celluloid_pubsub.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'celluloid-websocket-client', '~> 0.0', '>= 0.0.1'
s.add_runtime_dependency 'celluloid-pmap', '~> 1.0'
s.add_runtime_dependency 'activesupport', '>= 4.0', '>= 4.0'
s.add_runtime_dependency 'timers', '~> 4.3'

s.add_development_dependency 'appraisal', '~> 2.1', '>= 2.1'
s.add_development_dependency 'rspec', '~> 3.5', '>= 3.5'
Expand Down
14 changes: 11 additions & 3 deletions examples/simple_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,17 @@ def on_close(code, reason)
# please don't use the BaseActor class to supervise actors. This is subject to change . This class is used only to test backward compatibility.
# For more information on how to supervise actors please see Celluloid wiki.
CelluloidPubsub::BaseActor.setup_actor_supervision(CelluloidPubsub::WebServer, actor_name: :web_server, args: {enable_debug: debug_enabled, spy: debug_enabled, adapter: nil,log_file_path: log_file_path, log_level: log_level })
CelluloidPubsub::BaseActor.setup_actor_supervision(FirstActor, actor_name: :first_actor, args: { enable_debug: debug_enabled, log_level: log_level })
sleep(0.1) until CelluloidPubsub::Registry.channels.include?('test_channel')
CelluloidPubsub::BaseActor.setup_actor_supervision(SecondActor, actor_name: :second_actor, args: { enable_debug: debug_enabled, log_level: log_level})
if ENV['RUN_IN_PARALLEL'].nil? || ENV['RUN_IN_PARALLEL'].to_s.downcase == 'true'
[
{ class: FirstActor, name: :first_actor },
{ class: SecondActor, name: :second_actor }
].pmap do |hash|
CelluloidPubsub::BaseActor.setup_actor_supervision(hash[:class], actor_name: hash[:name], args: { enable_debug: debug_enabled, log_level: log_level })
end
else
CelluloidPubsub::BaseActor.setup_actor_supervision(FirstActor, actor_name: :first_actor, args: { enable_debug: debug_enabled, log_level: log_level })
CelluloidPubsub::BaseActor.setup_actor_supervision(SecondActor, actor_name: :second_actor, args: { enable_debug: debug_enabled, log_level: log_level})
end

signal_received = false

Expand Down
1 change: 1 addition & 0 deletions lib/celluloid_pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require 'active_support/all'
require 'json'
require 'thread'
require 'timers'
require 'celluloid/pmap'
require 'celluloid_pubsub/base_actor'
require 'reel'
Expand Down
9 changes: 3 additions & 6 deletions lib/celluloid_pubsub/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,6 @@ def on_open
def on_message(data)
message = JSON.parse(data)
log_debug("#{@actor.class} received JSON #{message}")
if succesfull_subscription?(message)
send_action('successfully_registered', message['channel'])
end
if @actor.respond_to?(:async)
@actor.async.on_message(message)
else
Expand All @@ -242,14 +239,14 @@ def on_message(data)
#
# @api public
def on_close(code, reason)
connection.terminate
terminate
log_debug("#{@actor.class} dispatching on close #{code} #{reason}")
log_debug("#{self.class} dispatching on close #{code} #{reason}")
if @actor.respond_to?(:async)
@actor.async.on_close(code, reason)
else
@actor.on_close(code, reason)
end
connection.terminate
terminate
end

private
Expand Down
6 changes: 5 additions & 1 deletion lib/celluloid_pubsub/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ def setup_celluloid_logger
def setup_celluloid_exception_handler
Celluloid.task_class = defined?(Celluloid::TaskThread) ? Celluloid::TaskThread : Celluloid::Task::Threaded
Celluloid.exception_handler do |ex|
puts ex unless filtered_error?(ex)
unless filtered_error?(ex)
puts ex
puts ex.backtrace
puts ex.cause
end
end
end

Expand Down
42 changes: 17 additions & 25 deletions lib/celluloid_pubsub/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Reactor
include CelluloidPubsub::BaseActor

# available actions that can be delegated
AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish successfully_registered unsubscribe_all].freeze
AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

# The websocket connection received from the server
# @return [Reel::WebSocket] websocket connection
Expand Down Expand Up @@ -306,19 +306,13 @@ def subscribe(channel, message)
# to that channel before the actor subscribed
#
# @param [String] channel
# @param [Object] _json_data
#
# @return [void]
#
# @api public
def successfully_registered(channel, _json_data)
@server.mutex.synchronize do
if (messages = unpublished_messages(channel)).present?
messages.pmap do |message|
@websocket << message
end
clear_unpublished_messages(channel)
end
def successfully_registered(channel)
return if (messages = unpublished_messages(channel)).blank?
messages.each do |msg|
@websocket << msg.to_json
end
end

Expand Down Expand Up @@ -399,14 +393,12 @@ def publish(current_topic, json_data)
#
# @api public
def server_publish_event(current_topic, message)
@server.mutex.synchronize do
if @server.subscribers[current_topic].present?
(@server.subscribers[current_topic] || []).dup.pmap do |hash|
hash[:reactor].websocket << message
end
else
save_unpublished_message(channel, message)
if (subscribers = @server.subscribers[current_topic]).present?
subscribers.dup.pmap do |hash|
hash[:reactor].websocket << message
end
else
save_unpublished_message(current_topic, message)
end
end

Expand All @@ -419,7 +411,9 @@ def server_publish_event(current_topic, message)
#
# @api public
def save_unpublished_message(current_topic, message)
(CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
@server.timers_mutex.synchronize do
(CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
end
end

# unsubscribes all actors from all channels and terminates the current actor
Expand Down Expand Up @@ -457,12 +451,10 @@ def unsubscribe_from_channel(channel)
#
# @api public
def server_kill_reactors(channel)
@server.mutex.synchronize do
(@server.subscribers[channel] || []).dup.pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
end
(@server.subscribers[channel] || []).dup.pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
end
end
end
Expand Down
35 changes: 29 additions & 6 deletions lib/celluloid_pubsub/web_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class WebServer < Reel::Server::HTTP
# The name of the default adapter
CLASSIC_ADAPTER = 'classic'

attr_accessor :server_options, :subscribers, :mutex
attr_accessor :server_options, :subscribers, :mutex, :timers_mutex

finalizer :shutdown
# receives a list of options that are used to configure the webserver
Expand All @@ -49,6 +49,7 @@ def initialize(options = {})
@server_options = parse_options(options)
@subscribers = {}
@mutex = Mutex.new
@timers_mutex = Mutex.new
setup_celluloid_logger
log_debug "CelluloidPubsub::WebServer example starting on #{hostname}:#{port}"
super(hostname, port, { spy: spy, backlog: backlog }, &method(:on_connection))
Expand Down Expand Up @@ -95,11 +96,11 @@ def self.socket_families
# @api public
def self.find_unused_port
@@unused_port ||= begin
socket = open_socket_on_unused_port
port = socket.addr[1]
socket.close
port
end
socket = open_socket_on_unused_port
port = socket.addr[1]
socket.close
port
end
end
# rubocop:enable Style/ClassVars

Expand All @@ -109,9 +110,15 @@ def self.find_unused_port
# @api public
def run
@spy = Celluloid.logger if spy
async.bind_timers
loop { async.handle_connection @server.accept }
end

def bind_timers(run = false)
try_sending_unpublished if run
after(0.1) { bind_timers(true) }
end

# the method will return true if redis can be used otherwise false
#
#
Expand Down Expand Up @@ -304,6 +311,22 @@ def route_websocket(reactor, socket)
socket.close
end
end
# this method will know when a client has successfully registered
# and will write to the socket all messages that were published
# to that channel before the actor subscribed
#
# @return [void]
#
# @api publicsCelluloidPubsub::Registry.messages
def try_sending_unpublished
CelluloidPubsub::Registry.messages.keys.each do |channel|
next if (clients = subscribers[channel]).blank?
clients.pmap do |hash|
hash[:reactor].successfully_registered(channel)
end
clients.last[:reactor].clear_unpublished_messages(channel)
end
end

# If the message can be parsed into a Hash it will respond to the reactor's websocket connection with the same message in JSON format
# otherwise will try send the message how it is and escaped into JSON format
Expand Down

0 comments on commit 74da290

Please sign in to comment.