diff --git a/dist/buildinfo.json b/dist/buildinfo.json index 21305b57..768ad367 100644 --- a/dist/buildinfo.json +++ b/dist/buildinfo.json @@ -1 +1 @@ -{"sha":"954d016","timestamp":1731348734} \ No newline at end of file +{"sha":"c9726d1","timestamp":1731403008} \ No newline at end of file diff --git a/dist/index.js b/dist/index.js index 4e9cb0a7..3a59cbcc 100644 --- a/dist/index.js +++ b/dist/index.js @@ -407,8 +407,8 @@ const ENV_KEY_MAPPER = { WORKERS_AI_MODEL: "WORKERS_CHAT_MODEL" }; class Environment extends EnvironmentConfig { - BUILD_TIMESTAMP = 1731348734; - BUILD_VERSION = "954d016"; + BUILD_TIMESTAMP = 1731403008; + BUILD_VERSION = "c9726d1"; I18N = loadI18n(); PLUGINS_ENV = {}; USER_CONFIG = createAgentUserConfig(); @@ -16038,7 +16038,7 @@ async function imageToBase64String(url) { function renderBase64DataURI(params) { return `data:${params.format};base64,${params.data}`; } -async function messageInitialize(sender) { +async function messageInitialize(sender, streamSender) { if (!sender.context.message_id) { try { setTimeout(() => sendAction(sender.api.token, sender.context.chat_id, "typing"), 0); @@ -16046,12 +16046,7 @@ async function messageInitialize(sender) { return; } log.info(`send init message`); - const response = await sender.sendPlainText("...", "chat"); - const msg = await response.json(); - log.info(`send init message done`); - sender.update({ - message_id: msg.result.message_id - }); + streamSender.send("...", "chat"); } catch (e) { console.error("Failed to initialize message:", e); } @@ -16060,7 +16055,7 @@ async function messageInitialize(sender) { async function chatWithLLM(message, params, context, modifier) { const sender = MessageSender.from(context.SHARE_CONTEXT.botToken, message); const streamSender = OnStreamHander(sender, context, message.text || ""); - streamSender.sentPromise = messageInitialize(sender); + messageInitialize(sender, streamSender); const agent = loadChatLLM(context.USER_CONFIG); if (!agent) { return streamSender.end?.("LLM is not enabled"); @@ -16155,10 +16150,10 @@ ${urls.join("\n")}`); } } function OnStreamHander(sender, context, question) { + let sentPromise = null; + let nextEnableTime = Date.now(); + const sentMessageIds2 = sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : []; const streamSender = { - nextEnableTime: Date.now(), - sentMessageIds: sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : [], - sentPromise: null, send: null, end: null }; @@ -16167,21 +16162,22 @@ function OnStreamHander(sender, context, question) { if (sender instanceof MessageSender && isTelegramChatTypeGroup(sender.context.chatType) && ENV.TELEGRAPH_NUM_LIMIT > 0 && text.length > ENV.TELEGRAPH_NUM_LIMIT && context) { return; } - if ((streamSender.nextEnableTime || 0) > Date.now()) { - log.info(`Need await: ${(streamSender.nextEnableTime || 0) - Date.now()}ms`); + if ((nextEnableTime || 0) > Date.now()) { + log.info(`Need await: ${(nextEnableTime || 0) - Date.now()}ms`); return; } if (ENV.TELEGRAM_MIN_STREAM_INTERVAL > 0) { - streamSender.nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL; + nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL; } const data = context ? `${getLog(context.USER_CONFIG)} ${text}` : text; - const resp = await sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE, "chat"); + sentPromise = sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE, "chat"); + const resp = await sentPromise; if (resp.status === 429) { const retryAfter = Number.parseInt(resp.headers.get("Retry-After") || ""); if (retryAfter) { - streamSender.nextEnableTime = Date.now() + retryAfter * 1e3; - log.info(`Status 429, need wait: ${streamSender.nextEnableTime - Date.now()}ms`); + nextEnableTime = Date.now() + retryAfter * 1e3; + log.info(`Status 429, need wait: ${nextEnableTime - Date.now()}ms`); return; } } @@ -16190,7 +16186,7 @@ ${text}` : text; sender.update({ message_id: respJson.result.message_id }); - streamSender.sentMessageIds.push(respJson.result.message_id); + sentMessageIds2.push(respJson.result.message_id); } else if (!resp.ok) { log.error(`send message failed: ${resp.status} ${resp.statusText}`); return sender.sendPlainText(text); @@ -16200,8 +16196,8 @@ ${text}` : text; } }; streamSender.end = async (text) => { - await streamSender.sentPromise; - await waitUntil((streamSender.nextEnableTime || 0) + 10); + await sentPromise; + await waitUntil((nextEnableTime || 0) + 10); if (sender instanceof MessageSender && isTelegramChatTypeGroup(sender.context.chatType) && ENV.TELEGRAPH_NUM_LIMIT > 0 && text.length > ENV.TELEGRAPH_NUM_LIMIT && context) { return sendTelegraph(context, sender, question || "Redo Question", text); } @@ -16990,6 +16986,7 @@ async function streamHandler(stream, contentExtractor, onStream, messageReferenc let updateStep = 5; let lastChunk = ""; const immediatePromise = Promise.resolve("[PROMISE DONE]"); + let sendPromise = null; try { for await (const part of stream) { const textPart = contentExtractor(part); @@ -17001,12 +16998,12 @@ async function streamHandler(stream, contentExtractor, onStream, messageReferenc messageReferencer?.push(lastChunk); lastChunk = textPart; if (lastChunk && lengthDelta > updateStep) { - if (onStream.sentPromise && await Promise.race([onStream.sentPromise, immediatePromise]) === "[PROMISE DONE]") { + if (sendPromise && await Promise.race([sendPromise, immediatePromise]) === "[PROMISE DONE]") { continue; } lengthDelta = 0; updateStep += 20; - onStream.sentPromise = onStream.send(`${contentFull}●`); + sendPromise = onStream.send(`${contentFull}●`); } } contentFull += lastChunk; @@ -17018,7 +17015,7 @@ async function streamHandler(stream, contentExtractor, onStream, messageReferenc contentFull += ` ERROR: ${e.message}`; } - await onStream.sentPromise; + await sendPromise; return contentFull; } async function requestChatCompletionsV2(params, onStream, onResult = null) { @@ -20188,7 +20185,6 @@ class PerplexityCommandHandler { const startTime2 = Date.now(); const result2 = await WssRequest(perplexityWsUrl, null, perplexityWsOptions, perplexityMessage, { onStream }).catch(console.error); logs.chat.time.push(((Date.now() - startTime2) / 1e3).toFixed(1)); - await waitUntil(onStream.nextEnableTime || 0); await onStream.end?.(result2); return new Response("success"); }; diff --git a/dist/timestamp b/dist/timestamp index bb3c130d..ca467d76 100644 --- a/dist/timestamp +++ b/dist/timestamp @@ -1 +1 @@ -1731348734 \ No newline at end of file +1731403008 \ No newline at end of file diff --git a/src/agent/model_middleware.ts b/src/agent/model_middleware.ts index 7e7f22fc..63a40b24 100644 --- a/src/agent/model_middleware.ts +++ b/src/agent/model_middleware.ts @@ -57,6 +57,7 @@ export function AIMiddleware({ config, _models, activeTools, onStream, toolChoic params.mode.toolChoice = toolChoice[step] as any; log.info(`toolChoice changed: ${JSON.stringify(toolChoice[step])}`); } + log.info(`warp params result: ${JSON.stringify(params)}`); return params; }, @@ -76,7 +77,7 @@ export function AIMiddleware({ config, _models, activeTools, onStream, toolChoic log.info('llm request end'); log.info(finishReason); log.info('step text:', text); - log.debug('step raw request:', request); + // log.debug('step raw request:', request); log.debug('step raw response:', response); const time = ((Date.now() - startTime!) / 1e3).toFixed(1); diff --git a/src/agent/request.ts b/src/agent/request.ts index bc5599da..7315dcbb 100644 --- a/src/agent/request.ts +++ b/src/agent/request.ts @@ -140,6 +140,7 @@ export async function streamHandler(stream: AsyncIterable, contentExtractor let lastChunk = ''; const immediatePromise = Promise.resolve('[PROMISE DONE]'); + let sendPromise: Promise | null = null; try { for await (const part of stream) { @@ -157,13 +158,13 @@ export async function streamHandler(stream: AsyncIterable, contentExtractor if (lastChunk && lengthDelta > updateStep) { // 已发送过消息且消息未发送完成 - if (onStream.sentPromise && (await Promise.race([onStream.sentPromise, immediatePromise]) === '[PROMISE DONE]')) { + if (sendPromise && (await Promise.race([sendPromise, immediatePromise]) === '[PROMISE DONE]')) { continue; } lengthDelta = 0; updateStep += 20; - onStream.sentPromise = onStream.send(`${contentFull}●`); + sendPromise = onStream.send(`${contentFull}●`); } } contentFull += lastChunk; @@ -175,7 +176,7 @@ export async function streamHandler(stream: AsyncIterable, contentExtractor contentFull += `\nERROR: ${(e as Error).message}`; } - await onStream.sentPromise; + await sendPromise; return contentFull; } export async function requestChatCompletionsV2(params: { model: LanguageModelV1; toolModel?: LanguageModelV1; prompt?: string; messages: CoreMessage[]; tools?: any; activeTools?: string[]; toolChoice?: CoreToolChoice[]; context: AgentUserConfig }, onStream: ChatStreamTextHandler | null, onResult: OnResult | null = null): Promise<{ messages: ResponseMessage[]; content: string }> { diff --git a/src/agent/types.ts b/src/agent/types.ts index 073a8d5d..c8726dd2 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -43,11 +43,8 @@ export type MessageTool = MessageBase & { }; export interface ChatStreamTextHandler { - send: (text: string, isEnd?: boolean, sendType?: 'chat' | 'telegraph') => Promise; + send: (text: string, sendType?: 'chat' | 'telegraph') => Promise; end?: (text: string) => Promise; - nextEnableTime?: number; - sentMessageIds?: number[]; - sentPromise?: Promise | null; } export type ImageAgentRequest = (prompt: string, context: AgentUserConfig) => Promise; diff --git a/src/telegram/command/system.ts b/src/telegram/command/system.ts index 7d1b0eae..37c99614 100644 --- a/src/telegram/command/system.ts +++ b/src/telegram/command/system.ts @@ -599,7 +599,6 @@ export class PerplexityCommandHandler implements CommandHandler { const startTime = Date.now(); const result = await WssRequest(perplexityWsUrl, null, perplexityWsOptions, perplexityMessage, { onStream }).catch(console.error); logs.chat.time.push(((Date.now() - startTime) / 1e3).toFixed(1)); - await waitUntil(onStream.nextEnableTime || 0); await onStream.end?.(result); return new Response('success'); }; diff --git a/src/telegram/handler/chat.ts b/src/telegram/handler/chat.ts index a8ed8c39..82b3d1dc 100644 --- a/src/telegram/handler/chat.ts +++ b/src/telegram/handler/chat.ts @@ -15,7 +15,7 @@ import { createTelegramBotAPI } from '../api'; import { MessageSender, sendAction, TelegraphSender } from '../utils/send'; import { isTelegramChatTypeGroup, type UnionData, waitUntil } from '../utils/utils'; -async function messageInitialize(sender: MessageSender): Promise { +async function messageInitialize(sender: MessageSender, streamSender: ChatStreamTextHandler): Promise { if (!sender.context.message_id) { try { setTimeout(() => sendAction(sender.api.token, sender.context.chat_id, 'typing'), 0); @@ -23,12 +23,7 @@ async function messageInitialize(sender: MessageSender): Promise { return; } log.info(`send init message`); - const response = await sender.sendPlainText('...', 'chat'); - const msg = await response.json() as Telegram.ResponseWithMessage; - log.info(`send init message done`); - sender.update({ - message_id: msg.result.message_id, - }); + streamSender.send('...', 'chat'); } catch (e) { console.error('Failed to initialize message:', e); } @@ -43,7 +38,7 @@ export async function chatWithLLM( ): Promise { const sender = MessageSender.from(context.SHARE_CONTEXT.botToken, message); const streamSender = OnStreamHander(sender, context, message.text || ''); - streamSender.sentPromise = messageInitialize(sender); + messageInitialize(sender, streamSender); const agent = loadChatLLM(context.USER_CONFIG); if (!agent) { @@ -155,10 +150,11 @@ export class ChatHandler implements MessageHandler { } export function OnStreamHander(sender: MessageSender | ChosenInlineSender, context?: WorkerContext, question?: string): ChatStreamTextHandler { + let sentPromise = null as Promise | null; + let nextEnableTime = Date.now(); + const sentMessageIds = sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : []; + const streamSender = { - nextEnableTime: Date.now(), - sentMessageIds: sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : [], - sentPromise: null as Promise | null, send: null as ((text: string, isEnd: boolean, sendType?: 'chat' | 'telegraph') => Promise) | null, end: null as ((text: string) => Promise) | null, }; @@ -172,25 +168,26 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte return; } // 判断是否需要等待 - if ((streamSender.nextEnableTime || 0) > Date.now()) { - log.info(`Need await: ${(streamSender.nextEnableTime || 0) - Date.now()}ms`); + if ((nextEnableTime || 0) > Date.now()) { + log.info(`Need await: ${(nextEnableTime || 0) - Date.now()}ms`); return; } // 设置最小流间隔 if (ENV.TELEGRAM_MIN_STREAM_INTERVAL > 0) { - streamSender.nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL; + nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL; } const data = context ? `${getLog(context.USER_CONFIG)}\n${text}` : text; - const resp = await sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE as Telegram.ParseMode, 'chat'); + sentPromise = sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE as Telegram.ParseMode, 'chat'); + const resp = await sentPromise; // 判断429 if (resp.status === 429) { // 获取重试时间 const retryAfter = Number.parseInt(resp.headers.get('Retry-After') || ''); if (retryAfter) { - streamSender.nextEnableTime = Date.now() + retryAfter * 1000; - log.info(`Status 429, need wait: ${streamSender.nextEnableTime - Date.now()}ms`); + nextEnableTime = Date.now() + retryAfter * 1000; + log.info(`Status 429, need wait: ${nextEnableTime - Date.now()}ms`); return; } } @@ -200,7 +197,7 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte sender.update({ message_id: respJson.result.message_id, }); - streamSender.sentMessageIds.push(respJson.result.message_id); + sentMessageIds.push(respJson.result.message_id); } else if (!resp.ok) { log.error(`send message failed: ${resp.status} ${resp.statusText}`); return sender.sendPlainText(text); @@ -211,8 +208,8 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte }; streamSender.end = async (text: string): Promise => { - await streamSender.sentPromise; - await waitUntil((streamSender.nextEnableTime || 0) + 10); + await sentPromise; + await waitUntil((nextEnableTime || 0) + 10); if (sender instanceof MessageSender && isTelegramChatTypeGroup(sender.context.chatType) && ENV.TELEGRAPH_NUM_LIMIT > 0