Skip to content

Commit

Permalink
feat: streaming improvements (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-bot authored Oct 30, 2023
1 parent 510c1f3 commit 37b622c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 37 deletions.
11 changes: 0 additions & 11 deletions src/lib/ChatCompletionRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
type Completions,
type ChatCompletionMessage,
type ChatCompletionMessageParam,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsNonStreaming,
} from 'openai/resources/chat/completions';
import { type RunnableFunctions, type BaseFunctionsArgs } from './RunnableFunction';
Expand Down Expand Up @@ -34,16 +33,6 @@ export class ChatCompletionRunner extends AbstractChatCompletionRunner<ChatCompl
return runner;
}

static createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
options?: Core.RequestOptions,
): ChatCompletionRunner {
const runner = new ChatCompletionRunner();
runner._run(() => runner._runChatCompletion(completions, params, options));
return runner;
}

override _addMessage(message: ChatCompletionMessage | ChatCompletionMessageParam) {
super._addMessage(message);
if (message.role === 'assistant' && message.content) {
Expand Down
24 changes: 20 additions & 4 deletions src/lib/ChatCompletionStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
type ChatCompletion,
type ChatCompletionChunk,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsStreaming,
ChatCompletionCreateParamsBase,
} from 'openai/resources/chat/completions';
import {
AbstractChatCompletionRunner,
Expand All @@ -19,7 +19,9 @@ export interface ChatCompletionStreamEvents extends AbstractChatCompletionRunner
chunk: (chunk: ChatCompletionChunk, snapshot: ChatCompletionSnapshot) => void;
}

export type ChatCompletionStreamParams = ChatCompletionCreateParamsStreaming;
export type ChatCompletionStreamParams = Omit<ChatCompletionCreateParamsBase, 'stream'> & {
stream?: true;
};

export class ChatCompletionStream
extends AbstractChatCompletionRunner<ChatCompletionStreamEvents>
Expand All @@ -31,6 +33,13 @@ export class ChatCompletionStream
return this.#currentChatCompletionSnapshot;
}

/**
* Intended for use on the frontend, consuming a stream produced with
* `.toReadableStream()` on the backend.
*
* Note that messages sent to the model do not appear in `.on('message')`
* in this context.
*/
static fromReadableStream(stream: ReadableStream): ChatCompletionStream {
const runner = new ChatCompletionStream();
runner._run(() => runner._fromReadableStream(stream));
Expand All @@ -39,11 +48,11 @@ export class ChatCompletionStream

static createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
params: ChatCompletionStreamParams,
options?: Core.RequestOptions,
): ChatCompletionStream {
const runner = new ChatCompletionStream();
runner._run(() => runner._runChatCompletion(completions, params, options));
runner._run(() => runner._runChatCompletion(completions, { ...params, stream: true }, options));
return runner;
}

Expand Down Expand Up @@ -110,8 +119,15 @@ export class ChatCompletionStream
this.#beginRequest();
this._connected();
const stream = Stream.fromReadableStream<ChatCompletionChunk>(readableStream, this.controller);
let chatId;
for await (const chunk of stream) {
if (chatId && chatId !== chunk.id) {
// A new request has been made.
this._addChatCompletion(this.#endRequest());
}

this.#addChunk(chunk);
chatId = chunk.id;
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
Expand Down
11 changes: 0 additions & 11 deletions src/lib/ChatCompletionStreamingRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import * as Core from 'openai/core';
import {
Completions,
type ChatCompletionChunk,
type ChatCompletionCreateParams,
type ChatCompletionCreateParamsStreaming,
} from 'openai/resources/chat/completions';
import { type AbstractChatCompletionRunnerEvents } from './AbstractChatCompletionRunner';
Expand Down Expand Up @@ -41,14 +40,4 @@ export class ChatCompletionStreamingRunner
runner._run(() => runner._runFunctions(completions, params, options));
return runner;
}

static override createChatCompletion(
completions: Completions,
params: ChatCompletionCreateParams,
options?: Core.RequestOptions,
): ChatCompletionStreamingRunner {
const runner = new ChatCompletionStreamingRunner();
runner._run(() => runner._runChatCompletion(completions, params, options));
return runner;
}
}
15 changes: 4 additions & 11 deletions src/resources/beta/chat/completions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export {
RunnableFunctionWithoutParse,
ParsingFunction,
} from 'openai/lib/RunnableFunction';
import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream';
import { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions';
import { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream';
export { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream';

export class Completions extends APIResource {
/**
Expand Down Expand Up @@ -64,14 +64,7 @@ export class Completions extends APIResource {
/**
* Creates a chat completion stream
*/
stream(
body: Omit<ChatCompletionCreateParamsStreaming, 'stream'> & { stream?: true },
options?: Core.RequestOptions,
): ChatCompletionStream {
return ChatCompletionStream.createChatCompletion(
this.client.chat.completions,
{ ...body, stream: true },
options,
);
stream(body: ChatCompletionStreamParams, options?: Core.RequestOptions): ChatCompletionStream {
return ChatCompletionStream.createChatCompletion(this.client.chat.completions, body, options);
}
}

0 comments on commit 37b622c

Please sign in to comment.