From fba4e936ee5095836995577902ca9f0828060007 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 19 Jul 2024 14:19:28 -0700 Subject: [PATCH 1/3] Fix double streaming issue when streamEvents is called directly on chat models/LLMs --- .../tests/runnable_stream_events_v2.test.ts | 54 +++++++++++++++++++ langchain-core/src/tracers/event_stream.ts | 4 ++ 2 files changed, 58 insertions(+) diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index 88826e8649df..4eb9032d9cb3 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -80,6 +80,60 @@ test("Runnable streamEvents method", async () => { ]); }); +test.only("Runnable streamEvents method on a chat model", async () => { + const model = new FakeListChatModel({ + responses: ["abc"], + }); + + const events = []; + const eventStream = await model.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toMatchObject([ + { + data: { input: "hello" }, + event: "on_chat_model_start", + name: "FakeListChatModel", + metadata: expect.any(Object), + run_id: expect.any(String), + tags: [], + }, + { + data: { chunk: new AIMessageChunk({ content: "a" })}, + event: "on_chat_model_stream", + name: "FakeListChatModel", + metadata: expect.any(Object), + run_id: expect.any(String), + tags: [], + }, + { + data: { chunk: new AIMessageChunk({ content: "b" })}, + event: "on_chat_model_stream", + name: "FakeListChatModel", + metadata: expect.any(Object), + run_id: expect.any(String), + tags: [], + }, + { + data: { chunk: new AIMessageChunk({ content: "c" })}, + event: "on_chat_model_stream", + name: "FakeListChatModel", + metadata: expect.any(Object), + run_id: expect.any(String), + tags: [], + }, + { + data: { output: new AIMessageChunk({ content: "abc" }) }, + event: "on_chat_model_end", + name: "FakeListChatModel", + metadata: expect.any(Object), + run_id: expect.any(String), + tags: [] + }, + ]); +}); + test("Runnable streamEvents method with three runnables", async () => { const r = RunnableLambda.from(reverse); diff --git a/langchain-core/src/tracers/event_stream.ts b/langchain-core/src/tracers/event_stream.ts index aa5393fa8f9b..9ad26247c527 100644 --- a/langchain-core/src/tracers/event_stream.ts +++ b/langchain-core/src/tracers/event_stream.ts @@ -354,6 +354,10 @@ export class EventStreamCallbackHandler extends BaseTracer { if (runInfo === undefined) { throw new Error(`onLLMNewToken: Run ID ${run.id} not found in run map.`); } + // Covered by streamed chunks in tapOutputIterable + if (run.parent_run_id === undefined) { + return; + } if (runInfo.runType === "chat_model") { eventName = "on_chat_model_stream"; if (kwargs?.chunk === undefined) { From 8d23eeeb554a9d82dfc30eaeaf3d837a5565dd9e Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 19 Jul 2024 14:47:11 -0700 Subject: [PATCH 2/3] Fix lint, add docs --- docs/core_docs/docs/how_to/streaming.ipynb | 2 + langchain-core/src/runnables/base.ts | 52 +++++++++++++++++-- .../tests/runnable_stream_events_v2.test.ts | 10 ++-- langchain-core/src/tracers/event_stream.ts | 2 +- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/docs/core_docs/docs/how_to/streaming.ipynb b/docs/core_docs/docs/how_to/streaming.ipynb index b71664a842c4..fb1d28625800 100644 --- a/docs/core_docs/docs/how_to/streaming.ipynb +++ b/docs/core_docs/docs/how_to/streaming.ipynb @@ -663,6 +663,8 @@ "| on_prompt_start | [template_name] | | {\"question\": \"hello\"} | |\n", "| on_prompt_end | [template_name] | | {\"question\": \"hello\"} | ChatPromptValue(messages: [SystemMessage, ...]) |\n", "\n", + "`streamEvents` will also emit dispatched custom events in `v2`. Please see [this guide](/docs/how_to/callbacks_custom_events/) for more.\n", + "\n", "### Chat Model\n", "\n", "Let's start off by looking at the events produced by a chat model." diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index ea0f00bd402b..e66216584edc 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -762,11 +762,11 @@ export abstract class Runnable< * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ * | on_llm_end | [model name] | | 'Hello human!' | | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ - * | on_chain_start | format_docs | | | | + * | on_chain_start | some_runnable | | | | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ - * | on_chain_stream | format_docs | "hello world!, goodbye world!" | | | + * | on_chain_stream | some_runnable | "hello world!, goodbye world!" | | | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ - * | on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" | + * | on_chain_end | some_runnable | | [Document(...)] | "hello world!, goodbye world!" | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ * | on_tool_start | some_tool | | {"x": 1, "y": "2"} | | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ @@ -780,6 +780,52 @@ export abstract class Runnable< * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ * | on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) | * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * + * The "on_chain_*" events are the default for Runnables that don't fit one of the above categories. + * + * In addition to the standard events above, users can also dispatch custom events. + * + * Custom events will be only be surfaced with in the `v2` version of the API! + * + * A custom event has following format: + * + * +-----------+------+-----------------------------------------------------------------------------------------------------------+ + * | Attribute | Type | Description | + * +===========+======+===========================================================================================================+ + * | name | str | A user defined name for the event. | + * +-----------+------+-----------------------------------------------------------------------------------------------------------+ + * | data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. | + * +-----------+------+-----------------------------------------------------------------------------------------------------------+ + * + * Here's an example: + * @example + * ```ts + * import { RunnableLambda } from "@langchain/core/runnables"; + * import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch"; + * // Use this import for web environments that don't support "async_hooks" + * // and manually pass config to child runs. + * // import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch/web"; + * + * const slowThing = RunnableLambda.from(async (someInput: string) => { + * // Placeholder for some slow operation + * await new Promise((resolve) => setTimeout(resolve, 100)); + * await dispatchCustomEvent("progress_event", { + * message: "Finished step 1 of 2", + * }); + * await new Promise((resolve) => setTimeout(resolve, 100)); + * return "Done"; + * }); + * + * const eventStream = await slowThing.streamEvents("hello world", { + * version: "v2", + * }); + * + * for await (const event of eventStream) { + * if (event.event === "on_custom_event") { + * console.log(event); + * } + * } + * ``` */ streamEvents( input: RunInput, diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index 4eb9032d9cb3..e9e7c6762a3d 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -80,7 +80,7 @@ test("Runnable streamEvents method", async () => { ]); }); -test.only("Runnable streamEvents method on a chat model", async () => { +test("Runnable streamEvents method on a chat model", async () => { const model = new FakeListChatModel({ responses: ["abc"], }); @@ -100,7 +100,7 @@ test.only("Runnable streamEvents method on a chat model", async () => { tags: [], }, { - data: { chunk: new AIMessageChunk({ content: "a" })}, + data: { chunk: new AIMessageChunk({ content: "a" }) }, event: "on_chat_model_stream", name: "FakeListChatModel", metadata: expect.any(Object), @@ -108,7 +108,7 @@ test.only("Runnable streamEvents method on a chat model", async () => { tags: [], }, { - data: { chunk: new AIMessageChunk({ content: "b" })}, + data: { chunk: new AIMessageChunk({ content: "b" }) }, event: "on_chat_model_stream", name: "FakeListChatModel", metadata: expect.any(Object), @@ -116,7 +116,7 @@ test.only("Runnable streamEvents method on a chat model", async () => { tags: [], }, { - data: { chunk: new AIMessageChunk({ content: "c" })}, + data: { chunk: new AIMessageChunk({ content: "c" }) }, event: "on_chat_model_stream", name: "FakeListChatModel", metadata: expect.any(Object), @@ -129,7 +129,7 @@ test.only("Runnable streamEvents method on a chat model", async () => { name: "FakeListChatModel", metadata: expect.any(Object), run_id: expect.any(String), - tags: [] + tags: [], }, ]); }); diff --git a/langchain-core/src/tracers/event_stream.ts b/langchain-core/src/tracers/event_stream.ts index 9ad26247c527..5a05da40ed75 100644 --- a/langchain-core/src/tracers/event_stream.ts +++ b/langchain-core/src/tracers/event_stream.ts @@ -354,7 +354,7 @@ export class EventStreamCallbackHandler extends BaseTracer { if (runInfo === undefined) { throw new Error(`onLLMNewToken: Run ID ${run.id} not found in run map.`); } - // Covered by streamed chunks in tapOutputIterable + // Top-level streaming events are covered by tapOutputIterable if (run.parent_run_id === undefined) { return; } From 909d61133f2f0b80c06e837e258782ce439eb0df Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 19 Jul 2024 15:13:08 -0700 Subject: [PATCH 3/3] Fix format --- .../tests/runnable_stream_events_v2.test.ts | 48 ------------------- langchain-core/src/tracers/event_stream.ts | 13 ++++- 2 files changed, 11 insertions(+), 50 deletions(-) diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index e9e7c6762a3d..cb7b0dcea888 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -653,18 +653,6 @@ test("Runnable streamEvents method with llm", async () => { a: "b", }, }, - { - event: "on_llm_stream", - run_id: expect.any(String), - name: "my_model", - tags: ["my_model"], - metadata: { - a: "b", - }, - data: { - chunk: "h", - }, - }, { event: "on_llm_stream", data: { @@ -679,18 +667,6 @@ test("Runnable streamEvents method with llm", async () => { a: "b", }, }, - { - event: "on_llm_stream", - run_id: expect.any(String), - name: "my_model", - tags: ["my_model"], - metadata: { - a: "b", - }, - data: { - chunk: "e", - }, - }, { event: "on_llm_stream", data: { @@ -705,18 +681,6 @@ test("Runnable streamEvents method with llm", async () => { a: "b", }, }, - { - event: "on_llm_stream", - run_id: expect.any(String), - name: "my_model", - tags: ["my_model"], - metadata: { - a: "b", - }, - data: { - chunk: "y", - }, - }, { event: "on_llm_stream", data: { @@ -731,18 +695,6 @@ test("Runnable streamEvents method with llm", async () => { a: "b", }, }, - { - event: "on_llm_stream", - run_id: expect.any(String), - name: "my_model", - tags: ["my_model"], - metadata: { - a: "b", - }, - data: { - chunk: "!", - }, - }, { event: "on_llm_end", data: { diff --git a/langchain-core/src/tracers/event_stream.ts b/langchain-core/src/tracers/event_stream.ts index 5a05da40ed75..cf9c148c5275 100644 --- a/langchain-core/src/tracers/event_stream.ts +++ b/langchain-core/src/tracers/event_stream.ts @@ -244,6 +244,13 @@ export class EventStreamCallbackHandler extends BaseTracer { yield firstChunk.value; return; } + // Match format from handlers below + function _formatOutputChunk(eventType: string, data: unknown) { + if (eventType === "llm" && typeof data === "string") { + return new GenerationChunk({ text: data }); + } + return data; + } let tappedPromise = this.tappedPromises.get(runId); // if we are the first to tap, issue stream events if (tappedPromise === undefined) { @@ -264,7 +271,9 @@ export class EventStreamCallbackHandler extends BaseTracer { await this.send( { ...event, - data: { chunk: firstChunk.value }, + data: { + chunk: _formatOutputChunk(runInfo.runType, firstChunk.value), + }, }, runInfo ); @@ -276,7 +285,7 @@ export class EventStreamCallbackHandler extends BaseTracer { { ...event, data: { - chunk, + chunk: _formatOutputChunk(runInfo.runType, chunk), }, }, runInfo