
Shubhra/ajs 36 refactor llm with streams #403


Open · wants to merge 3 commits into dev-1.0
agents/src/llm/llm.ts (28 additions, 9 deletions)
@@ -3,8 +3,9 @@
 // SPDX-License-Identifier: Apache-2.0
 import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
 import { EventEmitter } from 'node:events';
+import type { ReadableStream } from 'node:stream/web';
 import type { LLMMetrics } from '../metrics/base.js';
-import { AsyncIterableQueue } from '../utils.js';
+import { IdentityTransform } from '../stream/identity_transform.js';
 import type { ChatContext, ChatRole } from './chat_context.js';
 import type { FunctionCallInfo, FunctionContext } from './function_context.js';
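Note: the implementation of `IdentityTransform` is not part of this diff. As a rough sketch — assuming it is a plain pass-through `TransformStream`, which may differ from the actual `agents/src/stream/identity_transform.ts` — it could look like:

```ts
// Sketch only: assumed shape of IdentityTransform, not the PR's actual code.
import { TransformStream } from 'node:stream/web';

export class IdentityTransform<T> extends TransformStream<T, T> {
  constructor() {
    // Forward every chunk unchanged; the value of the class is that it
    // exposes a paired { writable, readable } for producer and consumers.
    super({ transform: (chunk, controller) => controller.enqueue(chunk) });
  }
}
```

This gives the stream a writable producer end and a readable consumer end, replacing the push/iterate semantics of `AsyncIterableQueue`.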

@@ -59,20 +60,29 @@ export abstract class LLM extends (EventEmitter as new () => TypedEmitter<LLMCallbacks>) {
 }
 
 export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
-  protected output = new AsyncIterableQueue<ChatChunk>();
-  protected queue = new AsyncIterableQueue<ChatChunk>();
+  protected outputWriter: WritableStreamDefaultWriter<ChatChunk>;
   protected closed = false;
   protected _functionCalls: FunctionCallInfo[] = [];
   abstract label: string;
 
   #llm: LLM;
   #chatCtx: ChatContext;
   #fncCtx?: FunctionContext;
+  private output: IdentityTransform<ChatChunk>;
+  private outputReader: ReadableStreamDefaultReader<ChatChunk>;
+  private metricsStream: ReadableStream<ChatChunk>;
 
   constructor(llm: LLM, chatCtx: ChatContext, fncCtx?: FunctionContext) {
     this.#llm = llm;
     this.#chatCtx = chatCtx;
     this.#fncCtx = fncCtx;
+
+    this.output = new IdentityTransform();
+    this.outputWriter = this.output.writable.getWriter();
+    const [outputStream, metricsStream] = this.output.readable.tee();
+    this.outputReader = outputStream.getReader();
+    this.metricsStream = metricsStream;
+
     this.monitorMetrics();
   }
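The key move in the constructor is `tee()`: the readable side of the transform is split into two branches, one driving the public iterator and one feeding `monitorMetrics()`, so the metrics loop no longer has to re-emit chunks by hand. A standalone illustration of `tee()` semantics (toy values, not from the PR):

```ts
import { ReadableStream } from 'node:stream/web';

// Toy source; both tee'd branches receive every chunk it emits.
const source = new ReadableStream<number>({
  start(controller) {
    [1, 2, 3].forEach((n) => controller.enqueue(n));
    controller.close();
  },
});

// Each branch gets its own reader and its own pace; a slow metrics loop
// buffers chunks instead of stalling the consumer branch.
const [consumerBranch, metricsBranch] = source.tee();
```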

@@ -82,8 +92,11 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
     let requestId = '';
     let usage: CompletionUsage | undefined;
 
-    for await (const ev of this.queue) {
-      this.output.put(ev);
+    const metricsReader = this.metricsStream.getReader();
+    while (true) {
+      const { done, value: ev } = await metricsReader.read();
+      if (done) break;
+
       requestId = ev.requestId;
       if (!ttft) {
         ttft = process.hrtime.bigint() - startTime;
@@ -92,7 +105,7 @@
         usage = ev.usage;
       }
     }
-    this.output.close();
+    metricsReader.releaseLock();
 
     const duration = process.hrtime.bigint() - startTime;
     const metrics: LLMMetrics = {
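The metrics loop above is the standard reader-drain pattern for web streams: read until `done`, then release the lock. As a generic sketch (hypothetical helper, not part of the PR):

```ts
import type { ReadableStream } from 'node:stream/web';

// Drain a stream chunk-by-chunk, always releasing the reader's lock so the
// stream is left in a usable state even if the callback throws.
async function drain<T>(
  stream: ReadableStream<T>,
  onChunk: (chunk: T) => void,
): Promise<void> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      onChunk(value);
    }
  } finally {
    reader.releaseLock();
  }
}
```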
@@ -139,12 +152,18 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
   }
 
   next(): Promise<IteratorResult<ChatChunk>> {
-    return this.output.next();
+    return this.outputReader.read().then(({ done, value }) => {
+      if (done) {
+        return { done: true, value: undefined };
+      }
+      return { done: false, value };
+    });
   }
 
   close() {
-    this.output.close();
-    this.queue.close();
+    if (!this.closed) {
+      this.outputWriter.close();
+    }
     this.closed = true;
   }

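Since `next()` now adapts `outputReader.read()` results to the iterator protocol, `LLMStream` still satisfies `AsyncIterableIterator<ChatChunk>`, so existing `for await` consumers are unaffected. A hypothetical consumer (assumes each choice carries a `delta` with optional `content`, which may differ from the actual `ChatChunk` shape):

```ts
// Hypothetical usage, not from the PR: accumulate streamed text.
async function collectText(stream: LLMStream): Promise<string> {
  let text = '';
  for await (const chunk of stream) {
    for (const choice of chunk.choices) {
      text += choice.delta.content ?? '';
    }
  }
  return text;
}
```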
plugins/openai/src/llm.ts (3 additions, 3 deletions)
@@ -469,12 +469,12 @@ export class LLMStream extends llm.LLMStream {
        for (const choice of chunk.choices) {
          const chatChunk = this.#parseChoice(chunk.id, choice);
          if (chatChunk) {
-            this.queue.put(chatChunk);
+            this.outputWriter.write(chatChunk);
          }
 
          if (chunk.usage) {
            const usage = chunk.usage;
-            this.queue.put({
+            this.outputWriter.write({
              requestId: chunk.id,
              choices: [],
              usage: {
@@ -487,7 +487,7 @@ export class LLMStream extends llm.LLMStream {
            }
          }
        } finally {
-          this.queue.close();
+          this.close();
        }
      }

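On the producer side, the OpenAI plugin now writes through the inherited `outputWriter` and, in `finally`, calls `this.close()` rather than closing a queue directly. The `closed` guard in the base class makes closing idempotent, which matters because closing a writer twice rejects. A minimal sketch of the same contract, reusing the `IdentityTransform` shape sketched earlier (assumptions, not the PR's code):

```ts
const transform = new IdentityTransform<string>();
const writer = transform.writable.getWriter();

let closed = false;
function close() {
  // Mirror of LLMStream.close(): guard with a flag so repeated calls
  // are no-ops instead of a second (rejecting) writer.close().
  if (!closed) writer.close();
  closed = true;
}

writer.write('partial chunk');
close();
close(); // harmless second call
```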