diff --git a/.travis.yml b/.travis.yml index aff278d..b859aa3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ rvm: - 2.5.3 - 2.6.5 - 2.7.1 - - 3.0.0 + # - 3.0.0 - celluloid/io doesn't work - jruby-19mode - jruby-head diff --git a/Rakefile b/Rakefile index 301566b..29c0fc5 100644 --- a/Rakefile +++ b/Rakefile @@ -20,8 +20,8 @@ task :all do |_t| # require 'json' # puts JSON.pretty_generate(ENV.to_hash) if ENV['BUNDLE_GEMFILE'] =~ /gemfiles/ - appraisal_name = ENV['BUNDLE_GEMFILE'].scan(/rails\_(.*)\.gemfile/).flatten.first - command_prefix = "appraisal rails-#{appraisal_name}" + appraisal_name = ENV['BUNDLE_GEMFILE'].scan(/celluloid\_(.*)\.gemfile/).flatten.first + command_prefix = "appraisal celluloid-#{appraisal_name}" exec ("#{command_prefix} bundle install && #{command_prefix} bundle exec rspec && bundle exec rake coveralls:push ") else exec(' bundle exec appraisal install && bundle exec rake appraisal spec && bundle exec rake coveralls:push') diff --git a/celluloid_pubsub.gemspec b/celluloid_pubsub.gemspec index 22c81b4..2188a58 100644 --- a/celluloid_pubsub.gemspec +++ b/celluloid_pubsub.gemspec @@ -26,6 +26,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'celluloid-websocket-client', '~> 0.0', '>= 0.0.1' s.add_runtime_dependency 'celluloid-pmap', '~> 1.0' s.add_runtime_dependency 'activesupport', '>= 4.0', '>= 4.0' + s.add_runtime_dependency 'timers', '~> 4.3' s.add_development_dependency 'appraisal', '~> 2.1', '>= 2.1' s.add_development_dependency 'rspec', '~> 3.5', '>= 3.5' diff --git a/examples/simple_test.rb b/examples/simple_test.rb index c2a14f1..28d21a2 100644 --- a/examples/simple_test.rb +++ b/examples/simple_test.rb @@ -58,9 +58,17 @@ def on_close(code, reason) # please don't use the BaseActor class to supervise actors. This is subject to change . This class is used only to test backward compatibility. # For more information on how to supervise actors please see Celluloid wiki. CelluloidPubsub::BaseActor.setup_actor_supervision(CelluloidPubsub::WebServer, actor_name: :web_server, args: {enable_debug: debug_enabled, spy: debug_enabled, adapter: nil,log_file_path: log_file_path, log_level: log_level }) -CelluloidPubsub::BaseActor.setup_actor_supervision(FirstActor, actor_name: :first_actor, args: { enable_debug: debug_enabled, log_level: log_level }) -sleep(0.1) until CelluloidPubsub::Registry.channels.include?('test_channel') -CelluloidPubsub::BaseActor.setup_actor_supervision(SecondActor, actor_name: :second_actor, args: { enable_debug: debug_enabled, log_level: log_level}) +if ENV['RUN_IN_PARALLEL'].nil? || ENV['RUN_IN_PARALLEL'].to_s.downcase == 'true' + [ + { class: FirstActor, name: :first_actor }, + { class: SecondActor, name: :second_actor } + ].pmap do |hash| + CelluloidPubsub::BaseActor.setup_actor_supervision(hash[:class], actor_name: hash[:name], args: { enable_debug: debug_enabled, log_level: log_level }) + end +else + CelluloidPubsub::BaseActor.setup_actor_supervision(FirstActor, actor_name: :first_actor, args: { enable_debug: debug_enabled, log_level: log_level }) + CelluloidPubsub::BaseActor.setup_actor_supervision(SecondActor, actor_name: :second_actor, args: { enable_debug: debug_enabled, log_level: log_level}) +end signal_received = false diff --git a/lib/celluloid_pubsub.rb b/lib/celluloid_pubsub.rb index c54b80e..3e873dd 100644 --- a/lib/celluloid_pubsub.rb +++ b/lib/celluloid_pubsub.rb @@ -9,6 +9,7 @@ require 'active_support/all' require 'json' require 'thread' +require 'timers' require 'celluloid/pmap' require 'celluloid_pubsub/base_actor' require 'reel' diff --git a/lib/celluloid_pubsub/client.rb b/lib/celluloid_pubsub/client.rb index 0501d1f..8237afe 100644 --- a/lib/celluloid_pubsub/client.rb +++ b/lib/celluloid_pubsub/client.rb @@ -222,9 +222,6 @@ def on_open def on_message(data) message = JSON.parse(data) log_debug("#{@actor.class} received JSON #{message}") - if succesfull_subscription?(message) - send_action('successfully_registered', message['channel']) - end if @actor.respond_to?(:async) @actor.async.on_message(message) else @@ -242,14 +239,14 @@ def on_message(data) # # @api public def on_close(code, reason) - connection.terminate - terminate - log_debug("#{@actor.class} dispatching on close #{code} #{reason}") + log_debug("#{self.class} dispatching on close #{code} #{reason}") if @actor.respond_to?(:async) @actor.async.on_close(code, reason) else @actor.on_close(code, reason) end + connection.terminate + terminate end private diff --git a/lib/celluloid_pubsub/helper.rb b/lib/celluloid_pubsub/helper.rb index b003738..43519de 100644 --- a/lib/celluloid_pubsub/helper.rb +++ b/lib/celluloid_pubsub/helper.rb @@ -109,7 +109,11 @@ def setup_celluloid_logger def setup_celluloid_exception_handler Celluloid.task_class = defined?(Celluloid::TaskThread) ? Celluloid::TaskThread : Celluloid::Task::Threaded Celluloid.exception_handler do |ex| - puts ex unless filtered_error?(ex) + unless filtered_error?(ex) + puts ex + puts ex.backtrace + puts ex.cause + end end end diff --git a/lib/celluloid_pubsub/reactor.rb b/lib/celluloid_pubsub/reactor.rb index 1ac5b1f..720c80d 100644 --- a/lib/celluloid_pubsub/reactor.rb +++ b/lib/celluloid_pubsub/reactor.rb @@ -19,7 +19,7 @@ class Reactor include CelluloidPubsub::BaseActor # available actions that can be delegated - AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish successfully_registered unsubscribe_all].freeze + AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze # The websocket connection received from the server # @return [Reel::WebSocket] websocket connection @@ -306,19 +306,13 @@ def subscribe(channel, message) # to that channel before the actor subscribed # # @param [String] channel - # @param [Object] _json_data - # # @return [void] # # @api public - def successfully_registered(channel, _json_data) - @server.mutex.synchronize do - if (messages = unpublished_messages(channel)).present? - messages.pmap do |message| - @websocket << message - end - clear_unpublished_messages(channel) - end + def successfully_registered(channel) + return if (messages = unpublished_messages(channel)).blank? + messages.each do |msg| + @websocket << msg.to_json end end @@ -399,14 +393,12 @@ def publish(current_topic, json_data) # # @api public def server_publish_event(current_topic, message) - @server.mutex.synchronize do - if @server.subscribers[current_topic].present? - (@server.subscribers[current_topic] || []).dup.pmap do |hash| - hash[:reactor].websocket << message - end - else - save_unpublished_message(channel, message) + if (subscribers = @server.subscribers[current_topic]).present? + subscribers.dup.pmap do |hash| + hash[:reactor].websocket << message end + else + save_unpublished_message(current_topic, message) end end @@ -419,7 +411,9 @@ def server_publish_event(current_topic, message) # # @api public def save_unpublished_message(current_topic, message) - (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message + @server.timers_mutex.synchronize do + (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message + end end # unsubscribes all actors from all channels and terminates the current actor @@ -457,12 +451,10 @@ def unsubscribe_from_channel(channel) # # @api public def server_kill_reactors(channel) - @server.mutex.synchronize do - (@server.subscribers[channel] || []).dup.pmap do |hash| - reactor = hash[:reactor] - reactor.websocket.close - Celluloid::Actor.kill(reactor) - end + (@server.subscribers[channel] || []).dup.pmap do |hash| + reactor = hash[:reactor] + reactor.websocket.close + Celluloid::Actor.kill(reactor) end end end diff --git a/lib/celluloid_pubsub/web_server.rb b/lib/celluloid_pubsub/web_server.rb index 4b41e34..4d5bf09 100644 --- a/lib/celluloid_pubsub/web_server.rb +++ b/lib/celluloid_pubsub/web_server.rb @@ -28,7 +28,7 @@ class WebServer < Reel::Server::HTTP # The name of the default adapter CLASSIC_ADAPTER = 'classic' - attr_accessor :server_options, :subscribers, :mutex + attr_accessor :server_options, :subscribers, :mutex, :timers_mutex finalizer :shutdown # receives a list of options that are used to configure the webserver @@ -49,6 +49,7 @@ def initialize(options = {}) @server_options = parse_options(options) @subscribers = {} @mutex = Mutex.new + @timers_mutex = Mutex.new setup_celluloid_logger log_debug "CelluloidPubsub::WebServer example starting on #{hostname}:#{port}" super(hostname, port, { spy: spy, backlog: backlog }, &method(:on_connection)) @@ -95,11 +96,11 @@ def self.socket_families # @api public def self.find_unused_port @@unused_port ||= begin - socket = open_socket_on_unused_port - port = socket.addr[1] - socket.close - port - end + socket = open_socket_on_unused_port + port = socket.addr[1] + socket.close + port + end end # rubocop:enable Style/ClassVars @@ -109,9 +110,15 @@ def self.find_unused_port # @api public def run @spy = Celluloid.logger if spy + async.bind_timers loop { async.handle_connection @server.accept } end + def bind_timers(run = false) + try_sending_unpublished if run + after(0.1) { bind_timers(true) } + end + # the method will return true if redis can be used otherwise false # # @@ -304,6 +311,22 @@ def route_websocket(reactor, socket) socket.close end end + # this method will know when a client has successfully registered + # and will write to the socket all messages that were published + # to that channel before the actor subscribed + # + # @return [void] + # + # @api publicsCelluloidPubsub::Registry.messages + def try_sending_unpublished + CelluloidPubsub::Registry.messages.keys.each do |channel| + next if (clients = subscribers[channel]).blank? + clients.pmap do |hash| + hash[:reactor].successfully_registered(channel) + end + clients.last[:reactor].clear_unpublished_messages(channel) + end + end # If the message can be parsed into a Hash it will respond to the reactor's websocket connection with the same message in JSON format # otherwise will try send the message how it is and escaped into JSON format