Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to control the dequeue #421

Merged
merged 9 commits into from
Aug 21, 2020
41 changes: 41 additions & 0 deletions lib/exq/dequeue/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Exq.Dequeue.Behaviour do
@moduledoc """
Custom concurreny or rate limiting at a queue level can be achieved
by implementing the Dequeue behaviour

The following config can be used to customize dequeue behaviour for a queue

config :exq,
queues: [{"default", {RateLimiter, options}}]

RateLimiter module should implement `Exq.Dequeue.Behaviour`. The
options supplied here would be passed as the second argument to the
`c:init/2` function.

### Life cycle

`c:init/2` will be invoked on initialization. The first argument will contain info
like queue and the second argument is user configurable.

`c:available?/1` will be invoked before each poll. If the
returned value contains `false` as the second element of the tuple,
the queue will not polled

`c:dispatched/1` will be invoked once a job is dispatched to the worker

`c:processed/1` will be invoked if a job completed successfully

`c:failed/1` will be invoked if a job failed

`c:stop/1` will be invoked when a queue is unsubscribed or before the
node terminates. Note: there is no guarantee this will be invoked if
the node terminates abruptly
"""

@callback init(info :: %{queue: String.t()}, args :: term) :: {:ok, term}
@callback stop(state :: term) :: :ok
@callback available?(state :: term) :: {:ok, boolean, term}
@callback dispatched(state :: term) :: {:ok, term}
@callback processed(state :: term) :: {:ok, term}
@callback failed(state :: term) :: {:ok, term}
end
29 changes: 29 additions & 0 deletions lib/exq/dequeue/local.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Exq.Dequeue.Local do
@behaviour Exq.Dequeue.Behaviour

defmodule State do
@moduledoc false

defstruct max: nil, current: 0
end

@impl true
def init(_, options) do
{:ok, %State{max: Keyword.fetch!(options, :concurrency)}}
end

@impl true
def stop(_), do: :ok

@impl true
def available?(state), do: {:ok, state.current < state.max, state}

@impl true
def dispatched(state), do: {:ok, %{state | current: state.current + 1}}

@impl true
def processed(state), do: {:ok, %{state | current: state.current - 1}}

@impl true
def failed(state), do: {:ok, %{state | current: state.current - 1}}
end
165 changes: 103 additions & 62 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ defmodule Exq.Manager.Server do
require Logger
use GenServer
alias Exq.Support.Config
alias Exq.Support.Opts
alias Exq.Redis.JobQueue

@backoff_mult 10
Expand All @@ -122,7 +123,7 @@ defmodule Exq.Manager.Server do
pid: nil,
node_id: nil,
namespace: nil,
work_table: nil,
dequeuers: nil,
queues: nil,
poll_timeout: nil,
scheduler_poll_timeout: nil,
Expand All @@ -135,8 +136,8 @@ defmodule Exq.Manager.Server do
GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
end

def job_terminated(exq, namespace, queue, job_serialized) do
GenServer.cast(exq, {:job_terminated, namespace, queue, job_serialized})
def job_terminated(exq, queue, success) do
GenServer.cast(exq, {:job_terminated, queue, success})
:ok
end

Expand All @@ -151,11 +152,11 @@ defmodule Exq.Manager.Server do
# Cleanup stale stats
GenServer.cast(self(), :cleanup_host_stats)

# Setup queues
work_table = setup_queues(opts)
# Setup dequeues
dequeuers = add_dequeuers(%{}, opts[:concurrency])

state = %State{
work_table: work_table,
dequeuers: dequeuers,
redis: opts[:redis],
stats: opts[:stats],
workers_sup: opts[:workers_sup],
Expand Down Expand Up @@ -221,9 +222,15 @@ defmodule Exq.Manager.Server do
{:noreply, state, 0}
end

def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do
update_worker_count(state.work_table, queue, -1)
{:noreply, state, 0}
def handle_cast({:job_terminated, queue, success}, state) do
dequeuers =
if success do
maybe_call_dequeuer(state.dequeuers, queue, :processed)
else
maybe_call_dequeuer(state.dequeuers, queue, :failed)
end

{:noreply, %{state | dequeuers: dequeuers}, 0}
end

def handle_info(:timeout, state) do
Expand All @@ -245,36 +252,61 @@ defmodule Exq.Manager.Server do
@doc """
Dequeue jobs and dispatch to workers
"""
def dequeue_and_dispatch(state), do: dequeue_and_dispatch(state, available_queues(state))
def dequeue_and_dispatch(state, []), do: {state, state.poll_timeout}

def dequeue_and_dispatch(state, queues) do
rescue_timeout({state, state.poll_timeout}, fn ->
jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

job_results = jobs |> Enum.map(fn potential_job -> dispatch_job(state, potential_job) end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end)
def dequeue_and_dispatch(state) do
case available_queues(state) do
{[], state} ->
{state, state.poll_timeout}

{queues, state} ->
result =
rescue_timeout(:timeout, fn ->
Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)
end)

case result do
:timeout ->
{state, state.poll_timeout}

jobs ->
{state, job_results} =
Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} ->
{state, result} = dispatch_job(state, potential_job)
{state, [result | results]}
end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end
end
end

