Skip to content

Commit

Permalink
fix: fix multiple return results
Browse files Browse the repository at this point in the history
  • Loading branch information
adolphnov committed Nov 12, 2024
1 parent c9726d1 commit c01651e
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 57 deletions.
2 changes: 1 addition & 1 deletion dist/buildinfo.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 22 additions & 26 deletions dist/index.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dist/timestamp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/agent/model_middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export function AIMiddleware({ config, _models, activeTools, onStream, toolChoic
params.mode.toolChoice = toolChoice[step] as any;
log.info(`toolChoice changed: ${JSON.stringify(toolChoice[step])}`);
}
log.info(`warp params result: ${JSON.stringify(params)}`);
return params;
},

Expand All @@ -76,7 +77,7 @@ export function AIMiddleware({ config, _models, activeTools, onStream, toolChoic
log.info('llm request end');
log.info(finishReason);
log.info('step text:', text);
log.debug('step raw request:', request);
// log.debug('step raw request:', request);
log.debug('step raw response:', response);

const time = ((Date.now() - startTime!) / 1e3).toFixed(1);
Expand Down
7 changes: 4 additions & 3 deletions src/agent/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export async function streamHandler(stream: AsyncIterable<any>, contentExtractor
let lastChunk = '';

const immediatePromise = Promise.resolve('[PROMISE DONE]');
let sendPromise: Promise<any> | null = null;

try {
for await (const part of stream) {
Expand All @@ -157,13 +158,13 @@ export async function streamHandler(stream: AsyncIterable<any>, contentExtractor

if (lastChunk && lengthDelta > updateStep) {
// 已发送过消息且消息未发送完成
if (onStream.sentPromise && (await Promise.race([onStream.sentPromise, immediatePromise]) === '[PROMISE DONE]')) {
if (sendPromise && (await Promise.race([sendPromise, immediatePromise]) === '[PROMISE DONE]')) {
continue;
}

lengthDelta = 0;
updateStep += 20;
onStream.sentPromise = onStream.send(`${contentFull}●`);
sendPromise = onStream.send(`${contentFull}●`);
}
}
contentFull += lastChunk;
Expand All @@ -175,7 +176,7 @@ export async function streamHandler(stream: AsyncIterable<any>, contentExtractor
contentFull += `\nERROR: ${(e as Error).message}`;
}

await onStream.sentPromise;
await sendPromise;
return contentFull;
}
export async function requestChatCompletionsV2(params: { model: LanguageModelV1; toolModel?: LanguageModelV1; prompt?: string; messages: CoreMessage[]; tools?: any; activeTools?: string[]; toolChoice?: CoreToolChoice<any>[]; context: AgentUserConfig }, onStream: ChatStreamTextHandler | null, onResult: OnResult | null = null): Promise<{ messages: ResponseMessage[]; content: string }> {
Expand Down
5 changes: 1 addition & 4 deletions src/agent/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ export type MessageTool = MessageBase & {
};

export interface ChatStreamTextHandler {
send: (text: string, isEnd?: boolean, sendType?: 'chat' | 'telegraph') => Promise<any>;
send: (text: string, sendType?: 'chat' | 'telegraph') => Promise<any>;
end?: (text: string) => Promise<any>;
nextEnableTime?: number;
sentMessageIds?: number[];
sentPromise?: Promise<any> | null;
}

export type ImageAgentRequest = (prompt: string, context: AgentUserConfig) => Promise<ImageResult>;
Expand Down
1 change: 0 additions & 1 deletion src/telegram/command/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ export class PerplexityCommandHandler implements CommandHandler {
const startTime = Date.now();
const result = await WssRequest(perplexityWsUrl, null, perplexityWsOptions, perplexityMessage, { onStream }).catch(console.error);
logs.chat.time.push(((Date.now() - startTime) / 1e3).toFixed(1));
await waitUntil(onStream.nextEnableTime || 0);
await onStream.end?.(result);
return new Response('success');
};
Expand Down
37 changes: 17 additions & 20 deletions src/telegram/handler/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,15 @@ import { createTelegramBotAPI } from '../api';
import { MessageSender, sendAction, TelegraphSender } from '../utils/send';
import { isTelegramChatTypeGroup, type UnionData, waitUntil } from '../utils/utils';

async function messageInitialize(sender: MessageSender): Promise<void> {
async function messageInitialize(sender: MessageSender, streamSender: ChatStreamTextHandler): Promise<void> {
if (!sender.context.message_id) {
try {
setTimeout(() => sendAction(sender.api.token, sender.context.chat_id, 'typing'), 0);
if (!ENV.SEND_INIT_MESSAGE) {
return;
}
log.info(`send init message`);
const response = await sender.sendPlainText('...', 'chat');
const msg = await response.json() as Telegram.ResponseWithMessage;
log.info(`send init message done`);
sender.update({
message_id: msg.result.message_id,
});
streamSender.send('...', 'chat');
} catch (e) {
console.error('Failed to initialize message:', e);
}
Expand All @@ -43,7 +38,7 @@ export async function chatWithLLM(
): Promise<UnionData | Response> {
const sender = MessageSender.from(context.SHARE_CONTEXT.botToken, message);
const streamSender = OnStreamHander(sender, context, message.text || '');
streamSender.sentPromise = messageInitialize(sender);
messageInitialize(sender, streamSender);

const agent = loadChatLLM(context.USER_CONFIG);
if (!agent) {
Expand Down Expand Up @@ -155,10 +150,11 @@ export class ChatHandler implements MessageHandler<WorkerContext> {
}

export function OnStreamHander(sender: MessageSender | ChosenInlineSender, context?: WorkerContext, question?: string): ChatStreamTextHandler {
let sentPromise = null as Promise<Response> | null;
let nextEnableTime = Date.now();
const sentMessageIds = sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : [];

const streamSender = {
nextEnableTime: Date.now(),
sentMessageIds: sender instanceof MessageSender && sender.context.message_id ? [sender.context.message_id] : [],
sentPromise: null as Promise<Response> | null,
send: null as ((text: string, isEnd: boolean, sendType?: 'chat' | 'telegraph') => Promise<any>) | null,
end: null as ((text: string) => Promise<any>) | null,
};
Expand All @@ -172,25 +168,26 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte
return;
}
// 判断是否需要等待
if ((streamSender.nextEnableTime || 0) > Date.now()) {
log.info(`Need await: ${(streamSender.nextEnableTime || 0) - Date.now()}ms`);
if ((nextEnableTime || 0) > Date.now()) {
log.info(`Need await: ${(nextEnableTime || 0) - Date.now()}ms`);
return;
}

// 设置最小流间隔
if (ENV.TELEGRAM_MIN_STREAM_INTERVAL > 0) {
streamSender.nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL;
nextEnableTime = Date.now() + ENV.TELEGRAM_MIN_STREAM_INTERVAL;
}

const data = context ? `${getLog(context.USER_CONFIG)}\n${text}` : text;
const resp = await sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE as Telegram.ParseMode, 'chat');
sentPromise = sender.sendRichText(data, ENV.DEFAULT_PARSE_MODE as Telegram.ParseMode, 'chat');
const resp = await sentPromise;
// 判断429
if (resp.status === 429) {
// 获取重试时间
const retryAfter = Number.parseInt(resp.headers.get('Retry-After') || '');
if (retryAfter) {
streamSender.nextEnableTime = Date.now() + retryAfter * 1000;
log.info(`Status 429, need wait: ${streamSender.nextEnableTime - Date.now()}ms`);
nextEnableTime = Date.now() + retryAfter * 1000;
log.info(`Status 429, need wait: ${nextEnableTime - Date.now()}ms`);
return;
}
}
Expand All @@ -200,7 +197,7 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte
sender.update({
message_id: respJson.result.message_id,
});
streamSender.sentMessageIds.push(respJson.result.message_id);
sentMessageIds.push(respJson.result.message_id);
} else if (!resp.ok) {
log.error(`send message failed: ${resp.status} ${resp.statusText}`);
return sender.sendPlainText(text);
Expand All @@ -211,8 +208,8 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte
};

streamSender.end = async (text: string): Promise<any> => {
await streamSender.sentPromise;
await waitUntil((streamSender.nextEnableTime || 0) + 10);
await sentPromise;
await waitUntil((nextEnableTime || 0) + 10);
if (sender instanceof MessageSender
&& isTelegramChatTypeGroup(sender.context.chatType)
&& ENV.TELEGRAPH_NUM_LIMIT > 0
Expand Down

0 comments on commit c01651e

Please sign in to comment.