Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming improvements #411

Merged
merged 1 commit on Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/lib/ChatCompletionRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
type Completions,
type ChatCompletionMessage,
type ChatCompletionMessageParam,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsNonStreaming,
} from 'openai/resources/chat/completions';
import { type RunnableFunctions, type BaseFunctionsArgs } from './RunnableFunction';
Expand Down Expand Up @@ -34,16 +33,6 @@ export class ChatCompletionRunner extends AbstractChatCompletionRunner<ChatCompl
return runner;
}

static createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
options?: Core.RequestOptions,
): ChatCompletionRunner {
const runner = new ChatCompletionRunner();
runner._run(() => runner._runChatCompletion(completions, params, options));
return runner;
}

override _addMessage(message: ChatCompletionMessage | ChatCompletionMessageParam) {
super._addMessage(message);
if (message.role === 'assistant' && message.content) {
Expand Down
24 changes: 20 additions & 4 deletions src/lib/ChatCompletionStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
type ChatCompletion,
type ChatCompletionChunk,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsStreaming,
ChatCompletionCreateParamsBase,
} from 'openai/resources/chat/completions';
import {
AbstractChatCompletionRunner,
Expand All @@ -19,7 +19,9 @@ export interface ChatCompletionStreamEvents extends AbstractChatCompletionRunner
chunk: (chunk: ChatCompletionChunk, snapshot: ChatCompletionSnapshot) => void;
}

export type ChatCompletionStreamParams = ChatCompletionCreateParamsStreaming;
export type ChatCompletionStreamParams = Omit<ChatCompletionCreateParamsBase, 'stream'> & {
stream?: true;
};

export class ChatCompletionStream
extends AbstractChatCompletionRunner<ChatCompletionStreamEvents>
Expand All @@ -31,6 +33,13 @@ export class ChatCompletionStream
return this.#currentChatCompletionSnapshot;
}

/**
* Intended for use on the frontend, consuming a stream produced with
* `.toReadableStream()` on the backend.
*
* Note that messages sent to the model do not appear in `.on('message')`
* in this context.
*/
static fromReadableStream(stream: ReadableStream): ChatCompletionStream {
const runner = new ChatCompletionStream();
runner._run(() => runner._fromReadableStream(stream));
Expand All @@ -39,11 +48,11 @@ export class ChatCompletionStream

static createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
params: ChatCompletionStreamParams,
options?: Core.RequestOptions,
): ChatCompletionStream {
const runner = new ChatCompletionStream();
runner._run(() => runner._runChatCompletion(completions, params, options));
runner._run(() => runner._runChatCompletion(completions, { ...params, stream: true }, options));
return runner;
}

Expand Down Expand Up @@ -110,8 +119,15 @@ export class ChatCompletionStream
this.#beginRequest();
this._connected();
const stream = Stream.fromReadableStream<ChatCompletionChunk>(readableStream, this.controller);
let chatId;
for await (const chunk of stream) {
if (chatId && chatId !== chunk.id) {
// A new request has been made.
this._addChatCompletion(this.#endRequest());
}

this.#addChunk(chunk);
chatId = chunk.id;
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
Expand Down
11 changes: 0 additions & 11 deletions src/lib/ChatCompletionStreamingRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import * as Core from 'openai/core';
import {
Completions,
type ChatCompletionChunk,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsStreaming,
} from 'openai/resources/chat/completions';
import { type AbstractChatCompletionRunnerEvents } from './AbstractChatCompletionRunner';
Expand Down Expand Up @@ -41,14 +40,4 @@ export class ChatCompletionStreamingRunner
runner._run(() => runner._runFunctions(completions, params, options));
return runner;
}

static override createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
options?: Core.RequestOptions,
): ChatCompletionStreamingRunner {
const runner = new ChatCompletionStreamingRunner();
runner._run(() => runner._runChatCompletion(completions, params, options));
return runner;
}
}
15 changes: 4 additions & 11 deletions src/resources/beta/chat/completions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export {
RunnableFunctionWithoutParse,
ParsingFunction,
} from 'openai/lib/RunnableFunction';
import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream';
import { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions';
import { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream';
export { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream';

export class Completions extends APIResource {
/**
Expand Down Expand Up @@ -64,14 +64,7 @@ export class Completions extends APIResource {
/**
* Creates a chat completion stream
*/
stream(
body: Omit<ChatCompletionCreateParamsStreaming, 'stream'> & { stream?: true },
options?: Core.RequestOptions,
): ChatCompletionStream {
return ChatCompletionStream.createChatCompletion(
this.client.chat.completions,
{ ...body, stream: true },
options,
);
stream(body: ChatCompletionStreamParams, options?: Core.RequestOptions): ChatCompletionStream {
return ChatCompletionStream.createChatCompletion(this.client.chat.completions, body, options);
}
}