Skip to content

Commit

Permalink
Merge pull request #427 from akira/reliable-scheduler
Browse files Browse the repository at this point in the history
use lua script to schedule jobs
  • Loading branch information
ananthakumaran authored Aug 21, 2020
2 parents 548bd9e + e99a6ab commit ade8398
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 41 deletions.
64 changes: 23 additions & 41 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,53 +157,35 @@ defmodule Exq.Redis.JobQueue do
end

def scheduler_dequeue(redis, namespace, max_score) do
queues = schedule_queues(namespace)
commands = Enum.map(queues, &["ZRANGEBYSCORE", &1, 0, max_score])
resp = Connection.qp(redis, commands)
schedule_queues(namespace)
|> Enum.map(
&do_scheduler_dequeue(redis, namespace, &1, max_score, Config.get(:scheduler_page_size), 0)
)
|> Enum.sum()
end

defp do_scheduler_dequeue(redis, namespace, queue, max_score, limit, acc) do
case Script.eval!(redis, :scheduler_dequeue, [queue], [
limit,
max_score,
full_key(namespace, "")
]) do
{:ok, count} ->
if count == limit do
do_scheduler_dequeue(redis, namespace, queue, max_score, limit, count + acc)
else
count + acc
end

case resp do
{:error, reason} ->
[{:error, reason}]
Logger.warn(
"Error dequeueing jobs from scheduler queue #{queue} - #{Kernel.inspect(reason)}"
)

{:ok, responses} ->
queues
|> Enum.zip(responses)
|> Enum.reduce(0, fn {queue, response}, acc ->
case response do
jobs when is_list(jobs) ->
deq_count = scheduler_dequeue_requeue(jobs, redis, namespace, queue, 0)
deq_count + acc

%Redix.Error{} = reason ->
Logger.error("Redis error scheduler dequeue #{Kernel.inspect(reason)}}.")
acc
end
end)
0
end
end

def scheduler_dequeue_requeue([], _redis, _namespace, _schedule_queue, count), do: count

def scheduler_dequeue_requeue([job_serialized | t], redis, namespace, schedule_queue, count) do
resp = Connection.zrem(redis, schedule_queue, job_serialized)

count =
case resp do
{:ok, 1} ->
enqueue(redis, namespace, job_serialized)
count + 1

{:ok, _} ->
count

{:error, reason} ->
Logger.error("Redis error scheduler dequeue #{Kernel.inspect(reason)}}.")
count
end

scheduler_dequeue_requeue(t, redis, namespace, schedule_queue, count)
end

def full_key("", key), do: key
def full_key(nil, key), do: key

Expand Down
13 changes: 13 additions & 0 deletions lib/exq/redis/script.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ defmodule Exq.Redis.Script do
end

@scripts %{
scheduler_dequeue:
Prepare.script("""
local schedule_queue = KEYS[1]
local limit, max_score, namespace_prefix = tonumber(ARGV[1]), tonumber(ARGV[2]), ARGV[3]
local jobs = redis.call('ZRANGEBYSCORE', schedule_queue, 0, max_score, 'LIMIT', 0, limit)
for _, job in ipairs(jobs) do
local job_queue = cjson.decode(job)['queue']
redis.call('ZREM', schedule_queue, job)
redis.call('SADD', namespace_prefix .. 'queues', job_queue)
redis.call('LPUSH', namespace_prefix .. 'queue:' .. job_queue, job)
end
return #jobs
"""),
mlpop_rpush:
Prepare.script("""
local from, to = KEYS[1], KEYS[2]
Expand Down
1 change: 1 addition & 0 deletions lib/exq/support/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Exq.Support.Config do
scheduler_enable: true,
concurrency: 100,
scheduler_poll_timeout: 200,
scheduler_page_size: 10,
poll_timeout: 100,
genserver_timeout: 5000,
shutdown_timeout: 5000,
Expand Down
16 changes: 16 additions & 0 deletions test/job_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,22 @@ defmodule JobQueueTest do
assert_dequeue_job(["default"], false)
end

test "scheduler_dequeue dequeues more than 10 jobs " do
now = DateTime.utc_now()

for _ <- 1..15 do
JobQueue.enqueue_at(:testredis, "test", "default", now, MyWorker, [], [])
end

assert JobQueue.scheduler_dequeue(:testredis, "test") == 15

for _ <- 1..15 do
assert_dequeue_job(["default"], true)
end

assert_dequeue_job(["default"], false)
end

test "full_key" do
assert JobQueue.full_key("exq", "k1") == "exq:k1"
assert JobQueue.full_key("", "k1") == "k1"
Expand Down

0 comments on commit ade8398

Please sign in to comment.