Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overriding jobs during execution should preserve the overridden schedule #180

Merged
merged 4 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/shared/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@ redis.call("ZREM", "processing", jobQueue .. ":" .. jobId)
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "acknowledged")
redis.call("PUBLISH", "acknowledged", jobQueue .. ":" .. jobId)

if rescheduleFor == '' then
local wasOverridenDuringExecution = false
if redis.call("ZSCORE", "queue", jobQueue .. ":" .. jobId) then
wasOverridenDuringExecution = true
elseif redis.call("ZSCORE", "blocked:" .. jobQueue, jobId) then
wasOverridenDuringExecution = true
end

if wasOverridenDuringExecution then
elseif rescheduleFor == '' then
redis.call("DEL", jobTableJobKey)
redis.call("SREM", "queues:" .. jobQueue, jobId)
else
Expand Down
75 changes: 75 additions & 0 deletions test/functional/repro-override-during-execution.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { expect } from "chai";
import { makeWorkerEnv } from "./support";
import { delay, describeAcrossBackends, makeSignal } from "../util";

describeAcrossBackends("Repro: Override during Execution", (backend) => {
const executionStarted = makeSignal();
const jobWasOverriden = makeSignal();
const secondJobWasCalled = makeSignal();
const env = makeWorkerEnv(backend, async (job) => {
if (job.payload === "wait") {
executionStarted.signal();
await jobWasOverriden;
}

if (job.payload === "enqueue-second") {
await env.producer.enqueue({
queue: job.queue,
id: job.id,
payload: "second",
exclusive: true,
override: true,
});
}

if (job.payload === "second") {
secondJobWasCalled.signal();
}

return false;
});
beforeEach(env.setup);
afterEach(env.teardown);

describe("when job is overriden while being executed", () => {
it("is executed along the new (overriden) schedule", async () => {
const queue = "override-during";
const id = "foo";
await env.producer.enqueue({
queue,
id,
schedule: { type: "every", meta: "100" },
payload: "wait",
});

await executionStarted;

const newRunAt = new Date(Date.now() + 10000000);
await env.producer.enqueue({
queue,
id,
runAt: newRunAt,
override: true,
payload: "",
});

jobWasOverriden.signal();
await delay(10);

const job = await env.producer.findById(queue, id);
expect(+job.runAt).to.equal(+newRunAt);
});
it("doesnt stop next job from being executed (repro quirrel#739)", async () => {
const queue = "doesntstopnext";
const id = "foo";
await env.producer.enqueue({
queue,
id,
payload: "enqueue-second",
exclusive: true,
});

await secondJobWasCalled
});
});
});
4 changes: 2 additions & 2 deletions test/functional/support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export function makeProducerEnv(
return env;
}

type WorkerFailPredicate = (job: Job<string>) => boolean;
type WorkerFailPredicate = (job: Job<string>) => boolean | Promise<boolean>;

type JobListener = (job: Job<string>) => void;

Expand Down Expand Up @@ -126,7 +126,7 @@ export function makeWorkerEnv(
await delay(1);
}

if (fail(job)) {
if (await fail(job)) {
await workerEnv.worker.acknowledger.reportFailure(
ackDescriptor,
job,
Expand Down