Skip to content

Commit

Permalink
restrict the scope of rescue timeout to redis operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Jul 11, 2020
1 parent 632267e commit 0a19c91
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -258,27 +258,34 @@ defmodule Exq.Manager.Server do
{state, state.poll_timeout}

{queues, state} ->
rescue_timeout({state, state.poll_timeout}, fn ->
jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

{state, job_results} =
Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} ->
{state, result} = dispatch_job(state, potential_job)
{state, [result | results]}
end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end)
result =
rescue_timeout(:timeout, fn ->
Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)
end)

case result do
:timeout ->
{state, state.poll_timeout}

jobs ->
{state, job_results} =
Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} ->
{state, result} = dispatch_job(state, potential_job)
{state, [result | results]}
end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end
end
end

Expand Down

0 comments on commit 0a19c91

Please sign in to comment.