From 90fe7878ff0be64a41023070cc77742e49ec542e Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Tue, 25 Feb 2025 01:26:02 +0000 Subject: [PATCH] fix scheduler implementation/types --- .changeset/weak-colts-jog.md | 5 ++ examples/playground/src/agents/scheduler.ts | 4 +- packages/agents/src/index.ts | 55 +++++++++++++-------- 3 files changed, 41 insertions(+), 23 deletions(-) create mode 100644 .changeset/weak-colts-jog.md diff --git a/.changeset/weak-colts-jog.md b/.changeset/weak-colts-jog.md new file mode 100644 index 0000000..31c0c38 --- /dev/null +++ b/.changeset/weak-colts-jog.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/agents": patch +--- + +fix scheduler implementation/types diff --git a/examples/playground/src/agents/scheduler.ts b/examples/playground/src/agents/scheduler.ts index 09c3483..bdba074 100644 --- a/examples/playground/src/agents/scheduler.ts +++ b/examples/playground/src/agents/scheduler.ts @@ -121,11 +121,11 @@ ${event.input} } } - async onTask(task: Schedule) { + async onTask(payload: unknown, schedule: Schedule) { this.broadcast( JSON.stringify({ type: "run-schedule", - data: convertScheduleToScheduledItem(task), + data: convertScheduleToScheduledItem(schedule), } satisfies OutgoingMessage) ); } diff --git a/packages/agents/src/index.ts b/packages/agents/src/index.ts index 15eac33..79ba4d5 100644 --- a/packages/agents/src/index.ts +++ b/packages/agents/src/index.ts @@ -17,10 +17,10 @@ import { WorkflowEntrypoint as CFWorkflowEntrypoint } from "cloudflare:workers"; export class WorkflowEntrypoint extends CFWorkflowEntrypoint {} -export type Schedule = { +export type Schedule = { id: string; callback: string; - payload: string; + payload: T; } & ( | { type: "scheduled"; @@ -194,11 +194,11 @@ export class Agent extends Server { throw new Error("Not implemented"); } - async schedule( + async schedule( when: Date | string | number, callback: string, - payload: unknown - ): Promise { + payload: T + ): Promise> { const id = nanoid(9); if (when instanceof Date) { @@ -215,7 +215,7 @@ export class Agent extends Server { return { id, callback, - payload: JSON.stringify(payload), + payload, time: timestamp, type: "scheduled", }; @@ -235,7 +235,7 @@ export class Agent extends Server { return { id, callback, - payload: JSON.stringify(payload), + payload, delayInSeconds: when, time: timestamp, type: "delayed", @@ -256,7 +256,7 @@ export class Agent extends Server { return { id, callback, - payload: JSON.stringify(payload), + payload, cron: when, time: timestamp, type: "cron", @@ -265,22 +265,22 @@ export class Agent extends Server { throw new Error("Invalid schedule type"); } } - async getSchedule(id: string): Promise { - const result = this.sql` + async getSchedule(id: string): Promise | undefined> { + const result = this.sql>` SELECT * FROM cf_agents_schedules WHERE id = ${id} `; if (!result) return undefined; - return result[0]; + return { ...result[0], payload: JSON.parse(result[0].payload) as T }; } - getSchedules( + getSchedules( criteria: { description?: string; id?: string; type?: "scheduled" | "delayed" | "cron"; timeRange?: { start?: Date; end?: Date }; } = {} - ): Schedule[] { + ): Schedule[] { let query = "SELECT * FROM cf_agents_schedules WHERE 1=1"; const params = []; @@ -311,7 +311,11 @@ export class Agent extends Server { const result = this.ctx.storage.sql .exec(query, ...params) - .toArray() as Schedule[]; + .toArray() + .map((row) => ({ + ...row, + payload: JSON.parse(row.payload as string) as T, + })) as Schedule[]; return result; } @@ -343,17 +347,26 @@ export class Agent extends Server { const now = Math.floor(Date.now() / 1000); // Get all schedules that should be executed now - const result = this.sql` + const result = this.sql>` SELECT * FROM cf_agents_schedules WHERE time <= ${now} `; for (const row of result || []) { - ( - this[row.callback as keyof Agent] as ( - schedule: Schedule - ) => Promise - )(row); - + const callback = this[row.callback as keyof Agent]; + if (!callback) { + console.error(`callback ${row.callback} not found`); + continue; + } + try { + ( + callback as ( + payload: unknown, + schedule: Schedule + ) => Promise + ).bind(this)(JSON.parse(row.payload as string), row); + } catch (e) { + console.error(`error executing callback ${row.callback}`, e); + } if (row.type === "cron") { // Update next execution time for cron schedules const nextExecutionTime = getNextCronTime(row.cron);