Skip to content

Commit

Permalink
feat: add cf worker timed task entry function
Browse files Browse the repository at this point in the history
  • Loading branch information
adolphnov committed Aug 14, 2024
1 parent 1b39f7b commit 2383edc
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 30 deletions.
26 changes: 9 additions & 17 deletions adapter/docker/index.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
import adapter from 'cloudflare-worker-adapter';
import { RedisCache } from 'cloudflare-worker-adapter/cache/redis.js';
import toml from 'toml';
import tasks from 'chatgpt-telegram-workers/task';
import { default as worker } from 'chatgpt-telegram-workers';

const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
const cache = new RedisCache(redisUrl);
const { default: worker } = await import('chatgpt-telegram-workers');

// 定时任务
const raw = fs.readFileSync('./config/config.toml');
const env = { ...toml.parse(raw).vars, DATABASE: cache };
if (env.SCHEDULE_TIME && env.SCHEDULE_TIME > 5) {
setInterval(
async () => {
const promises = [];
for (const task of Object.values(tasks)) {
promises.push(task(env));
}
await Promise.all(promises);
},
env.SCHEDULE_TIME * 60 * 1000,
);
}
// 定时任务
const raw = fs.readFileSync('../../wrangler.toml');
const env = { ...toml.parse(raw).vars, DATABASE: cache };
if (env.SCHEDULE_TIME && env.SCHEDULE_TIME > 5) {
setInterval(async () => {
await worker.scheduled(null, env, null);
}, env.SCHEDULE_TIME * 60 * 1000);
}