@doc """
Returns list of active queues with free workers
"""
def available_queues(state) do
Enum.filter(state.queues, fn q ->
[{_, concurrency, worker_count}] = :ets.lookup(state.work_table, q)
worker_count < concurrency
Enum.reduce(state.queues, {[], state}, fn q, {queues, state} ->
{available, dequeuers} =
Map.get_and_update!(state.dequeuers, q, fn {module, state} ->
{:ok, available, state} = module.available?(state)
{available, {module, state}}
end)

state = %{state | dequeuers: dequeuers}

if available do
{[q | queues], state}
else
{queues, state}
end
end)
end

Expand All @@ -285,14 +317,14 @@ defmodule Exq.Manager.Server do
def dispatch_job(state, potential_job) do
case potential_job do
{:ok, {:none, _queue}} ->
{:ok, :none}
{state, {:ok, :none}}

{:ok, {job, queue}} ->
dispatch_job(state, job, queue)
{:ok, :dispatch}
state = dispatch_job(state, job, queue)
{state, {:ok, :dispatch}}

{status, reason} ->
{:error, {status, reason}}
{state, {:error, {status, reason}}}
end
end

Expand All @@ -304,7 +336,6 @@ defmodule Exq.Manager.Server do
job,
state.pid,
queue,
state.work_table,
state.stats,
state.namespace,
state.node_id,
Expand All @@ -315,10 +346,10 @@ defmodule Exq.Manager.Server do
)

Exq.Worker.Server.work(worker)
update_worker_count(state.work_table, queue, 1)
%{state | dequeuers: maybe_call_dequeuer(state.dequeuers, queue, :dispatched)}
end

# Setup queues from options / configs.
# Setup dequeuers from options / configs.

# The following is done:
# * Sets up queues data structure with proper concurrency settings
Expand All @@ -327,41 +358,51 @@ defmodule Exq.Manager.Server do
# * Returns list of queues and work table
# TODO: Refactor the way queues are setup

defp setup_queues(opts) do
work_table = :ets.new(:work_table, [:set, :public])
defp add_dequeuers(dequeuers, specs) do
Enum.into(specs, dequeuers, fn {queue, {module, opts}} ->
GenServer.cast(self(), {:re_enqueue_backup, queue})
{:ok, state} = module.init(%{queue: queue}, opts)
{queue, {module, state}}
end)
end

Enum.each(opts[:concurrency], fn queue_concurrency ->
:ets.insert(work_table, queue_concurrency)
GenServer.cast(self(), {:re_enqueue_backup, elem(queue_concurrency, 0)})
defp remove_dequeuers(dequeuers, queues) do
Enum.reduce(queues, dequeuers, fn queue, dequeuers ->
maybe_call_dequeuer(dequeuers, queue, :stop)
|> Map.delete(queue)
end)
end

work_table
defp maybe_call_dequeuer(dequeuers, queue, method) do
if Map.has_key?(dequeuers, queue) do
Map.update!(dequeuers, queue, fn {module, state} ->
case apply(module, method, [state]) do
{:ok, state} -> {module, state}
:ok -> {module, nil}
end
end)
else
dequeuers
end
end

defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
queue_concurrency = {queue, concurrency, 0}
:ets.insert(state.work_table, queue_concurrency)
GenServer.cast(self(), {:re_enqueue_backup, queue})
updated_queues = [queue | state.queues]
%{state | queues: updated_queues}
queue_concurrency = {queue, Opts.cast_concurrency(concurrency)}

%{
state
| queues: [queue | state.queues],
dequeuers: add_dequeuers(state.dequeuers, [queue_concurrency])
}
end

defp remove_queue(state, queue) do
:ets.delete(state.work_table, queue)
updated_queues = List.delete(state.queues, queue)
%{state | queues: updated_queues}
%{state | queues: updated_queues, dequeuers: remove_dequeuers(state.dequeuers, [queue])}
end

defp remove_all_queues(state) do
true = :ets.delete_all_objects(state.work_table)
%{state | queues: []}
end

defp update_worker_count(work_table, queue, delta) do
:ets.update_counter(work_table, queue, {3, delta})
rescue
# The queue has been unsubscribed
_error in ArgumentError -> :ok
%{state | queues: [], dequeuers: remove_dequeuers(state.dequeuers, state.queues)}
end

@doc """
Expand Down
9 changes: 4 additions & 5 deletions lib/exq/middleware/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ defmodule Exq.Middleware.Manager do
end

def after_processed_work(pipeline) do
pipeline |> notify
pipeline |> notify(true)
end

def after_failed_work(pipeline) do
pipeline |> notify
pipeline |> notify(false)
end

defp notify(%Pipeline{assigns: assigns} = pipeline) do
defp notify(%Pipeline{assigns: assigns} = pipeline, success) do
Manager.job_terminated(
assigns.manager,
assigns.namespace,
assigns.queue,
assigns.job_serialized
success
)

pipeline
Expand Down
Loading