Skip to content

Commit

Permalink
refactor: optimize text chunking logic
Browse files Browse the repository at this point in the history
chore: optimize message sending logic
  • Loading branch information
adolphnov committed Dec 28, 2024
1 parent e6798c8 commit c5d5ba1
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 49 deletions.
11 changes: 5 additions & 6 deletions src/telegram/handler/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte

const data = context ? mergeLogMessages(text, context.USER_CONFIG) : text;
expandParams.addQuote = addQuotePrerequisites && data.length > ENV.ADD_QUOTE_LIMIT;
log.info(`sent message ids: ${isMessageSender ? [...sender.context.sentMessageIds] : sender.context.inline_message_id}`);
log.info(`sent message ids: ${isMessageSender ? sender.context.sentMessageIds : sender.context.inline_message_id}`);
isMessageSender && sendAction(sender.api.token, sender.context.chat_id, 'typing');
sentPromise = sender.sendRichText(data, undefined, 'chat', expandParams);
const resp = await sentPromise as Response;
Expand All @@ -304,12 +304,11 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte
streamSender.end = async (text: string, needLog = true): Promise<any> => {
log.info('--- start end ---');
await sentPromise;
// await waitUntil((nextEnableTime || 0) + 10);
if (isSendTelegraph(text)) {
return sendTelegraph(telegraphContext(true, false), question || 'Redo Question', text);
}
const data = context && needLog ? mergeLogMessages(text, context.USER_CONFIG) : text;
log.info(`sent message ids: ${isMessageSender ? [...sender.context.sentMessageIds] : sender.context.inline_message_id}`);
log.info(`sent message ids: ${isMessageSender ? sender.context.sentMessageIds : sender.context.inline_message_id}`);
expandParams.addQuote = addQuotePrerequisites && data.length > ENV.ADD_QUOTE_LIMIT;
while (true) {
const finalResp = await sender.sendRichText(data, undefined, 'chat', expandParams);
Expand All @@ -325,7 +324,7 @@ export function OnStreamHander(sender: MessageSender | ChosenInlineSender, conte
}
}
if (!finalResp.ok) {
(sender as MessageSender).context.sentMessageIds.clear();
(sender as MessageSender).context.sentMessageIds.length = 0;
log.error(`send message failed: ${finalResp.status} ${await finalResp.json().then(j => j.description)}`);
await sendTelegraph(telegraphContext(true, true), question || 'Redo Question', text);
return;
Expand Down Expand Up @@ -484,7 +483,7 @@ async function handleAudio(
return new Response('audio handle done');
}
clearLog(context.USER_CONFIG);
!ENV.HIDE_MIDDLE_MESSAGE && sender.context.sentMessageIds.clear();
!ENV.HIDE_MIDDLE_MESSAGE && (sender.context.sentMessageIds.length = 0);
const isMiddle = handleKey === 'audio:audio';
const otherText = (params.content as TextPart[]).filter(c => c.type === 'text').map(c => c.text).join('\n').trim();
const resp = await chatWithLLM(message, { role: 'user', content: `[AUDIO TRANSCRIPTION]: ${text}\n${otherText}` }, context, null, streamSender, isMiddle);
Expand Down Expand Up @@ -580,5 +579,5 @@ function mergeLogMessages(text: string, config: AgentUserConfig) {
if (ENV.LOG_POSITION_ON_TOP) {
return `${getLog(config)}\n\n${text.trim()}`.trim();
}
return `${text.trim()}\n${getLog(config)}`;
return `${text.trim()}\n\n${getLog(config)}`;
}
84 changes: 50 additions & 34 deletions src/telegram/utils/md2tgmd.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { log } from '../../log/logger';

/* eslint-disable regexp/no-super-linear-backtracking */
const escapeChars = /[_*[\]()\\~`>#+\-=|{}.!]/g;
const escapedChars = {
Expand Down Expand Up @@ -152,45 +150,48 @@ function handleEscape(text: string, type: 'text' | 'code', { addQuote }: ExpandP
} else {
text = text.replace(escapeChars, match => `\\${match}`);
}
addQuote && (text = quoteMessage(text));
text = quoteMessage(text, addQuote);
return text.replace(
new RegExp(Object.values(escapedChars).join('|'), 'g'),
match => escapedCharsReverseMap.get(match) ?? match,
);
}
export function chunkDocument(text: string, chunkSize: number = 4096): string[] {
export function chunkDocument(text: string, chunkSize: number = 4000): string[] {
const cleanText = text.replace(/\n\s+\n/g, '\n\n');
const textList = cleanText.split('\n');
const textList = lineSegment(cleanText);
const chunks: string[][] = [[]];
let chunkIndex = 0;
const codeStack: string[] = [];
for (const line of textList) {
if (chunks[chunkIndex].join('\n').length + line.length >= chunkSize) {
if (chunks[chunkIndex].join('\n').length + line.length > chunkSize) {
chunkIndex++;
chunks.push([]);
if (codeStack.length) {
if (codeStack.length > 0) {
// 如果插入结尾标记后超出长度限制
if (chunks[chunkIndex - 1].join('\n').length + codeStack.length * 4 >= chunkSize) {
chunks[chunkIndex - 1].push(...chunks[chunkIndex - 1].slice(-codeStack.length));
// 将上一个块中的末尾数据插入到新块开头
chunks[chunkIndex].push(...chunks[chunkIndex - 1].slice(-codeStack.length));
// 将上一个块中的末尾行取出
chunks[chunkIndex - 1].length -= codeStack.length;
}
// 插入结尾标记
chunks[chunkIndex - 1].push(...Array.from({ length: codeStack.length }).fill('```') as string[]);
chunks[chunkIndex].push(...codeStack);
}
if (line.length > chunkSize) {
const lineSplit = chunkText(chunks[chunkIndex].join('\n') + line, chunkSize);
if (lineSplit.length > 1) {
chunks.length -= 1;
chunks.push(...lineSplit.map(item => item.split('\n')));
chunkIndex = chunks.length - 1;
} else {
chunks[chunkIndex].push(line);
}
} else {
chunks[chunkIndex].push(line);
// 插入开头标记
chunks[chunkIndex].unshift(...codeStack);
// 存在冗余, 不考虑以下情况: 新块代码行加line超出限制
// if (chunks[chunkIndex].join('\n').length + line.length > chunkSize) {
// // 插入结尾标记
// chunks[chunkIndex].push(...Array.from({ length: codeStack.length }).fill('```') as string[]);
// // 插入开头标记
// chunkIndex++;
// chunks[chunkIndex] = codeStack;
// }
}

chunks[chunkIndex].push(line);
continue;
}
if (/^```.+/.test(line.trim())) {
if (/^```.+/.test(line.trimStart())) {
codeStack.push(line);
} else if (line.trim() === '```') {
if (codeStack.length) {
Expand All @@ -208,13 +209,27 @@ export function chunkDocument(text: string, chunkSize: number = 4096): string[]
return chunks.map(c => c.join('\n'));
}

/**
 * Splits `text` into lines and hard-wraps every line longer than `chunkSize`,
 * so that no returned segment exceeds `chunkSize` UTF-16 code units.
 *
 * When an over-long line is a quote line (its first non-whitespace character
 * is `>`), each continuation piece is re-prefixed with `>` so it stays inside
 * the quote. The prefix is counted against the limit.
 *
 * @param text - The text to segment; it is split on `\n`.
 * @param chunkSize - Maximum length of a single returned segment. Fractional
 *   values are floored.
 * @returns The segments in their original order. Lines that already fit are
 *   returned untouched.
 * @throws RangeError If `chunkSize` is not a number greater than or equal to 1.
 */
function lineSegment(text: string, chunkSize: number = 4000): string[] {
  // A zero, negative or NaN size would make the splitting loop spin forever.
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunkSize must be a number >= 1, got ${chunkSize}`);
  }
  const limit = Math.floor(chunkSize);

  const isHighSurrogate = (code: number): boolean => code >= 0xD800 && code <= 0xDBFF;

  const splitLongLine = (line: string): string[] => {
    // Whether the line is a quote does not change between pieces; decide once.
    const quotePrefix = line.trimStart().startsWith('>') ? '>' : '';
    const pieces: string[] = [];
    let start = 0;
    while (start < line.length) {
      // Only continuation pieces need the quote marker re-added.
      const prefix = start > 0 ? quotePrefix : '';
      // Reserve room for the prefix so the piece still fits within the limit;
      // always advance by at least one unit to guarantee termination.
      const room = Math.max(1, limit - prefix.length);
      let end = Math.min(line.length, start + room);
      // Do not cut a surrogate pair in half: move the boundary before it.
      if (end < line.length && end - start > 1 && isHighSurrogate(line.charCodeAt(end - 1))) {
        end -= 1;
      }
      pieces.push(prefix + line.slice(start, end));
      start = end;
    }
    return pieces;
  };

  const segments: string[] = [];
  for (const line of text.split('\n')) {
    if (line.length > limit) {
      segments.push(...splitLongLine(line));
    } else {
      segments.push(line);
    }
  }
  return segments;
}

function markData(text: string, markd: Record<string, string>, type: 'INCODE' | 'LINK' = 'INCODE') {
Expand All @@ -237,7 +252,8 @@ export function addExpandable(text: string, quoteExpandable: boolean): string {
// replace log data to expandable
// can't replace log data directly, because there may be other quote marks after the log data, tg doesn't allow expandable quote to be continuous quote
text = text.replace(/^>?LOGSTART\\>([\s\S]*?)LOGEND((?:\n>[^\n]*)*)$/m, `**>$1$2||`);
log.debug(`addExpandable:\n${text}`);
// maybe split by log start and log end
text = text.replace(/^(>?)LOGSTART/m, '$1').replace(/LOGEND$/m, '');
return text;
}
// replace log data to expandable
Expand All @@ -246,8 +262,8 @@ export function addExpandable(text: string, quoteExpandable: boolean): string {
// fold quote
// .replace(/((?:^>[^\n]+(?:\n|$))+)/gm, (match, p1) => `**${p1.trimEnd()}||\n`)
.replace(/(?:^>[^\n]*(\n|$))+/gm, (match, p1) => `**${match.trimEnd()}||${p1}`);
// reverse escape chars
log.debug(`addExpandable:\n${text}`);
// maybe split by log start and log end
text = text.replace(/^(>?)LOGSTART/m, '$1').replace(/LOGEND$/m, '');
return text;
}

Expand All @@ -256,11 +272,11 @@ export interface ExpandParams {
quoteExpandable: boolean;
}

function quoteMessage(text: string) {
const textList = text.split('\n');
if (textList.length <= 1) {
function quoteMessage(text: string, addQuote: boolean) {
if (!addQuote) {
return text;
}
const textList = text.split('\n');
textList.forEach((line, index) => {
if (!line.trimStart().startsWith('>')) {
textList[index] = `>${line}`;
Expand Down
28 changes: 19 additions & 9 deletions src/telegram/utils/send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class MessageContext implements Record<string, any> {
message_thread_id: number | null = null;
chatType: string; // 聊天类型
message: Telegram.Message; // 原始消息 用于标记需要删除的id
sentMessageIds: Set<number> = new Set();
sentMessageIds: number[] = [];

constructor(message: Telegram.Message) {
this.chat_id = message.chat.id;
Expand Down Expand Up @@ -125,26 +125,36 @@ export class MessageSender {
let lastMessageResponse = null;
let lastMessageRespJson = null;
for (let i = 0; i < messages.length; i++) {
// 不发送中间片段
if (i > 0 && i < context.sentMessageIds.size - 1) {
continue;
}
// 日志位置不在顶部时,不发送第一个消息
if (context.sentMessageIds.size > 1 && i === 0 && !ENV.LOG_POSITION_ON_TOP) {
if (ENV.LOG_POSITION_ON_TOP) {
// 不发送中间片段
if (i > 0 && i < context.sentMessageIds.length - 1) {
continue;
}
} else if (context.sentMessageIds.length > 2 && i < context.sentMessageIds.length - 2) {
// 只发送最后两个: 由于日志原因可能被分割为两块
continue;
}

// 不发送空消息
if (messages[i].trim() === '') {
continue;
}

chatContext.message_id = [...context.sentMessageIds][i] ?? null;
chatContext.message_id = context.sentMessageIds[i] ?? null;
log.info(`message id: ${chatContext.message_id}`);
// log.debug(`chunk:\n${messages[i]}`);
lastMessageResponse = await this.sendMessage(messages[i], chatContext);
if (lastMessageResponse.status === 400) {
const message = (await lastMessageResponse.clone().json() as Telegram.ResponseError).description;
if (message.includes('not modified')) {
continue;
}
}
if (lastMessageResponse.status !== 200) {
break;
}
lastMessageRespJson = await lastMessageResponse.clone().json() as Telegram.ResponseWithMessage;
this.context.sentMessageIds.add(lastMessageRespJson.result.message_id);
this.context.sentMessageIds[i] = lastMessageRespJson.result.message_id;
// 用于后续发送媒体编辑
this.context.message_id = lastMessageRespJson.result.message_id;
}
Expand Down

0 comments on commit c5d5ba1

Please sign in to comment.