diff --git a/lib/exq/dequeue/behaviour.ex b/lib/exq/dequeue/behaviour.ex new file mode 100644 index 00000000..cd5c7446 --- /dev/null +++ b/lib/exq/dequeue/behaviour.ex @@ -0,0 +1,41 @@ +defmodule Exq.Dequeue.Behaviour do + @moduledoc """ + Custom concurreny or rate limiting at a queue level can be achieved + by implementing the Dequeue behaviour + + The following config can be used to customize dequeue behaviour for a queue + + config :exq, + queues: [{"default", {RateLimiter, options}}] + + RateLimiter module should implement `Exq.Dequeue.Behaviour`. The + options supplied here would be passed as the second argument to the + `c:init/2` function. + + ### Life cycle + + `c:init/2` will be invoked on initialization. The first argument will contain info + like queue and the second argument is user configurable. + + `c:available?/1` will be invoked before each poll. If the + returned value contains `false` as the second element of the tuple, + the queue will not polled + + `c:dispatched/1` will be invoked once a job is dispatched to the worker + + `c:processed/1` will be invoked if a job completed successfully + + `c:failed/1` will be invoked if a job failed + + `c:stop/1` will be invoked when a queue is unsubscribed or before the + node terminates. Note: there is no guarantee this will be invoked if + the node terminates abruptly + """ + + @callback init(info :: %{queue: String.t()}, args :: term) :: {:ok, term} + @callback stop(state :: term) :: :ok + @callback available?(state :: term) :: {:ok, boolean, term} + @callback dispatched(state :: term) :: {:ok, term} + @callback processed(state :: term) :: {:ok, term} + @callback failed(state :: term) :: {:ok, term} +end diff --git a/lib/exq/dequeue/local.ex b/lib/exq/dequeue/local.ex new file mode 100644 index 00000000..f82c2eb5 --- /dev/null +++ b/lib/exq/dequeue/local.ex @@ -0,0 +1,29 @@ +defmodule Exq.Dequeue.Local do + @behaviour Exq.Dequeue.Behaviour + + defmodule State do + @moduledoc false + + defstruct max: nil, current: 0 + end + + @impl true + def init(_, options) do + {:ok, %State{max: Keyword.fetch!(options, :concurrency)}} + end + + @impl true + def stop(_), do: :ok + + @impl true + def available?(state), do: {:ok, state.current < state.max, state} + + @impl true + def dispatched(state), do: {:ok, %{state | current: state.current + 1}} + + @impl true + def processed(state), do: {:ok, %{state | current: state.current - 1}} + + @impl true + def failed(state), do: {:ok, %{state | current: state.current - 1}} +end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index d2c49cbf..8cdfaad2 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -111,6 +111,7 @@ defmodule Exq.Manager.Server do require Logger use GenServer alias Exq.Support.Config + alias Exq.Support.Opts alias Exq.Redis.JobQueue @backoff_mult 10 @@ -122,7 +123,7 @@ defmodule Exq.Manager.Server do pid: nil, node_id: nil, namespace: nil, - work_table: nil, + dequeuers: nil, queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, @@ -135,8 +136,8 @@ defmodule Exq.Manager.Server do GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name])) end - def job_terminated(exq, namespace, queue, job_serialized) do - GenServer.cast(exq, {:job_terminated, namespace, queue, job_serialized}) + def job_terminated(exq, queue, success) do + GenServer.cast(exq, {:job_terminated, queue, success}) :ok end @@ -151,11 +152,11 @@ defmodule Exq.Manager.Server do # Cleanup stale stats GenServer.cast(self(), :cleanup_host_stats) - # Setup queues - work_table = setup_queues(opts) + # Setup dequeues + dequeuers = add_dequeuers(%{}, opts[:concurrency]) state = %State{ - work_table: work_table, + dequeuers: dequeuers, redis: opts[:redis], stats: opts[:stats], workers_sup: opts[:workers_sup], @@ -221,9 +222,15 @@ defmodule Exq.Manager.Server do {:noreply, state, 0} end - def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do - update_worker_count(state.work_table, queue, -1) - {:noreply, state, 0} + def handle_cast({:job_terminated, queue, success}, state) do + dequeuers = + if success do + maybe_call_dequeuer(state.dequeuers, queue, :processed) + else + maybe_call_dequeuer(state.dequeuers, queue, :failed) + end + + {:noreply, %{state | dequeuers: dequeuers}, 0} end def handle_info(:timeout, state) do @@ -245,36 +252,61 @@ defmodule Exq.Manager.Server do @doc """ Dequeue jobs and dispatch to workers """ - def dequeue_and_dispatch(state), do: dequeue_and_dispatch(state, available_queues(state)) - def dequeue_and_dispatch(state, []), do: {state, state.poll_timeout} - - def dequeue_and_dispatch(state, queues) do - rescue_timeout({state, state.poll_timeout}, fn -> - jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues) - - job_results = jobs |> Enum.map(fn potential_job -> dispatch_job(state, potential_job) end) - - cond do - Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) -> - {state, 0} - - Enum.any?(job_results, fn status -> elem(status, 0) == :error end) -> - Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...") - {state, state.poll_timeout * @backoff_mult} - - true -> - {state, state.poll_timeout} - end - end) + def dequeue_and_dispatch(state) do + case available_queues(state) do + {[], state} -> + {state, state.poll_timeout} + + {queues, state} -> + result = + rescue_timeout(:timeout, fn -> + Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues) + end) + + case result do + :timeout -> + {state, state.poll_timeout} + + jobs -> + {state, job_results} = + Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} -> + {state, result} = dispatch_job(state, potential_job) + {state, [result | results]} + end) + + cond do + Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) -> + {state, 0} + + Enum.any?(job_results, fn status -> elem(status, 0) == :error end) -> + Logger.error("Redis Error #{Kernel.inspect(job_results)}}. Backing off...") + {state, state.poll_timeout * @backoff_mult} + + true -> + {state, state.poll_timeout} + end + end + end end @doc """ Returns list of active queues with free workers """ def available_queues(state) do - Enum.filter(state.queues, fn q -> - [{_, concurrency, worker_count}] = :ets.lookup(state.work_table, q) - worker_count < concurrency + Enum.reduce(state.queues, {[], state}, fn q, {queues, state} -> + {available, dequeuers} = + Map.get_and_update!(state.dequeuers, q, fn {module, state} -> + {:ok, available, state} = module.available?(state) + {available, {module, state}} + end) + + state = %{state | dequeuers: dequeuers} + + if available do + {[q | queues], state} + else + {queues, state} + end end) end @@ -285,14 +317,14 @@ defmodule Exq.Manager.Server do def dispatch_job(state, potential_job) do case potential_job do {:ok, {:none, _queue}} -> - {:ok, :none} + {state, {:ok, :none}} {:ok, {job, queue}} -> - dispatch_job(state, job, queue) - {:ok, :dispatch} + state = dispatch_job(state, job, queue) + {state, {:ok, :dispatch}} {status, reason} -> - {:error, {status, reason}} + {state, {:error, {status, reason}}} end end @@ -304,7 +336,6 @@ defmodule Exq.Manager.Server do job, state.pid, queue, - state.work_table, state.stats, state.namespace, state.node_id, @@ -315,10 +346,10 @@ defmodule Exq.Manager.Server do ) Exq.Worker.Server.work(worker) - update_worker_count(state.work_table, queue, 1) + %{state | dequeuers: maybe_call_dequeuer(state.dequeuers, queue, :dispatched)} end - # Setup queues from options / configs. + # Setup dequeuers from options / configs. # The following is done: # * Sets up queues data structure with proper concurrency settings @@ -327,41 +358,51 @@ defmodule Exq.Manager.Server do # * Returns list of queues and work table # TODO: Refactor the way queues are setup - defp setup_queues(opts) do - work_table = :ets.new(:work_table, [:set, :public]) + defp add_dequeuers(dequeuers, specs) do + Enum.into(specs, dequeuers, fn {queue, {module, opts}} -> + GenServer.cast(self(), {:re_enqueue_backup, queue}) + {:ok, state} = module.init(%{queue: queue}, opts) + {queue, {module, state}} + end) + end - Enum.each(opts[:concurrency], fn queue_concurrency -> - :ets.insert(work_table, queue_concurrency) - GenServer.cast(self(), {:re_enqueue_backup, elem(queue_concurrency, 0)}) + defp remove_dequeuers(dequeuers, queues) do + Enum.reduce(queues, dequeuers, fn queue, dequeuers -> + maybe_call_dequeuer(dequeuers, queue, :stop) + |> Map.delete(queue) end) + end - work_table + defp maybe_call_dequeuer(dequeuers, queue, method) do + if Map.has_key?(dequeuers, queue) do + Map.update!(dequeuers, queue, fn {module, state} -> + case apply(module, method, [state]) do + {:ok, state} -> {module, state} + :ok -> {module, nil} + end + end) + else + dequeuers + end end defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do - queue_concurrency = {queue, concurrency, 0} - :ets.insert(state.work_table, queue_concurrency) - GenServer.cast(self(), {:re_enqueue_backup, queue}) - updated_queues = [queue | state.queues] - %{state | queues: updated_queues} + queue_concurrency = {queue, Opts.cast_concurrency(concurrency)} + + %{ + state + | queues: [queue | state.queues], + dequeuers: add_dequeuers(state.dequeuers, [queue_concurrency]) + } end defp remove_queue(state, queue) do - :ets.delete(state.work_table, queue) updated_queues = List.delete(state.queues, queue) - %{state | queues: updated_queues} + %{state | queues: updated_queues, dequeuers: remove_dequeuers(state.dequeuers, [queue])} end defp remove_all_queues(state) do - true = :ets.delete_all_objects(state.work_table) - %{state | queues: []} - end - - defp update_worker_count(work_table, queue, delta) do - :ets.update_counter(work_table, queue, {3, delta}) - rescue - # The queue has been unsubscribed - _error in ArgumentError -> :ok + %{state | queues: [], dequeuers: remove_dequeuers(state.dequeuers, state.queues)} end @doc """ diff --git a/lib/exq/middleware/manager.ex b/lib/exq/middleware/manager.ex index 4ab02d59..153ac726 100644 --- a/lib/exq/middleware/manager.ex +++ b/lib/exq/middleware/manager.ex @@ -9,19 +9,18 @@ defmodule Exq.Middleware.Manager do end def after_processed_work(pipeline) do - pipeline |> notify + pipeline |> notify(true) end def after_failed_work(pipeline) do - pipeline |> notify + pipeline |> notify(false) end - defp notify(%Pipeline{assigns: assigns} = pipeline) do + defp notify(%Pipeline{assigns: assigns} = pipeline, success) do Manager.job_terminated( assigns.manager, - assigns.namespace, assigns.queue, - assigns.job_serialized + success ) pipeline diff --git a/lib/exq/support/opts.ex b/lib/exq/support/opts.ex index 1e7a1d0a..65bf0fa7 100644 --- a/lib/exq/support/opts.ex +++ b/lib/exq/support/opts.ex @@ -83,7 +83,7 @@ defmodule Exq.Support.Opts do metadata = Exq.Worker.Metadata.server_name(opts[:name]) queue_configs = opts[:queues] || Config.get(:queues) - per_queue_concurrency = opts[:concurrency] || get_config_concurrency() + per_queue_concurrency = cast_concurrency(opts[:concurrency] || Config.get(:concurrency)) queues = get_queues(queue_configs) concurrency = get_concurrency(queue_configs, per_queue_concurrency) default_middleware = Config.get(:middleware) @@ -137,28 +137,25 @@ defmodule Exq.Support.Opts do end) end - defp get_config_concurrency() do - cast_concurrency(Config.get(:concurrency)) - end - defp get_concurrency(queue_configs, per_queue_concurrency) do Enum.map(queue_configs, fn queue_config -> case queue_config do - {queue, concurrency} -> {queue, cast_concurrency(concurrency), 0} - queue -> {queue, per_queue_concurrency, 0} + {queue, concurrency} -> {queue, cast_concurrency(concurrency)} + queue -> {queue, per_queue_concurrency} end end) end - defp cast_concurrency(:infinity), do: :infinity - defp cast_concurrency(:infinite), do: :infinity - defp cast_concurrency(x) when is_integer(x), do: x + def cast_concurrency({module, options}), do: {module, options} + def cast_concurrency(:infinity), do: {Exq.Dequeue.Local, [concurrency: :infinity]} + def cast_concurrency(:infinite), do: {Exq.Dequeue.Local, [concurrency: :infinity]} + def cast_concurrency(x) when is_integer(x), do: {Exq.Dequeue.Local, [concurrency: x]} - defp cast_concurrency(x) when is_binary(x) do + def cast_concurrency(x) when is_binary(x) do case x |> String.trim() |> String.downcase() do - "infinity" -> :infinity - "infinite" -> :infinity - x -> Coercion.to_integer(x) + "infinity" -> {Exq.Dequeue.Local, [concurrency: :infinity]} + "infinite" -> {Exq.Dequeue.Local, [concurrency: :infinity]} + x -> {Exq.Dequeue.Local, [concurrency: Coercion.to_integer(x)]} end end end diff --git a/lib/exq/worker/server.ex b/lib/exq/worker/server.ex index c93cfea9..187814e7 100644 --- a/lib/exq/worker/server.ex +++ b/lib/exq/worker/server.ex @@ -9,7 +9,6 @@ defmodule Exq.Worker.Server do * `job_serialized` - Full JSON payload of the Job. * `manager` - Manager process pid. * `queue` - The queue the job came from. - * `:work_table` - In process work ets table (TODO: Remove). * `stats` - Stats process pid. * `namespace` - Redis namespace * `host` - Host name @@ -27,7 +26,6 @@ defmodule Exq.Worker.Server do manager: nil, queue: nil, namespace: nil, - work_table: nil, stats: nil, host: nil, redis: nil, @@ -41,7 +39,6 @@ defmodule Exq.Worker.Server do job_serialized, manager, queue, - work_table, stats, namespace, host, @@ -51,8 +48,7 @@ defmodule Exq.Worker.Server do ) do GenServer.start_link( __MODULE__, - {job_serialized, manager, queue, work_table, stats, namespace, host, redis, middleware, - metadata}, + {job_serialized, manager, queue, stats, namespace, host, redis, middleware, metadata}, [] ) end @@ -68,17 +64,13 @@ defmodule Exq.Worker.Server do ## gen server callbacks ## =========================================================== - def init( - {job_serialized, manager, queue, work_table, stats, namespace, host, redis, middleware, - metadata} - ) do + def init({job_serialized, manager, queue, stats, namespace, host, redis, middleware, metadata}) do { :ok, %State{ job_serialized: job_serialized, manager: manager, queue: queue, - work_table: work_table, stats: stats, namespace: namespace, host: host, diff --git a/test/config_test.exs b/test/config_test.exs index 2c4b6be8..32be085a 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -167,7 +167,7 @@ defmodule Exq.ConfigTest do assert metadata == Exq.Worker.Metadata assert queues == ["default"] assert redis == Exq.Redis.Client - assert concurrency == [{"default", 100, 0}] + assert concurrency == [{"default", {Exq.Dequeue.Local, [concurrency: 100]}}] assert middleware == Exq.Middleware.Server assert default_middleware == [ @@ -184,18 +184,34 @@ defmodule Exq.ConfigTest do {Redix, [_redis_opts], server_opts} = Exq.Support.Opts.redis_worker_opts(mode: :default) assert server_opts[:queues] == ["default", "test1"] - assert server_opts[:concurrency] == [{"default", 1000, 0}, {"test1", 2000, 0}] + + assert server_opts[:concurrency] == [ + {"default", {Exq.Dequeue.Local, [concurrency: 1000]}}, + {"test1", {Exq.Dequeue.Local, [concurrency: 2000]}} + ] Mix.Config.persist( - exq: [queues: [{"default", "1000"}, {"test1", "infinite"}, {"test2", "infinity"}]] + exq: [ + queues: [ + {"default", "1000"}, + {"test1", "infinite"}, + {"test2", {External.BucketLimiter, %{size: 60, limit: 5}}} + ] + ] ) {Redix, [_redis_opts], server_opts} = Exq.Support.Opts.redis_worker_opts(mode: :default) assert server_opts[:concurrency] == [ - {"default", 1000, 0}, - {"test1", :infinity, 0}, - {"test2", :infinity, 0} + {"default", {Exq.Dequeue.Local, [concurrency: 1000]}}, + { + "test1", + {Exq.Dequeue.Local, [concurrency: :infinity]} + }, + { + "test2", + {External.BucketLimiter, %{size: 60, limit: 5}} + } ] end @@ -233,7 +249,7 @@ defmodule Exq.ConfigTest do {Redix, [_redis_opts], server_opts} = Exq.Support.Opts.redis_worker_opts(mode: :default) assert server_opts[:namespace] == "test" - assert server_opts[:concurrency] == [{"default", 333, 0}] + assert server_opts[:concurrency] == [{"default", {Exq.Dequeue.Local, [concurrency: 333]}}] assert server_opts[:poll_timeout] == 17 assert server_opts[:scheduler_poll_timeout] == 123 assert server_opts[:scheduler_enable] == true @@ -251,6 +267,8 @@ defmodule Exq.ConfigTest do {Redix, [_redis_opts], server_opts} = Exq.Support.Opts.redis_worker_opts(mode: :default) - assert server_opts[:concurrency] == [{"default", :infinity, 0}] + assert server_opts[:concurrency] == [ + {"default", {Exq.Dequeue.Local, [concurrency: :infinity]}} + ] end end diff --git a/test/middleware_test.exs b/test/middleware_test.exs index d41b4ec9..efb82a6f 100644 --- a/test/middleware_test.exs +++ b/test/middleware_test.exs @@ -127,7 +127,6 @@ defmodule MiddlewareTest do job = "{ \"queue\": \"default\", \"class\": \"#{class}\", \"args\": #{args}, \"jid\": \"123\" }" - work_table = :ets.new(:work_table, [:set, :public]) {:ok, stub_server} = GenServer.start_link(MiddlewareTest.StubServer, []) {:ok, metadata} = Exq.Worker.Metadata.start_link(%{}) @@ -136,7 +135,6 @@ defmodule MiddlewareTest do job, stub_server, "default", - work_table, stub_server, "exq", "localhost", diff --git a/test/worker_test.exs b/test/worker_test.exs index 8a863888..1ea48520 100644 --- a/test/worker_test.exs +++ b/test/worker_test.exs @@ -130,7 +130,7 @@ defmodule WorkerTest do {:keep_state, data} end - def connected(:cast, {:job_terminated, _, _, _}, data) do + def connected(:cast, {:job_terminated, _queue, _success}, data) do send(:workertest, :job_terminated) {:keep_state, data} end @@ -159,8 +159,6 @@ defmodule WorkerTest do Process.register(self(), :workertest) job = "{ \"queue\": \"default\", \"class\": \"#{class}\", \"args\": #{args} }" - work_table = :ets.new(:work_table, [:set, :public]) - {:ok, stub_server} = start_supervised(%{ id: WorkerTest.MockServer, @@ -198,7 +196,6 @@ defmodule WorkerTest do job, stub_server, "default", - work_table, mock_stats_server, "exq", "localhost",