adapter.startServer(
8787,
Expand Down
10 changes: 3 additions & 7 deletions adapter/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import HttpsProxyAgent from 'https-proxy-agent';
import fetch from 'node-fetch';
import { ENV } from '../../src/config/env.js';
import toml from 'toml';
import tasks from "../../src/tools/scheduleTask.js";
import { default as worker } from '../../main.js';

const config = JSON.parse(fs.readFileSync('./config.json', 'utf-8'));

Expand Down Expand Up @@ -62,19 +62,15 @@ try {
const env = { ...toml.parse(raw).vars, DATABASE: cache };
if (env.SCHEDULE_TIME && env.SCHEDULE_TIME > 5) {
setInterval(async () => {
const promises = [];
for (const task of Object.values(tasks)) {
promises.push(task(env));
}
await Promise.all(promises);
await worker.scheduled(null, env, null);
}, env.SCHEDULE_TIME * 60 * 1000);
}

} catch (e) {
console.log(e);
}

const {default: worker} = await import('../../main.js');
// const {default: worker} = await import('../../main.js');
adapter.startServer(
config.port || 8787,
config.host || '0.0.0.0',
Expand Down
2 changes: 1 addition & 1 deletion dist/buildinfo.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"sha":"e008349","timestamp":1723608303}
{"sha":"1b39f7b","timestamp":1723611264}
83 changes: 80 additions & 3 deletions dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ var Environment = class {
// -- 版本数据 --
//
// 当前版本
BUILD_TIMESTAMP = 1723608303;
BUILD_TIMESTAMP = 1723611264;
// 当前版本 commit id
BUILD_VERSION = "e008349";
BUILD_VERSION = "1b39f7b";
// -- 基础配置 --
/**
* @type {I18n | null}
Expand Down Expand Up @@ -984,6 +984,21 @@ function deleteMessageFromTelegramWithContext(context) {
);
};
}
async function deleteMessagesFromTelegram(chat_id, bot_token, message_ids) {
return await fetch(
`${ENV.TELEGRAM_API_DOMAIN}/bot${bot_token}/deleteMessages`,
{
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
chat_id,
message_ids
})
}
).then((r) => r.json());
}
async function sendPhotoToTelegram(photo, token, context, _info = null) {
const url = `${ENV.TELEGRAM_API_DOMAIN}/bot${token}/sendPhoto`;
let body;
Expand Down Expand Up @@ -4697,12 +4712,62 @@ var jina_reader = {
type: "web_crawler"
};

// src/tools/scheduleTask.js
async function schedule_detele_message(ENV2) {
try {
console.log("- Start task: schedule_detele_message");
const DATABASE2 = ENV2.DATABASE;
const scheduleDeteleKey = "schedule_detele_message";
const scheduledData = JSON.parse(await DATABASE2.get(scheduleDeteleKey) || "{}");
let botTokens = [];
if (typeof ENV2.TELEGRAM_AVAILABLE_TOKENS === "string") {
botTokens = parseArray(ENV2.TELEGRAM_AVAILABLE_TOKENS);
} else
botTokens = ENV2.TELEGRAM_AVAILABLE_TOKENS;
const taskPromises = [];
for (const [bot_name, chats] of Object.entries(scheduledData)) {
const bot_index = ENV2.TELEGRAM_BOT_NAME.indexOf(bot_name);
if (bot_index < 0)
throw new Error("bot name is invalid");
const bot_token = botTokens[bot_index];
if (!bot_token)
throw new Error("bot token is null");
for (const [chat_id, messages] of Object.entries(chats)) {
if (messages.length === 0)
continue;
const expired_msgs = messages.filter((msg) => msg.ttl <= Date.now()).map((msg) => msg.id).flat();
if (expired_msgs.length === 0)
continue;
scheduledData[bot_name][chat_id] = messages.filter((msg) => msg.ttl > Date.now());
console.log(`Start delete: ${chat_id} - ${expired_msgs}`);
for (let i = 0; i < expired_msgs.length; i += 100) {
taskPromises.push(deleteMessagesFromTelegram(chat_id, bot_token, expired_msgs.slice(i, i + 100)));
}
}
}
const resp = await Promise.all(taskPromises);
for (const [i, { ok, description }] of Object.entries(resp)) {
if (ok) {
console.log(`task ${+i + 1}: delete successful`);
} else {
console.error(`task {i+1}: ${description}`);
}
}
await DATABASE2.put(scheduleDeteleKey, JSON.stringify(scheduledData));
return new Response(`{ok:"true"}`, { headers: { "Content-Type": "application/json" } });
} catch (e) {
console.error(e.message);
return new Response(`{ok:"false"}`, { headers: { "Content-Type": "application/json" } });
}
}
var scheduleTask_default = { schedule_detele_message };

// src/tools/index.js
var tools_default2 = { duckduckgo_search, jina_reader };

// main.js
var main_default = {
async fetch(request, env) {
async fetch(request, env, ctx) {
try {
env.tools = tools_default2;
initEnv(env, i18n);
Expand All @@ -4711,6 +4776,18 @@ var main_default = {
console.error(e);
return new Response(errorToString(e), { status: 500 });
}
},
async scheduled(event, env, ctx) {
try {
const promises = [];
for (const task of Object.values(scheduleTask_default)) {
promises.push(task(env));
}
await Promise.all(promises);
console.log("All tasks done.");
} catch (e) {
console.error("Error in scheduled tasks:", e);
}
}
};
export {
Expand Down
2 changes: 1 addition & 1 deletion dist/timestamp
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1723608303
1723611264
17 changes: 16 additions & 1 deletion main.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import {handleRequest} from './src/route.js';
import {errorToString} from './src/utils/utils.js';
import i18n from './src/i18n/index.js';
import tools from "./src/tools/index.js";
import tasks from "./src/tools/scheduleTask.js";


export default {
async fetch(request, env) {
async fetch(request, env, ctx) {
try {
env.tools = tools;
initEnv(env, i18n);
Expand All @@ -16,4 +17,18 @@ export default {
return new Response(errorToString(e), {status: 500});
}
},

async scheduled(event, env, ctx) {
try {
const promises = [];
for (const task of Object.values(tasks)) {
promises.push(task(env));
}
await Promise.all(promises);
console.log('All tasks done.');
} catch (e) {
console.error('Error in scheduled tasks:', e);
}
}
};

0 comments on commit 2383edc

Please sign in to comment.