Skip to content

Commit

Permalink
feat: automatically trim events
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 29, 2019
1 parent b7da7c4 commit 279bbba
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 266 deletions.
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export class Scripts {
queueKeys.wait,
queueKeys.priority,
queueKeys.events,
queueKeys.meta,
];

let remove;
Expand Down
89 changes: 47 additions & 42 deletions src/commands/addJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,60 +42,65 @@ local rcall = redis.call
local jobCounter = rcall("INCR", KEYS[4])

if ARGV[2] == "" then
jobId = jobCounter
jobIdKey = ARGV[1] .. jobId
jobId = jobCounter
jobIdKey = ARGV[1] .. jobId
else
jobId = ARGV[2]
jobIdKey = ARGV[1] .. jobId
if rcall("EXISTS", jobIdKey) == 1 then
return jobId .. "" -- convert to string
end
jobId = ARGV[2]
jobIdKey = ARGV[1] .. jobId
if rcall("EXISTS", jobIdKey) == 1 then
return jobId .. "" -- convert to string
end
end

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5],
"timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])

-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
if(delayedTimestamp ~= 0) then
local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], timestamp, jobId)
rcall("XADD", KEYS[7], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp);
rcall("XADD", KEYS[8], "*", "nextTimestamp", delayedTimestamp);
if (delayedTimestamp ~= 0) then
local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], timestamp, jobId)
rcall("XADD", KEYS[7], "*", "event", "delayed", "jobId", jobId, "delay",
delayedTimestamp)
rcall("XADD", KEYS[8], "*", "nextTimestamp", delayedTimestamp)
else
local target

-- Whe check for the meta.paused key to decide if we are paused or not
-- (since an empty list and !EXISTS are not really the same)
local paused
if rcall("HEXISTS", KEYS[3], "paused") ~= 1 then
target = KEYS[1]
paused = false
else
target = KEYS[2]
paused = true
end
local target

-- Standard or priority add
local priority = tonumber(ARGV[9])
if priority == 0 then
-- LIFO or FIFO
rcall(ARGV[10], target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[6], priority, jobId)
local count = rcall("ZCOUNT", KEYS[6], 0, priority)
-- Whe check for the meta.paused key to decide if we are paused or not
-- (since an empty list and !EXISTS are not really the same)
local paused
if rcall("HEXISTS", KEYS[3], "paused") ~= 1 then
target = KEYS[1]
paused = false
else
target = KEYS[2]
paused = true
end

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count-1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
-- Standard or priority add
local priority = tonumber(ARGV[9])
if priority == 0 then
-- LIFO or FIFO
rcall(ARGV[10], target, jobId)
else
rcall("RPUSH", target, jobId)
-- Priority add
rcall("ZADD", KEYS[6], priority, jobId)
local count = rcall("ZCOUNT", KEYS[6], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
end
end
-- Emit waiting event
rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId);
-- Emit waiting event
rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId)
end

local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents")
rcall("XTRIM", KEYS[7], "MAXLEN", "~", maxEvents)

return jobId .. "" -- convert to string
102 changes: 52 additions & 50 deletions src/commands/moveStalledJobsToWait-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,70 +20,72 @@
Events:
'stalled' with stalled job id.
]]

local rcall = redis.call
]] local rcall = redis.call

-- Check if we need to check for stalled jobs now.
if rcall("EXISTS", KEYS[5]) == 1 then
return {{}, {}}
end
if rcall("EXISTS", KEYS[5]) == 1 then return {{}, {}} end

rcall("SET", KEYS[5], ARGV[3], "PX", ARGV[4])

-- Move all stalled jobs to wait
local stalling = rcall('SMEMBERS', KEYS[1])
local stalled = {}
local failed = {}
if(#stalling > 0) then

local dst
-- wait or paused destination
if rcall("HEXISTS", KEYS[6], "paused") ~= 1 then
dst = KEYS[2]
else
dst = KEYS[7]
end

rcall('DEL', KEYS[1])

local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])

-- Remove from active list
for i, jobId in ipairs(stalling) do
local jobKey = ARGV[2] .. jobId

-- Check that the lock is also missing, then we can handle this job as really stalled.
if(rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Remove from the active queue.
local removed = rcall("LREM", KEYS[3], 1, jobId)

if(removed > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
rcall("XADD", KEYS[8], "*", "event", "failed", "jobId", jobId, 'prev', 'active');
table.insert(failed, jobId)
else
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", dst, jobId)
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId, 'prev', 'active');

-- Emit the stalled event
rcall("XADD", KEYS[8], "*", "event", "stalled", "jobId", jobId);
table.insert(stalled, jobId)
if (#stalling > 0) then

local dst
-- wait or paused destination
if rcall("HEXISTS", KEYS[6], "paused") ~= 1 then
dst = KEYS[2]
else
dst = KEYS[7]
end

rcall('DEL', KEYS[1])

local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])

-- Remove from active list
for i, jobId in ipairs(stalling) do
local jobKey = ARGV[2] .. jobId

-- Check that the lock is also missing, then we can handle this job as really stalled.
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Remove from the active queue.
local removed = rcall("LREM", KEYS[3], 1, jobId)

if (removed > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter",
1)
if (stalledCount > MAX_STALLED_JOB_COUNT) then
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HSET", jobKey, "failedReason",
"job stalled more than allowable limit")
rcall("XADD", KEYS[8], "*", "event", "failed", "jobId",
jobId, 'prev', 'active')
table.insert(failed, jobId)
else
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", dst, jobId)
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId",
jobId, 'prev', 'active')

-- Emit the stalled event
rcall("XADD", KEYS[8], "*", "event", "stalled", "jobId",
jobId)
table.insert(stalled, jobId)
end
end
end
end
end
end
end

-- Mark potentially stalled jobs
local active = rcall('LRANGE', KEYS[3], 0, -1)
if(#active > 0) then
rcall('SADD', KEYS[1], unpack(active))
end
if (#active > 0) then rcall('SADD', KEYS[1], unpack(active)) end

local maxEvents = rcall("HGET", KEYS[6], "opts.maxLenEvents")
rcall("XTRIM", KEYS[8], "MAXLEN", "~", maxEvents)

return {failed, stalled}
87 changes: 0 additions & 87 deletions src/commands/moveToFinished-6.lua

This file was deleted.

Loading

0 comments on commit 279bbba

Please sign in to comment.