diff --git a/packages/queue/db.ts b/packages/queue/db.ts new file mode 100644 index 00000000..f1412fef --- /dev/null +++ b/packages/queue/db.ts @@ -0,0 +1,19 @@ +import path from "node:path"; +import Database from "better-sqlite3"; +import { BetterSQLite3Database, drizzle } from "drizzle-orm/better-sqlite3"; +import { migrate } from "drizzle-orm/better-sqlite3/migrator"; + +import * as schema from "./schema"; + +export function buildDBClient(dbPath: string, runMigrations = false) { + const sqlite = new Database(dbPath); + const db = drizzle(sqlite, { schema }); + if (runMigrations) { + migrateDB(db); + } + return db; +} + +export function migrateDB(db: BetterSQLite3Database) { + migrate(db, { migrationsFolder: path.join(__dirname, "drizzle") }); +} diff --git a/packages/queue/drizzle.config.ts b/packages/queue/drizzle.config.ts new file mode 100644 index 00000000..6ef01d1b --- /dev/null +++ b/packages/queue/drizzle.config.ts @@ -0,0 +1,10 @@ +import type { Config } from "drizzle-kit"; + +export default { + schema: "./schema.ts", + out: "./drizzle", + driver: "better-sqlite", + dbCredentials: { + url: "data.db", + }, +} satisfies Config; diff --git a/packages/queue/drizzle/0000_wonderful_talisman.sql b/packages/queue/drizzle/0000_wonderful_talisman.sql new file mode 100644 index 00000000..e042ab92 --- /dev/null +++ b/packages/queue/drizzle/0000_wonderful_talisman.sql @@ -0,0 +1,18 @@ +CREATE TABLE `tasks` ( + `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, + `queue` text NOT NULL, + `payload` text NOT NULL, + `createdAt` integer NOT NULL, + `status` text DEFAULT 'pending' NOT NULL, + `expireAt` integer, + `allocationId` text NOT NULL, + `numRunsLeft` integer NOT NULL, + `maxNumRuns` integer NOT NULL +); +--> statement-breakpoint +CREATE INDEX `tasks_queue_idx` ON `tasks` (`queue`);--> statement-breakpoint +CREATE INDEX `tasks_status_idx` ON `tasks` (`status`);--> statement-breakpoint +CREATE INDEX `tasks_expire_at_idx` ON `tasks` (`expireAt`);--> statement-breakpoint +CREATE INDEX `tasks_num_runs_left_idx` ON `tasks` (`numRunsLeft`);--> statement-breakpoint +CREATE INDEX `tasks_max_num_runs_idx` ON `tasks` (`maxNumRuns`);--> statement-breakpoint +CREATE INDEX `tasks_allocation_id_idx` ON `tasks` (`allocationId`); \ No newline at end of file diff --git a/packages/queue/drizzle/meta/0000_snapshot.json b/packages/queue/drizzle/meta/0000_snapshot.json new file mode 100644 index 00000000..57c7c2f4 --- /dev/null +++ b/packages/queue/drizzle/meta/0000_snapshot.json @@ -0,0 +1,130 @@ +{ + "version": "5", + "dialect": "sqlite", + "id": "3094773c-0138-46b2-b617-4b10093b0f53", + "prevId": "00000000-0000-0000-0000-000000000000", + "tables": { + "tasks": { + "name": "tasks", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "queue": { + "name": "queue", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "payload": { + "name": "payload", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'pending'" + }, + "expireAt": { + "name": "expireAt", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "allocationId": { + "name": "allocationId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "numRunsLeft": { + "name": "numRunsLeft", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "maxNumRuns": { + "name": "maxNumRuns", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "tasks_queue_idx": { + "name": "tasks_queue_idx", + "columns": [ + "queue" + ], + "isUnique": false + }, + "tasks_status_idx": { + "name": "tasks_status_idx", + "columns": [ + "status" + ], + "isUnique": false + }, + "tasks_expire_at_idx": { + "name": "tasks_expire_at_idx", + "columns": [ + "expireAt" + ], + "isUnique": false + }, + "tasks_num_runs_left_idx": { + "name": "tasks_num_runs_left_idx", + "columns": [ + "numRunsLeft" + ], + "isUnique": false + }, + "tasks_max_num_runs_idx": { + "name": "tasks_max_num_runs_idx", + "columns": [ + "maxNumRuns" + ], + "isUnique": false + }, + "tasks_allocation_id_idx": { + "name": "tasks_allocation_id_idx", + "columns": [ + "allocationId" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + } + }, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + } +} \ No newline at end of file diff --git a/packages/queue/drizzle/meta/_journal.json b/packages/queue/drizzle/meta/_journal.json new file mode 100644 index 00000000..2b14f895 --- /dev/null +++ b/packages/queue/drizzle/meta/_journal.json @@ -0,0 +1,13 @@ +{ + "version": "5", + "dialect": "sqlite", + "entries": [ + { + "idx": 0, + "version": "5", + "when": 1720992922192, + "tag": "0000_wonderful_talisman", + "breakpoints": true + } + ] +} \ No newline at end of file diff --git a/packages/queue/index.ts b/packages/queue/index.ts new file mode 100644 index 00000000..c9144f29 --- /dev/null +++ b/packages/queue/index.ts @@ -0,0 +1,6 @@ +export { SqliteQueue } from "./queue"; +export { buildDBClient, migrateDB } from "./db"; +export type { SqliteQueueOptions, RunnerOptions, RunnerFuncs } from "./options"; +export { Runner } from "./runner"; + +export type { DequeuedJob, DequeuedJobError } from "./types"; diff --git a/packages/queue/options.ts b/packages/queue/options.ts new file mode 100644 index 00000000..18f8e52d --- /dev/null +++ b/packages/queue/options.ts @@ -0,0 +1,22 @@ +import { ZodType } from "zod"; + +import { DequeuedJob, DequeuedJobError } from "./types"; + +export interface SqliteQueueOptions { + defaultJobArgs: { + numRetries: number; + }; +} + +export interface RunnerFuncs { + run: (job: DequeuedJob) => Promise; + onComplete?: (job: DequeuedJob) => Promise; + onError?: (job: DequeuedJobError) => Promise; +} + +export interface RunnerOptions { + pollIntervalMs: number; + timeoutSecs: number; + concurrency: number; + validator?: ZodType; +} diff --git a/packages/queue/package.json b/packages/queue/package.json new file mode 100644 index 00000000..e0e9d5d1 --- /dev/null +++ b/packages/queue/package.json @@ -0,0 +1,35 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@hoarder/queue", + "version": "0.1.0", + "private": true, + "type": "module", + "dependencies": { + "better-sqlite3": "^9.4.3", + "drizzle-orm": "^0.29.4", + "zod": "^3.22.4", + "async-mutex": "^0.4.1" + }, + "devDependencies": { + "@hoarder/eslint-config": "workspace:^0.2.0", + "@hoarder/prettier-config": "workspace:^0.1.0", + "@hoarder/tsconfig": "workspace:^0.1.0", + "@types/better-sqlite3": "^7.6.9", + "drizzle-kit": "^0.20.14", + "vitest": "^1.3.1" + }, + "scripts": { + "typecheck": "tsc --noEmit", + "test": "vitest", + "format": "prettier . --ignore-path ../../.prettierignore", + "lint": "eslint ." + }, + "main": "index.ts", + "eslintConfig": { + "root": true, + "extends": [ + "@hoarder/eslint-config/base" + ] + }, + "prettier": "@hoarder/prettier-config" +} diff --git a/packages/queue/queue.ts b/packages/queue/queue.ts new file mode 100644 index 00000000..ad486468 --- /dev/null +++ b/packages/queue/queue.ts @@ -0,0 +1,146 @@ +import assert from "node:assert"; +import { and, asc, count, eq, gt, lt, or } from "drizzle-orm"; + +import { buildDBClient } from "./db"; +import { SqliteQueueOptions } from "./options"; +import { Job, tasksTable } from "./schema"; + +// generate random id +function generateAllocationId() { + return Math.random().toString(36).substring(2, 15); +} + +export class SqliteQueue { + queueName: string; + db: ReturnType; + options: SqliteQueueOptions; + + constructor( + name: string, + db: ReturnType, + options: SqliteQueueOptions, + ) { + this.queueName = name; + this.options = options; + this.db = db; + } + + name() { + return this.queueName; + } + + async enqueue(payload: T): Promise { + const job = await this.db + .insert(tasksTable) + .values({ + queue: this.queueName, + payload: JSON.stringify(payload), + numRunsLeft: this.options.defaultJobArgs.numRetries + 1, + maxNumRuns: this.options.defaultJobArgs.numRetries + 1, + allocationId: generateAllocationId(), + }) + .returning(); + + return job[0]; + } + + async stats() { + const res = await this.db + .select({ status: tasksTable.status, count: count() }) + .from(tasksTable) + .where(eq(tasksTable.queue, this.queueName)) + .groupBy(tasksTable.status); + + return res.reduce( + (acc, r) => { + acc[r.status] += r.count; + return acc; + }, + { + pending: 0, + pending_retry: 0, + running: 0, + failed: 0, + }, + ); + } + + async attemptDequeue(options: { timeoutSecs: number }): Promise { + return await this.db.transaction(async (txn) => { + const jobs = await txn + .select() + .from(tasksTable) + .where( + and( + eq(tasksTable.queue, this.queueName), + gt(tasksTable.numRunsLeft, 0), + or( + // Not picked by a worker yet + eq(tasksTable.status, "pending"), + + // Failed but still has attempts left + eq(tasksTable.status, "pending_retry"), + + // Expired and still has attempts left + and( + eq(tasksTable.status, "running"), + lt(tasksTable.expireAt, new Date()), + ), + ), + ), + ) + .orderBy(asc(tasksTable.createdAt)) + .limit(1); + + if (jobs.length == 0) { + return null; + } + assert(jobs.length == 1); + const job = jobs[0]; + + const result = await txn + .update(tasksTable) + .set({ + status: "running", + numRunsLeft: job.numRunsLeft - 1, + allocationId: generateAllocationId(), + expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), + }) + .where( + and( + eq(tasksTable.id, job.id), + + // The compare and swap is necessary to avoid race conditions + eq(tasksTable.allocationId, job.allocationId), + ), + ) + .returning(); + if (result.length == 0) { + return null; + } + assert(result.length == 1); + return result[0]; + }); + } + + async finalize( + id: number, + alloctionId: string, + status: "completed" | "pending_retry" | "failed", + ) { + if (status == "completed") { + await this.db + .delete(tasksTable) + .where( + and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), + ); + } else { + await this.db + .update(tasksTable) + .set({ status: status, expireAt: null }) + .where( + and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), + ); + } + } +} diff --git a/packages/queue/runner.test.ts b/packages/queue/runner.test.ts new file mode 100644 index 00000000..9e50c9a5 --- /dev/null +++ b/packages/queue/runner.test.ts @@ -0,0 +1,440 @@ +/* eslint-disable @typescript-eslint/require-await */ +import { Semaphore } from "async-mutex"; +import { eq } from "drizzle-orm"; +import { describe, expect, test } from "vitest"; +import { z } from "zod"; + +import { + buildDBClient, + DequeuedJob, + DequeuedJobError, + Runner, + RunnerOptions, + SqliteQueue, +} from "./"; +import { tasksTable } from "./schema"; + +class Baton { + semaphore: Semaphore; + constructor() { + this.semaphore = new Semaphore(0); + this.reset(); + } + post() { + this.semaphore.setValue(100000); + } + + async wait() { + await this.semaphore.acquire(); + } + + reset() { + this.semaphore.setValue(-Infinity); + } +} + +class Barrier { + semaphore: Semaphore; + baton: Baton; + constructor(numParticipants: number) { + this.semaphore = new Semaphore(numParticipants * -1 + 1); + this.baton = new Baton(); + this.reset(numParticipants * -1 + 1); + } + + async notifyReachedAndWait() { + this.semaphore.release(); + await this.baton.wait(); + } + + async waitUntilAllReached() { + await this.semaphore.waitForUnlock(); + } + + allowParticipantsToProceed() { + this.baton.post(); + } + + reset(numParticipants: number) { + this.semaphore.setValue(numParticipants); + this.baton.reset(); + } +} + +const defaultRunnerOpts = { + pollIntervalMs: 100, + timeoutSecs: 100, + concurrency: 2, + validator: z.object({ + increment: z.number(), + succeedAfter: z.number().optional().default(0), + blockForSec: z.number().optional().default(0), + }), +}; + +interface Work { + increment: number; + succeedAfter?: number; + blockForSec?: number; +} + +interface Results { + result: number; + numCalled: number; + numCompleted: number; + numFailed: number; +} + +async function waitUntilAllSettled(queue: SqliteQueue) { + let stats = await queue.stats(); + while (stats.running > 0 || stats.pending > 0 || stats.pending_retry > 0) { + await new Promise((resolve) => setTimeout(resolve, 100)); + stats = await queue.stats(); + console.log(stats); + } +} + +function buildRunner( + queue: SqliteQueue, + opts: RunnerOptions, + barrier: Barrier, + inputResults?: Results, +) { + const results = inputResults ?? { + result: 0, + numCalled: 0, + numCompleted: 0, + numFailed: 0, + }; + const runner = new Runner( + queue, + { + run: async (job: DequeuedJob) => { + console.log("STARTED:", job); + results.numCalled++; + await barrier.notifyReachedAndWait(); + if (job.runNumber < (job.data.succeedAfter ?? 0)) { + throw new Error("Failed"); + } + if (job.data.blockForSec !== undefined) { + await new Promise((resolve) => + setTimeout(resolve, job.data.blockForSec! * 1000), + ); + } + results.result += job.data.increment; + }, + onComplete: async (job: DequeuedJob) => { + console.log("COMPLETED:", job); + results.numCompleted++; + }, + onError: async (job: DequeuedJobError) => { + console.log("FAILED:", job); + results.numFailed++; + }, + }, + opts, + ); + + return { runner, results }; +} + +describe("SqiteQueueRunner", () => { + test("should run jobs with correct concurrency", async () => { + const queue = new SqliteQueue( + "queue1", + buildDBClient(":memory:", true), + { + defaultJobArgs: { + numRetries: 0, + }, + }, + ); + + const barrier = new Barrier(2); + const { runner, results } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 2 }, + barrier, + ); + + queue.enqueue({ increment: 1 }); + queue.enqueue({ increment: 2 }); + queue.enqueue({ increment: 3 }); + + expect(await queue.stats()).toEqual({ + pending: 3, + running: 0, + pending_retry: 0, + failed: 0, + }); + + const runnerPromise = runner.runUntilEmpty(); + + // Wait until all runners reach the synchronization point + await barrier.waitUntilAllReached(); + + // Ensure that we have two "running" jobs given the concurrency of 2 + expect(await queue.stats()).toEqual({ + pending: 1, + running: 2, + pending_retry: 0, + failed: 0, + }); + + // Allow jobs to proceed + barrier.allowParticipantsToProceed(); + + // Wait until all jobs are consumed + await runnerPromise; + + expect(await queue.stats()).toEqual({ + pending: 0, + running: 0, + pending_retry: 0, + failed: 0, + }); + + expect(results.result).toEqual(6); + expect(results.numCalled).toEqual(3); + expect(results.numCompleted).toEqual(3); + expect(results.numFailed).toEqual(0); + }); + + test("should retry errors", async () => { + const queue = new SqliteQueue( + "queue1", + buildDBClient(":memory:", true), + { + defaultJobArgs: { + numRetries: 2, + }, + }, + ); + + const barrier = new Barrier(0); + barrier.allowParticipantsToProceed(); + const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier); + + queue.enqueue({ increment: 1, succeedAfter: 2 }); + queue.enqueue({ increment: 1, succeedAfter: 10 }); + queue.enqueue({ increment: 3, succeedAfter: 0 }); + + const runnerPromise = runner.runUntilEmpty(); + + // Wait until all jobs are consumed + await runnerPromise; + + expect(await queue.stats()).toEqual({ + pending: 0, + pending_retry: 0, + running: 0, + failed: 1, + }); + + expect(results.result).toEqual(4); + expect(results.numCalled).toEqual(7); + expect(results.numCompleted).toEqual(2); + expect(results.numFailed).toEqual(1); + }); + + test("timeouts are respected", async () => { + const queue = new SqliteQueue( + "queue1", + buildDBClient(":memory:", true), + { + defaultJobArgs: { + numRetries: 1, + }, + }, + ); + + const barrier = new Barrier(1); + barrier.allowParticipantsToProceed(); + const { runner: runner, results } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 1, timeoutSecs: 1 }, + barrier, + ); + + queue.enqueue({ increment: 1, blockForSec: 10 }); + await runner.runUntilEmpty(); + + expect(await queue.stats()).toEqual({ + pending: 0, + pending_retry: 0, + running: 0, + failed: 1, + }); + + expect(results.result).toEqual(0); + expect(results.numCalled).toEqual(2); + expect(results.numCompleted).toEqual(0); + expect(results.numFailed).toEqual(1); + }); + + test("serialization errors", async () => { + const queue = new SqliteQueue( + "queue1", + buildDBClient(":memory:", true), + { + defaultJobArgs: { + numRetries: 1, + }, + }, + ); + + const job = await queue.enqueue({ increment: 1 }); + // Corrupt the payload + await queue.db + .update(tasksTable) + .set({ payload: "{}" }) + .where(eq(tasksTable.id, job.id)); + + const barrier = new Barrier(1); + barrier.allowParticipantsToProceed(); + const { runner, results } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 1 }, + barrier, + ); + + const p = runner.run(); + await waitUntilAllSettled(queue); + runner.stop(); + await p; + + expect(await queue.stats()).toEqual({ + pending: 0, + pending_retry: 0, + running: 0, + failed: 1, + }); + + expect(results.result).toEqual(0); + expect(results.numCalled).toEqual(0); + expect(results.numCompleted).toEqual(0); + expect(results.numFailed).toEqual(1); + }); + + test("concurrent runners", async () => { + const queue = new SqliteQueue( + "queue1", + buildDBClient(":memory:", true), + { + defaultJobArgs: { + numRetries: 0, + }, + }, + ); + + await queue.enqueue({ increment: 1 }); + await queue.enqueue({ increment: 2 }); + await queue.enqueue({ increment: 3 }); + + const barrier = new Barrier(3); + const { runner: runner1, results } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 1 }, + barrier, + ); + const { runner: runner2 } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 1 }, + barrier, + results, + ); + const { runner: runner3 } = buildRunner( + queue, + { ...defaultRunnerOpts, concurrency: 1 }, + barrier, + results, + ); + + const runPromises = Promise.all([ + runner1.run(), + runner2.run(), + runner3.run(), + ]); + + await barrier.waitUntilAllReached(); + + expect(await queue.stats()).toEqual({ + pending: 0, + pending_retry: 0, + running: 3, + failed: 0, + }); + + barrier.allowParticipantsToProceed(); + + runner1.stop(); + runner2.stop(); + runner3.stop(); + + await runPromises; + + expect(results.result).toEqual(6); + expect(results.numCalled).toEqual(3); + expect(results.numCompleted).toEqual(3); + expect(results.numFailed).toEqual(0); + }); + + test("large test", async () => { + const db = buildDBClient(":memory:", true); + const queue1 = new SqliteQueue("queue1", db, { + defaultJobArgs: { + numRetries: 0, + }, + }); + const queue2 = new SqliteQueue("queue2", db, { + defaultJobArgs: { + numRetries: 0, + }, + }); + + const barrier = new Barrier(0); + barrier.allowParticipantsToProceed(); + const results = { + result: 0, + numCalled: 0, + numCompleted: 0, + numFailed: 0, + }; + const runners = []; + const runnerPromises = []; + + for (let i = 0; i < 10; i++) { + const { runner } = buildRunner( + i % 2 == 0 ? queue1 : queue2, + { ...defaultRunnerOpts, concurrency: 2 }, + barrier, + results, + ); + runners.push(runner); + runnerPromises.push(runner.run()); + } + + { + const enqueuePromises = []; + for (let i = 0; i < 1000; i++) { + enqueuePromises.push( + (i % 2 == 0 ? queue1 : queue2).enqueue({ increment: i }), + ); + } + await Promise.all(enqueuePromises); + } + + await Promise.all([ + waitUntilAllSettled(queue1), + waitUntilAllSettled(queue2), + ]); + + runners.forEach((runner) => runner.stop()); + await Promise.all(runnerPromises); + + expect(results.result).toEqual(499500); + expect(results.numCalled).toEqual(1000); + expect(results.numCompleted).toEqual(1000); + expect(results.numFailed).toEqual(0); + }); +}); diff --git a/packages/queue/runner.ts b/packages/queue/runner.ts new file mode 100644 index 00000000..1a90f969 --- /dev/null +++ b/packages/queue/runner.ts @@ -0,0 +1,115 @@ +import assert from "node:assert"; +import { Semaphore } from "async-mutex"; + +import { RunnerFuncs, RunnerOptions } from "./options"; +import { SqliteQueue } from "./queue"; +import { Job } from "./schema"; +import { DequeuedJob } from "./types"; + +export class Runner { + queue: SqliteQueue; + funcs: RunnerFuncs; + opts: RunnerOptions; + stopping = false; + + constructor( + queue: SqliteQueue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ) { + this.queue = queue; + this.funcs = funcs; + this.opts = opts; + } + + async run() { + return this.runImpl(false); + } + + stop() { + this.stopping = true; + } + + async runUntilEmpty() { + return this.runImpl(true); + } + + async runImpl(breakOnEmpty: boolean) { + const semaphore = new Semaphore(this.opts.concurrency); + const inFlight = new Map>(); + while (!this.stopping) { + await semaphore.waitForUnlock(); + const job = await this.queue.attemptDequeue({ + timeoutSecs: this.opts.timeoutSecs, + }); + if (!job && breakOnEmpty && inFlight.size == 0) { + // No more jobs to process, and no ongoing jobs. + break; + } + if (!job) { + await new Promise((resolve) => + setTimeout(resolve, this.opts.pollIntervalMs), + ); + continue; + } + const [_, release] = await semaphore.acquire(); + inFlight.set( + job.id, + this.runOnce(job).finally(() => { + inFlight.delete(job.id); + release(); + }), + ); + } + await Promise.allSettled(inFlight.values()); + } + + async runOnce(job: Job) { + assert(job.allocationId); + + let parsed: T; + try { + parsed = JSON.parse(job.payload) as T; + if (this.opts.validator) { + parsed = this.opts.validator.parse(parsed); + } + } catch (e) { + if (job.numRunsLeft <= 0) { + await this.funcs.onError?.({ + id: job.id.toString(), + error: e as Error, + }); + await this.queue.finalize(job.id, job.allocationId, "failed"); + } else { + await this.queue.finalize(job.id, job.allocationId, "pending_retry"); + } + return; + } + + const dequeuedJob: DequeuedJob = { + id: job.id.toString(), + data: parsed, + runNumber: job.maxNumRuns - job.numRunsLeft - 1, + }; + try { + await Promise.race([ + this.funcs.run(dequeuedJob), + new Promise((_, reject) => + setTimeout( + () => reject(new Error("Timeout")), + this.opts.timeoutSecs * 1000, + ), + ), + ]); + await this.funcs.onComplete?.(dequeuedJob); + await this.queue.finalize(job.id, job.allocationId, "completed"); + } catch (e) { + if (job.numRunsLeft <= 0) { + await this.funcs.onError?.({ ...dequeuedJob, error: e as Error }); + await this.queue.finalize(job.id, job.allocationId, "failed"); + } else { + await this.queue.finalize(job.id, job.allocationId, "pending_retry"); + } + } + } +} diff --git a/packages/queue/schema.ts b/packages/queue/schema.ts new file mode 100644 index 00000000..377c6b1c --- /dev/null +++ b/packages/queue/schema.ts @@ -0,0 +1,36 @@ +import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core"; + +function createdAtField() { + return integer("createdAt", { mode: "timestamp" }) + .notNull() + .$defaultFn(() => new Date()); +} + +export const tasksTable = sqliteTable( + "tasks", + { + id: integer("id").notNull().primaryKey({ autoIncrement: true }), + queue: text("queue").notNull(), + payload: text("payload").notNull(), + createdAt: createdAtField(), + status: text("status", { + enum: ["pending", "running", "pending_retry", "failed"], + }) + .notNull() + .default("pending"), + expireAt: integer("expireAt", { mode: "timestamp" }), + allocationId: text("allocationId").notNull(), + numRunsLeft: integer("numRunsLeft").notNull(), + maxNumRuns: integer("maxNumRuns").notNull(), + }, + (tasks) => ({ + queueIdx: index("tasks_queue_idx").on(tasks.queue), + statusIdx: index("tasks_status_idx").on(tasks.status), + expireAtIdx: index("tasks_expire_at_idx").on(tasks.expireAt), + numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft), + maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns), + allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId), + }), +); + +export type Job = typeof tasksTable.$inferSelect; diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json new file mode 100644 index 00000000..71bf61e7 --- /dev/null +++ b/packages/queue/tsconfig.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@hoarder/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + }, +} diff --git a/packages/queue/types.ts b/packages/queue/types.ts new file mode 100644 index 00000000..01975cc7 --- /dev/null +++ b/packages/queue/types.ts @@ -0,0 +1,11 @@ +export interface DequeuedJob { + id: string; + data: T; + runNumber: number; +} + +export interface DequeuedJobError { + id: string; + data?: T; + error: Error; +} diff --git a/packages/queue/vitest.config.ts b/packages/queue/vitest.config.ts new file mode 100644 index 00000000..a206cfc4 --- /dev/null +++ b/packages/queue/vitest.config.ts @@ -0,0 +1,13 @@ +/// + +import { defineConfig } from "vitest/config"; + +// https://vitejs.dev/config/ +export default defineConfig({ + plugins: [], + test: { + alias: { + "@/*": "./*", + }, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eac8aee8..b405d057 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -834,6 +834,40 @@ importers: specifier: ^0.20.14 version: 0.20.14 + packages/queue: + dependencies: + async-mutex: + specifier: ^0.4.1 + version: 0.4.1 + better-sqlite3: + specifier: ^9.4.3 + version: 9.4.3 + drizzle-orm: + specifier: ^0.29.4 + version: 0.29.4(@types/better-sqlite3@7.6.9)(better-sqlite3@9.4.3) + zod: + specifier: ^3.22.4 + version: 3.22.4 + devDependencies: + '@hoarder/eslint-config': + specifier: workspace:^0.2.0 + version: link:../../tooling/eslint + '@hoarder/prettier-config': + specifier: workspace:^0.1.0 + version: link:../../tooling/prettier + '@hoarder/tsconfig': + specifier: workspace:^0.1.0 + version: link:../../tooling/typescript + '@types/better-sqlite3': + specifier: ^7.6.9 + version: 7.6.9 + drizzle-kit: + specifier: ^0.20.14 + version: 0.20.14 + vitest: + specifier: ^1.3.1 + version: 1.3.1(@types/node@20.11.20) + packages/shared: dependencies: bullmq: @@ -7136,16 +7170,20 @@ packages: glob@6.0.4: resolution: {integrity: sha512-MKZeRNyYZAVVVG1oZeLaWie1uweH40m9AZwIwxyPbTSX4hHrVYSzLg0Ro5Z5R7XKkIX+Cc6oD1rqeDJnwsB8/A==} + deprecated: Glob versions prior to v9 are no longer supported glob@7.1.6: resolution: {integrity: sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA==} + deprecated: Glob versions prior to v9 are no longer supported glob@7.2.3: resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==} + deprecated: Glob versions prior to v9 are no longer supported glob@8.1.0: resolution: {integrity: sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==} engines: {node: '>=12'} + deprecated: Glob versions prior to v9 are no longer supported global-dirs@3.0.1: resolution: {integrity: sha512-NBcGGFbBA9s1VzD41QXDG+3++t9Mn5t1FpLdhESY6oKY4gYTFpX4wO3sqGUa0Srjtbfj3szX0RnemmrVRUdULA==} @@ -7526,6 +7564,7 @@ packages: inflight@1.0.6: resolution: {integrity: sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==} + deprecated: This module is not supported, and leaks memory. Do not use it. Check out lru-cache if you want a good and tested way to coalesce async requests by a key value, which is much more comprehensive and powerful. inherits@2.0.3: resolution: {integrity: sha512-x00IRNXNy63jwGkJmzPigoySHbaqpNuzKbBOmzK+g2OdZpQ9w+sxCN+VSB3ja7IAge2OP2qpfxTjeNcyjmW1uw==}