Skip to content

Commit

Permalink
Merge pull request #421 from akira/dequeue_controller
Browse files Browse the repository at this point in the history
Add ability to control the dequeue
  • Loading branch information
ananthakumaran authored Aug 21, 2020
2 parents ade8398 + 7058fe5 commit 92788e7
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 105 deletions.
41 changes: 41 additions & 0 deletions lib/exq/dequeue/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Exq.Dequeue.Behaviour do
@moduledoc """
Custom concurreny or rate limiting at a queue level can be achieved
by implementing the Dequeue behaviour
The following config can be used to customize dequeue behaviour for a queue
config :exq,
queues: [{"default", {RateLimiter, options}}]
RateLimiter module should implement `Exq.Dequeue.Behaviour`. The
options supplied here would be passed as the second argument to the
`c:init/2` function.
### Life cycle
`c:init/2` will be invoked on initialization. The first argument will contain info
like queue and the second argument is user configurable.
`c:available?/1` will be invoked before each poll. If the
returned value contains `false` as the second element of the tuple,
the queue will not polled
`c:dispatched/1` will be invoked once a job is dispatched to the worker
`c:processed/1` will be invoked if a job completed successfully
`c:failed/1` will be invoked if a job failed
`c:stop/1` will be invoked when a queue is unsubscribed or before the
node terminates. Note: there is no guarantee this will be invoked if
the node terminates abruptly
"""

@callback init(info :: %{queue: String.t()}, args :: term) :: {:ok, term}
@callback stop(state :: term) :: :ok
@callback available?(state :: term) :: {:ok, boolean, term}
@callback dispatched(state :: term) :: {:ok, term}
@callback processed(state :: term) :: {:ok, term}
@callback failed(state :: term) :: {:ok, term}
end
29 changes: 29 additions & 0 deletions lib/exq/dequeue/local.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Exq.Dequeue.Local do
@behaviour Exq.Dequeue.Behaviour

defmodule State do
@moduledoc false

defstruct max: nil, current: 0
end

@impl true
def init(_, options) do
{:ok, %State{max: Keyword.fetch!(options, :concurrency)}}
end

@impl true
def stop(_), do: :ok

@impl true
def available?(state), do: {:ok, state.current < state.max, state}

@impl true
def dispatched(state), do: {:ok, %{state | current: state.current + 1}}

@impl true
def processed(state), do: {:ok, %{state | current: state.current - 1}}

@impl true
def failed(state), do: {:ok, %{state | current: state.current - 1}}
end
165 changes: 103 additions & 62 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ defmodule Exq.Manager.Server do
require Logger
use GenServer
alias Exq.Support.Config
alias Exq.Support.Opts
alias Exq.Redis.JobQueue

@backoff_mult 10
Expand All @@ -122,7 +123,7 @@ defmodule Exq.Manager.Server do
pid: nil,
node_id: nil,
namespace: nil,
work_table: nil,
dequeuers: nil,
queues: nil,
poll_timeout: nil,
scheduler_poll_timeout: nil,
Expand All @@ -135,8 +136,8 @@ defmodule Exq.Manager.Server do
GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
end

def job_terminated(exq, namespace, queue, job_serialized) do
GenServer.cast(exq, {:job_terminated, namespace, queue, job_serialized})
def job_terminated(exq, queue, success) do
GenServer.cast(exq, {:job_terminated, queue, success})
:ok
end

Expand All @@ -151,11 +152,11 @@ defmodule Exq.Manager.Server do
# Cleanup stale stats
GenServer.cast(self(), :cleanup_host_stats)

# Setup queues
work_table = setup_queues(opts)
# Setup dequeues
dequeuers = add_dequeuers(%{}, opts[:concurrency])

state = %State{
work_table: work_table,
dequeuers: dequeuers,
redis: opts[:redis],
stats: opts[:stats],
workers_sup: opts[:workers_sup],
Expand Down Expand Up @@ -221,9 +222,15 @@ defmodule Exq.Manager.Server do
{:noreply, state, 0}
end

def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do
update_worker_count(state.work_table, queue, -1)
{:noreply, state, 0}
def handle_cast({:job_terminated, queue, success}, state) do
dequeuers =
if success do
maybe_call_dequeuer(state.dequeuers, queue, :processed)
else
maybe_call_dequeuer(state.dequeuers, queue, :failed)
end

{:noreply, %{state | dequeuers: dequeuers}, 0}
end

def handle_info(:timeout, state) do
Expand All @@ -245,36 +252,61 @@ defmodule Exq.Manager.Server do
@doc """
Dequeue jobs and dispatch to workers
"""
def dequeue_and_dispatch(state), do: dequeue_and_dispatch(state, available_queues(state))
def dequeue_and_dispatch(state, []), do: {state, state.poll_timeout}

def dequeue_and_dispatch(state, queues) do
rescue_timeout({state, state.poll_timeout}, fn ->
jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

job_results = jobs |> Enum.map(fn potential_job -> dispatch_job(state, potential_job) end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end)
def dequeue_and_dispatch(state) do
case available_queues(state) do
{[], state} ->
{state, state.poll_timeout}

{queues, state} ->
result =
rescue_timeout(:timeout, fn ->
Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)
end)

case result do
:timeout ->
{state, state.poll_timeout}

jobs ->
{state, job_results} =
Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} ->
{state, result} = dispatch_job(state, potential_job)
{state, [result | results]}
end)

