Skip to content

Commit

Permalink
fix scheduler implementation/types
Browse files Browse the repository at this point in the history
  • Loading branch information
threepointone committed Feb 25, 2025
1 parent 2aa3ecf commit 90fe787
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/weak-colts-jog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/agents": patch
---

fix scheduler implementation/types
4 changes: 2 additions & 2 deletions examples/playground/src/agents/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ ${event.input}
}
}

async onTask(task: Schedule) {
async onTask(payload: unknown, schedule: Schedule<string>) {
this.broadcast(
JSON.stringify({
type: "run-schedule",
data: convertScheduleToScheduledItem(task),
data: convertScheduleToScheduledItem(schedule),
} satisfies OutgoingMessage)
);
}
Expand Down
55 changes: 34 additions & 21 deletions packages/agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import { WorkflowEntrypoint as CFWorkflowEntrypoint } from "cloudflare:workers";

export class WorkflowEntrypoint extends CFWorkflowEntrypoint {}

export type Schedule = {
export type Schedule<T = string> = {
id: string;
callback: string;
payload: string;
payload: T;
} & (
| {
type: "scheduled";
Expand Down Expand Up @@ -194,11 +194,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {
throw new Error("Not implemented");
}

async schedule(
async schedule<T = string>(
when: Date | string | number,
callback: string,
payload: unknown
): Promise<Schedule> {
payload: T
): Promise<Schedule<T>> {
const id = nanoid(9);

if (when instanceof Date) {
Expand All @@ -215,7 +215,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
return {
id,
callback,
payload: JSON.stringify(payload),
payload,
time: timestamp,
type: "scheduled",
};
Expand All @@ -235,7 +235,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
return {
id,
callback,
payload: JSON.stringify(payload),
payload,
delayInSeconds: when,
time: timestamp,
type: "delayed",
Expand All @@ -256,7 +256,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
return {
id,
callback,
payload: JSON.stringify(payload),
payload,
cron: when,
time: timestamp,
type: "cron",
Expand All @@ -265,22 +265,22 @@ export class Agent<Env, State = unknown> extends Server<Env> {
throw new Error("Invalid schedule type");
}
}
async getSchedule(id: string): Promise<Schedule | undefined> {
const result = this.sql<Schedule>`
async getSchedule<T = string>(id: string): Promise<Schedule<T> | undefined> {
const result = this.sql<Schedule<string>>`
SELECT * FROM cf_agents_schedules WHERE id = ${id}
`;
if (!result) return undefined;

return result[0];
return { ...result[0], payload: JSON.parse(result[0].payload) as T };
}
getSchedules(
getSchedules<T = string>(
criteria: {
description?: string;
id?: string;
type?: "scheduled" | "delayed" | "cron";
timeRange?: { start?: Date; end?: Date };
} = {}
): Schedule[] {
): Schedule<T>[] {
let query = "SELECT * FROM cf_agents_schedules WHERE 1=1";
const params = [];

Expand Down Expand Up @@ -311,7 +311,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {

const result = this.ctx.storage.sql
.exec(query, ...params)
.toArray() as Schedule[];
.toArray()
.map((row) => ({
...row,
payload: JSON.parse(row.payload as string) as T,
})) as Schedule<T>[];

return result;
}
Expand Down Expand Up @@ -343,17 +347,26 @@ export class Agent<Env, State = unknown> extends Server<Env> {
const now = Math.floor(Date.now() / 1000);

// Get all schedules that should be executed now
const result = this.sql<Schedule>`
const result = this.sql<Schedule<string>>`
SELECT * FROM cf_agents_schedules WHERE time <= ${now}
`;

for (const row of result || []) {
(
this[row.callback as keyof Agent<Env>] as (
schedule: Schedule
) => Promise<void>
)(row);

const callback = this[row.callback as keyof Agent<Env>];
if (!callback) {
console.error(`callback ${row.callback} not found`);
continue;
}
try {
(
callback as (
payload: unknown,
schedule: Schedule<unknown>
) => Promise<void>
).bind(this)(JSON.parse(row.payload as string), row);
} catch (e) {
console.error(`error executing callback ${row.callback}`, e);
}
if (row.type === "cron") {
// Update next execution time for cron schedules
const nextExecutionTime = getNextCronTime(row.cron);
Expand Down

0 comments on commit 90fe787

Please sign in to comment.