Skip to content

Commit

Permalink
Notifier waits to retry listening when database is unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jul 19, 2021
1 parent 7f0fc11 commit cbe1480
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
1 change: 1 addition & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Job < Object.const_get(GoodJob.active_record_parent_class)
DEFAULT_PRIORITY = 0

self.table_name = 'good_jobs'.freeze
self.advisory_lockable_column = 'id'.freeze

attr_readonly :serialized_params

Expand Down
14 changes: 11 additions & 3 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Notifier
max_queue: 1,
fallback_policy: :discard,
}.freeze
# Seconds to wait if database cannot be connected to
RECONNECT_INTERVAL = 5
# Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1

Expand Down Expand Up @@ -114,7 +116,13 @@ def listen_observer(_time, _result, thread_error)
ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error })
end

listen unless shutdown?
return if shutdown?

if thread_error.is_a?(ActiveRecord::ConnectionNotEstablished) || thread_error.is_a?(ActiveRecord::StatementInvalid)
listen(delay: RECONNECT_INTERVAL)
else
listen
end
end

private
Expand All @@ -125,8 +133,8 @@ def create_executor
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end

def listen
future = Concurrent::Future.new(args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
Expand Down

0 comments on commit cbe1480

Please sign in to comment.