Skip to content

Commit 7ecddbc

Browse files
authored
Merge pull request #125 from quirrel-dev/stale-checker-test
Bugfix: Re-Enqueue repeated jobs if orphaned
2 parents df9cb2c + ad31f2c commit 7ecddbc

File tree

7 files changed

+109
-26
lines changed

7 files changed

+109
-26
lines changed

src/activity/activity.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export class Activity<ScheduleType extends string> implements Closable {
100100
options: SubscriptionOptions = {}
101101
) {
102102
this.redis = redisFactory();
103-
this.producer = new Producer<ScheduleType>(redisFactory);
103+
this.producer = new Producer<ScheduleType>(redisFactory, null as any);
104104

105105
this.redis.on("pmessage", (_pattern, channel, message) =>
106106
this.handleMessage(channel, message)

src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export default class Owl<ScheduleType extends string> {
5454
public createProducer() {
5555
return new Producer<ScheduleType>(
5656
this.redisFactory,
57+
this.scheduleMap,
5758
this.onError,
5859
this.staleCheckerConfig,
5960
this.logger

src/producer/producer.ts

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from "../encodeRedisKey";
1111
import { defineLocalCommands } from "../redis-commands";
1212
import type { Logger } from "pino";
13+
import { ScheduleMap } from "..";
1314

1415
declare module "ioredis" {
1516
interface Commands {
@@ -46,6 +47,7 @@ export class Producer<ScheduleType extends string> implements Closable {
4647

4748
constructor(
4849
redisFactory: () => Redis,
50+
scheduleMap: ScheduleMap<ScheduleType>,
4951
onError?: OnError<ScheduleType>,
5052
staleCheckerConfig?: StaleCheckerConfig,
5153
private readonly logger?: Logger
@@ -64,6 +66,7 @@ export class Producer<ScheduleType extends string> implements Closable {
6466
this.redis,
6567
this.acknowledger,
6668
this,
69+
scheduleMap,
6770
staleCheckerConfig,
6871
this.logger
6972
);

src/shared/stale-checker.ts

+37-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,27 @@
11
import type { Redis } from "ioredis";
22
import type { Logger } from "pino";
3+
import { ScheduleMap } from "..";
34
import { Closable } from "../Closable";
45
import { tenantToRedisPrefix } from "../encodeRedisKey";
56
import type { Producer } from "../producer/producer";
67
import { computeTimestampForNextRetry } from "../worker/retry";
8+
import { getNextExecutionDate } from "../worker/worker";
79
import type { Acknowledger } from "./acknowledger";
810
import { scanTenantsForProcessing } from "./scan-tenants";
911

12+
function valueAndScoreToObj(arr: (string | number)[]) {
13+
const result: { value: string; score: number }[] = [];
14+
15+
for (let i = 0; i < arr.length; i += 2) {
16+
result.push({
17+
value: String(arr[i]),
18+
score: Number(arr[i + 1]),
19+
});
20+
}
21+
22+
return result;
23+
}
24+
1025
const oneMinute = 60 * 1000;
1126

1227
export interface StaleCheckerConfig {
@@ -23,6 +38,7 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
2338
private readonly redis: Redis,
2439
private readonly acknowledger: Acknowledger<ScheduleType>,
2540
private readonly producer: Producer<ScheduleType>,
41+
private readonly scheduleMap: ScheduleMap<ScheduleType>,
2642
config: StaleCheckerConfig = {},
2743
private readonly logger?: Logger
2844
) {
@@ -53,7 +69,7 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
5369
) {
5470
const result = await this.redis
5571
.pipeline()
56-
.zrangebyscore(key, min, max)
72+
.zrangebyscore(key, min, max, "WITHSCORES")
5773
.zremrangebyscore(key, min, max)
5874
.exec();
5975

@@ -67,7 +83,7 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
6783
throw remRangeByScoreResult[0];
6884
}
6985

70-
return rangeByScoreResult[1] as string[];
86+
return valueAndScoreToObj(rangeByScoreResult[1]);
7187
}
7288

7389
private parseJobDescriptor(descriptor: string) {
@@ -103,14 +119,18 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
103119

104120
const staleJobs = await this.producer.findJobs(
105121
tenant,
106-
staleJobDescriptors.map(this.parseJobDescriptor)
122+
staleJobDescriptors.map(({ value }) =>
123+
this.parseJobDescriptor(value)
124+
)
107125
);
108126

109127
const pipeline = this.redis.pipeline();
110128

111129
const error = "Job Timed Out";
112130

113-
for (const job of staleJobs) {
131+
for (let i = 0; i < staleJobs.length; i++) {
132+
const job = staleJobs[0];
133+
const score = staleJobDescriptors[0].score;
114134
if (!job) {
115135
this.logger?.error(
116136
{ tenant },
@@ -125,6 +145,18 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
125145
job.count
126146
);
127147

148+
// TODO: duplicated logic in producer, please extract
149+
let nextExecutionDate: number | undefined = undefined;
150+
151+
if (!job.schedule?.times || job.count < job.schedule?.times) {
152+
nextExecutionDate = getNextExecutionDate(
153+
this.scheduleMap,
154+
job.schedule?.type,
155+
job.schedule?.meta ?? "",
156+
new Date(score)
157+
);
158+
}
159+
128160
this.logger?.trace(
129161
{ tenant, job },
130162
"Stale-Checker: Adding Failure report to pipeline"
@@ -136,6 +168,7 @@ export class StaleChecker<ScheduleType extends string> implements Closable {
136168
queueId: job.queue,
137169
jobId: job.id,
138170
timestampForNextRetry,
171+
nextExecutionDate,
139172
},
140173
job,
141174
error,

src/worker/worker.ts

+29-15
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,29 @@ function parseTenantFromChannel(topic: string) {
5555
return "";
5656
}
5757

58+
export function getNextExecutionDate<ScheduleType extends string>(
59+
scheduleMap: ScheduleMap<ScheduleType>,
60+
schedule_type: ScheduleType | undefined,
61+
schedule_meta: string,
62+
lastExecution: Date
63+
): number | undefined {
64+
if (!schedule_type) {
65+
return undefined;
66+
}
67+
68+
const scheduleFunc = scheduleMap[schedule_type];
69+
if (!scheduleFunc) {
70+
throw new Error(`Schedule ${schedule_type} not found.`);
71+
}
72+
73+
const result = scheduleFunc(lastExecution, schedule_meta);
74+
if (!result) {
75+
return undefined;
76+
}
77+
78+
return +result;
79+
}
80+
5881
export class Worker<ScheduleType extends string> implements Closable {
5982
private readonly redis;
6083
private readonly redisSub;
@@ -112,21 +135,12 @@ export class Worker<ScheduleType extends string> implements Closable {
112135
schedule_meta: string,
113136
lastExecution: Date
114137
): number | undefined {
115-
if (!schedule_type) {
116-
return undefined;
117-
}
118-
119-
const scheduleFunc = this.scheduleMap[schedule_type];
120-
if (!scheduleFunc) {
121-
throw new Error(`Schedule ${schedule_type} not found.`);
122-
}
123-
124-
const result = scheduleFunc(lastExecution, schedule_meta);
125-
if (!result) {
126-
return undefined;
127-
}
128-
129-
return +result;
138+
return getNextExecutionDate(
139+
this.scheduleMap,
140+
schedule_type,
141+
schedule_meta,
142+
lastExecution
143+
);
130144
}
131145

132146
private readonly distributor = new JobDistributor(

test/functional/stale-check.test.ts

+28
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ describeAcrossBackends("stale-check", (backend) => {
4747
jobId: "stalling-job",
4848
queueId: "stally-stall",
4949
timestampForNextRetry: undefined,
50+
nextExecutionDate: undefined,
5051
},
5152
"Job Timed Out",
5253
],
@@ -105,4 +106,31 @@ describeAcrossBackends("stale-check", (backend) => {
105106
await env.producer.staleChecker.check();
106107
expect(env.errors).to.deep.equal([]);
107108
});
109+
110+
describe("when a schedule jobs goes stale", () => {
111+
it("is re-scheduled", async () => {
112+
worker = await env.owl.createWorker(async (job, ack) => {
113+
// let job be stale
114+
});
115+
116+
await env.producer.enqueue({
117+
tenant: "",
118+
id: "@cron",
119+
payload: "null",
120+
queue: "cron-job",
121+
schedule: {
122+
type: "every",
123+
meta: "1000",
124+
},
125+
});
126+
127+
await delay(200);
128+
129+
await env.producer.staleChecker.check();
130+
131+
const job = await env.producer.findById("", "cron-job", "@cron");
132+
expect(job).not.to.be.null;
133+
expect(+job.runAt).to.be.greaterThan(Date.now());
134+
});
135+
});
108136
});

test/util.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ export function describeAcrossBackends(
77
runTests: (backend: Backend) => void
88
) {
99
describe(topic, () => {
10-
describe("Redis", () => {
11-
runTests("Redis");
12-
});
10+
if (process.env.TEST_BACKEND !== "In-Memory") {
11+
describe("Redis", () => {
12+
runTests("Redis");
13+
});
14+
}
1315

14-
describe("In-Memory", () => {
15-
runTests("In-Memory");
16-
});
16+
if (process.env.TEST_BACKEND !== "Redis") {
17+
describe("In-Memory", () => {
18+
runTests("In-Memory");
19+
});
20+
}
1721
});
1822
}
1923

0 commit comments

Comments
 (0)