Skip to content

Commit 4340061

Browse files
authored
Merge pull request #180 from quirrel-dev/override-jobs-during-execution
overriding jobs during execution should preserve the overridden schedule
2 parents 7695192 + fe83314 commit 4340061

File tree

3 files changed

+86
-3
lines changed

3 files changed

+86
-3
lines changed

src/shared/acknowledge.lua

+9-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@ redis.call("ZREM", "processing", jobQueue .. ":" .. jobId)
1212
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "acknowledged")
1313
redis.call("PUBLISH", "acknowledged", jobQueue .. ":" .. jobId)
1414

15-
if rescheduleFor == '' then
15+
local wasOverridenDuringExecution = false
16+
if redis.call("ZSCORE", "queue", jobQueue .. ":" .. jobId) then
17+
wasOverridenDuringExecution = true
18+
elseif redis.call("ZSCORE", "blocked:" .. jobQueue, jobId) then
19+
wasOverridenDuringExecution = true
20+
end
21+
22+
if wasOverridenDuringExecution then
23+
elseif rescheduleFor == '' then
1624
redis.call("DEL", jobTableJobKey)
1725
redis.call("SREM", "queues:" .. jobQueue, jobId)
1826
else
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { expect } from "chai";
2+
import { makeWorkerEnv } from "./support";
3+
import { delay, describeAcrossBackends, makeSignal } from "../util";
4+
5+
describeAcrossBackends("Repro: Override during Execution", (backend) => {
6+
const executionStarted = makeSignal();
7+
const jobWasOverriden = makeSignal();
8+
const secondJobWasCalled = makeSignal();
9+
const env = makeWorkerEnv(backend, async (job) => {
10+
if (job.payload === "wait") {
11+
executionStarted.signal();
12+
await jobWasOverriden;
13+
}
14+
15+
if (job.payload === "enqueue-second") {
16+
await env.producer.enqueue({
17+
queue: job.queue,
18+
id: job.id,
19+
payload: "second",
20+
exclusive: true,
21+
override: true,
22+
});
23+
}
24+
25+
if (job.payload === "second") {
26+
secondJobWasCalled.signal();
27+
}
28+
29+
return false;
30+
});
31+
beforeEach(env.setup);
32+
afterEach(env.teardown);
33+
34+
describe("when job is overriden while being executed", () => {
35+
it("is executed along the new (overriden) schedule", async () => {
36+
const queue = "override-during";
37+
const id = "foo";
38+
await env.producer.enqueue({
39+
queue,
40+
id,
41+
schedule: { type: "every", meta: "100" },
42+
payload: "wait",
43+
});
44+
45+
await executionStarted;
46+
47+
const newRunAt = new Date(Date.now() + 10000000);
48+
await env.producer.enqueue({
49+
queue,
50+
id,
51+
runAt: newRunAt,
52+
override: true,
53+
payload: "",
54+
});
55+
56+
jobWasOverriden.signal();
57+
await delay(10);
58+
59+
const job = await env.producer.findById(queue, id);
60+
expect(+job.runAt).to.equal(+newRunAt);
61+
});
62+
it("doesnt stop next job from being executed (repro quirrel#739)", async () => {
63+
const queue = "doesntstopnext";
64+
const id = "foo";
65+
await env.producer.enqueue({
66+
queue,
67+
id,
68+
payload: "enqueue-second",
69+
exclusive: true,
70+
});
71+
72+
await secondJobWasCalled
73+
});
74+
});
75+
});

test/functional/support.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export function makeProducerEnv(
7272
return env;
7373
}
7474

75-
type WorkerFailPredicate = (job: Job<string>) => boolean;
75+
type WorkerFailPredicate = (job: Job<string>) => boolean | Promise<boolean>;
7676

7777
type JobListener = (job: Job<string>) => void;
7878

@@ -126,7 +126,7 @@ export function makeWorkerEnv(
126126
await delay(1);
127127
}
128128

129-
if (fail(job)) {
129+
if (await fail(job)) {
130130
await workerEnv.worker.acknowledger.reportFailure(
131131
ackDescriptor,
132132
job,

0 commit comments

Comments
 (0)