cond do
Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
{state, 0}

Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...")
{state, state.poll_timeout * @backoff_mult}

true ->
{state, state.poll_timeout}
end
end
end
end

@doc """
Returns list of active queues with free workers
"""
def available_queues(state) do
Enum.filter(state.queues, fn q ->
[{_, concurrency, worker_count}] = :ets.lookup(state.work_table, q)
worker_count < concurrency
Enum.reduce(state.queues, {[], state}, fn q, {queues, state} ->
{available, dequeuers} =
Map.get_and_update!(state.dequeuers, q, fn {module, state} ->
{:ok, available, state} = module.available?(state)
{available, {module, state}}
end)

state = %{state | dequeuers: dequeuers}

if available do
{[q | queues], state}
else
{queues, state}
end
end)
end

Expand All @@ -285,14 +317,14 @@ defmodule Exq.Manager.Server do
def dispatch_job(state, potential_job) do
case potential_job do
{:ok, {:none, _queue}} ->
{:ok, :none}
{state, {:ok, :none}}

{:ok, {job, queue}} ->
dispatch_job(state, job, queue)
{:ok, :dispatch}
state = dispatch_job(state, job, queue)
{state, {:ok, :dispatch}}

{status, reason} ->
{:error, {status, reason}}
{state, {:error, {status, reason}}}
end
end

Expand All @@ -304,7 +336,6 @@ defmodule Exq.Manager.Server do
job,
state.pid,
queue,
state.work_table,
state.stats,
state.namespace,
state.node_id,
Expand All @@ -315,10 +346,10 @@ defmodule Exq.Manager.Server do
)

Exq.Worker.Server.work(worker)
update_worker_count(state.work_table, queue, 1)
%{state | dequeuers: maybe_call_dequeuer(state.dequeuers, queue, :dispatched)}
end

# Setup queues from options / configs.
# Setup dequeuers from options / configs.

# The following is done:
# * Sets up queues data structure with proper concurrency settings
Expand All @@ -327,41 +358,51 @@ defmodule Exq.Manager.Server do
# * Returns list of queues and work table
# TODO: Refactor the way queues are setup

defp setup_queues(opts) do
work_table = :ets.new(:work_table, [:set, :public])
defp add_dequeuers(dequeuers, specs) do
Enum.into(specs, dequeuers, fn {queue, {module, opts}} ->
GenServer.cast(self(), {:re_enqueue_backup, queue})
{:ok, state} = module.init(%{queue: queue}, opts)
{queue, {module, state}}
end)
end

Enum.each(opts[:concurrency], fn queue_concurrency ->
:ets.insert(work_table, queue_concurrency)
GenServer.cast(self(), {:re_enqueue_backup, elem(queue_concurrency, 0)})
defp remove_dequeuers(dequeuers, queues) do
Enum.reduce(queues, dequeuers, fn queue, dequeuers ->
maybe_call_dequeuer(dequeuers, queue, :stop)
|> Map.delete(queue)
end)
end

work_table
defp maybe_call_dequeuer(dequeuers, queue, method) do
if Map.has_key?(dequeuers, queue) do
Map.update!(dequeuers, queue, fn {module, state} ->
case apply(module, method, [state]) do
{:ok, state} -> {module, state}
:ok -> {module, nil}
end
end)
else
dequeuers
end
end

defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
queue_concurrency = {queue, concurrency, 0}
:ets.insert(state.work_table, queue_concurrency)
GenServer.cast(self(), {:re_enqueue_backup, queue})
updated_queues = [queue | state.queues]
%{state | queues: updated_queues}
queue_concurrency = {queue, Opts.cast_concurrency(concurrency)}

%{
state
| queues: [queue | state.queues],
dequeuers: add_dequeuers(state.dequeuers, [queue_concurrency])
}
end

defp remove_queue(state, queue) do
:ets.delete(state.work_table, queue)
updated_queues = List.delete(state.queues, queue)
%{state | queues: updated_queues}
%{state | queues: updated_queues, dequeuers: remove_dequeuers(state.dequeuers, [queue])}
end

defp remove_all_queues(state) do
true = :ets.delete_all_objects(state.work_table)
%{state | queues: []}
end

defp update_worker_count(work_table, queue, delta) do
:ets.update_counter(work_table, queue, {3, delta})
rescue
# The queue has been unsubscribed
_error in ArgumentError -> :ok
%{state | queues: [], dequeuers: remove_dequeuers(state.dequeuers, state.queues)}
end

@doc """
Expand Down
9 changes: 4 additions & 5 deletions lib/exq/middleware/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ defmodule Exq.Middleware.Manager do
end

def after_processed_work(pipeline) do
pipeline |> notify
pipeline |> notify(true)
end

def after_failed_work(pipeline) do
pipeline |> notify
pipeline |> notify(false)
end

defp notify(%Pipeline{assigns: assigns} = pipeline) do
defp notify(%Pipeline{assigns: assigns} = pipeline, success) do
Manager.job_terminated(
assigns.manager,
assigns.namespace,
assigns.queue,
assigns.job_serialized
success
)

pipeline
Expand Down
Loading

0 comments on commit 92788e7

Please sign in to comment.