diff --git a/langchain-core/src/language_models/chat_models.ts b/langchain-core/src/language_models/chat_models.ts
index 9239ff52c03b..a80776ff0ba3 100644
--- a/langchain-core/src/language_models/chat_models.ts
+++ b/langchain-core/src/language_models/chat_models.ts
@@ -264,6 +264,10 @@ export abstract class BaseChatModel<
         callOptions,
         runManagers?.[0]
       )) {
+        if (chunk.message.id == null) {
+          const runId = runManagers?.at(0)?.runId;
+          if (runId != null) chunk.message._updateId(`run-${runId}`);
+        }
         chunk.message.response_metadata = {
           ...chunk.generationInfo,
           ...chunk.message.response_metadata,
@@ -362,6 +366,10 @@ export abstract class BaseChatModel<
       );
       let aggregated;
       for await (const chunk of stream) {
+        if (chunk.message.id == null) {
+          const runId = runManagers?.at(0)?.runId;
+          if (runId != null) chunk.message._updateId(`run-${runId}`);
+        }
         if (aggregated === undefined) {
           aggregated = chunk;
         } else {
@@ -397,6 +405,10 @@ export abstract class BaseChatModel<
       if (pResult.status === "fulfilled") {
         const result = pResult.value;
         for (const generation of result.generations) {
+          if (generation.message.id == null) {
+            const runId = runManagers?.at(0)?.runId;
+            if (runId != null) generation.message._updateId(`run-${runId}`);
+          }
           generation.message.response_metadata = {
             ...generation.generationInfo,
             ...generation.message.response_metadata,
diff --git a/langchain-core/src/messages/base.ts b/langchain-core/src/messages/base.ts
index 8b74c7f158e1..639fcf226025 100644
--- a/langchain-core/src/messages/base.ts
+++ b/langchain-core/src/messages/base.ts
@@ -289,6 +289,16 @@ export abstract class BaseMessage
     };
   }
 
+  // this private method is used to update the ID for the runtime
+  // value as well as in lc_kwargs for serialisation
+  _updateId(value: string | undefined) {
+    this.id = value;
+
+    // lc_attributes wouldn't work here, because jest compares the
+    // whole object
+    this.lc_kwargs.id = value;
+  }
+
   get [Symbol.toStringTag]() {
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
     return (this.constructor as any).lc_name();
diff --git a/langchain-core/src/runnables/tests/runnable_history.test.ts b/langchain-core/src/runnables/tests/runnable_history.test.ts
index 4efe19047e7a..0614f35c71ce 100644
--- a/langchain-core/src/runnables/tests/runnable_history.test.ts
+++ b/langchain-core/src/runnables/tests/runnable_history.test.ts
@@ -22,6 +22,8 @@ import {
 import { ChatPromptTemplate, MessagesPlaceholder } from "../../prompts/chat.js";
 import { StringOutputParser } from "../../output_parsers/string.js";
 
+const anyString = expect.any(String) as unknown as string;
+
 // For `BaseChatMessageHistory`
 async function getGetSessionHistory(): Promise<
   (sessionId: string) => Promise<BaseChatMessageHistory>
@@ -107,9 +109,15 @@ test("Runnable with message history with a chat model", async () => {
   const sessionHistory = await getMessageHistory("2");
   expect(await sessionHistory.getMessages()).toEqual([
     new HumanMessage("hello"),
-    new AIMessage("Hello world!"),
+    new AIMessage({
+      id: anyString,
+      content: "Hello world!",
+    }),
     new HumanMessage("good bye"),
-    new AIMessageChunk("Hello world!"),
+    new AIMessageChunk({
+      id: anyString,
+      content: "Hello world!",
+    }),
   ]);
 });
 
@@ -129,6 +137,7 @@ test("Runnable with message history with a messages in, messages out chain", asy
     config: {},
     getMessageHistory,
   });
+
   const config: RunnableConfig = { configurable: { sessionId: "2" } };
   const output = await withHistory.invoke([new HumanMessage("hello")], config);
   expect(output.content).toBe("So long and thanks for the fish!!");
@@ -147,9 +156,15 @@ test("Runnable with message history with a messages in, messages out chain", asy
   const sessionHistory = await getMessageHistory("2");
   expect(await sessionHistory.getMessages()).toEqual([
     new HumanMessage("hello"),
-    new AIMessage("So long and thanks for the fish!!"),
+    new AIMessage({
+      id: anyString,
+      content: "So long and thanks for the fish!!",
+    }),
     new HumanMessage("good bye"),
-    new AIMessageChunk("So long and thanks for the fish!!"),
+    new AIMessageChunk({
+      id: anyString,
+      content: "So long and thanks for the fish!!",
+    }),
   ]);
 });
 
diff --git a/langchain-core/src/runnables/tests/runnable_stream_events.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events.test.ts
index e9ab07a36fae..7f6bce337ce8 100644
--- a/langchain-core/src/runnables/tests/runnable_stream_events.test.ts
+++ b/langchain-core/src/runnables/tests/runnable_stream_events.test.ts
@@ -30,6 +30,8 @@ function reverse(s: string) {
   return s.split("").reverse().join("");
 }
 
+const anyString = expect.any(String) as unknown as string;
+
 const originalCallbackValue = process.env.LANGCHAIN_CALLBACKS_BACKGROUND;
 
 afterEach(() => {
@@ -733,7 +735,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_chain_stream",
@@ -743,7 +745,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_llm_stream",
@@ -756,7 +758,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("O") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "O" }) },
     },
     {
       event: "on_chain_stream",
@@ -766,7 +768,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("O") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "O" }) },
     },
     {
       event: "on_llm_stream",
@@ -779,7 +781,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("A") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "A" }) },
     },
     {
       event: "on_chain_stream",
@@ -789,7 +791,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("A") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "A" }) },
     },
     {
       event: "on_llm_stream",
@@ -802,7 +804,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_chain_stream",
@@ -812,7 +814,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_llm_end",
@@ -836,7 +838,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
           [
             new ChatGenerationChunk({
               generationInfo: {},
-              message: new AIMessageChunk("ROAR"),
+              message: new AIMessageChunk({ id: anyString, content: "ROAR" }),
               text: "ROAR",
             }),
           ],
@@ -853,7 +855,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       data: {
-        output: new AIMessageChunk("ROAR"),
+        output: new AIMessageChunk({ id: anyString, content: "ROAR" }),
      },
    },
  ]);
diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts
index d1fc1ea7fc65..1702d226aa4b 100644
--- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts
+++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts
@@ -90,6 +90,10 @@ test("Runnable streamEvents method on a chat model", async () => {
   for await (const event of eventStream) {
     events.push(event);
   }
+
+  // used here to avoid casting every ID
+  const anyString = expect.any(String) as unknown as string;
+
   expect(events).toMatchObject([
     {
       data: { input: "hello" },
       event: "on_chat_model_start",
       name: "FakeListChatModel",
       metadata: expect.any(Object),
       run_id: expect.any(String),
       tags: [],
     },
     {
-      data: { chunk: new AIMessageChunk({ content: "a" }) },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "a" }) },
       event: "on_chat_model_stream",
       name: "FakeListChatModel",
       metadata: expect.any(Object),
       run_id: expect.any(String),
       tags: [],
     },
     {
-      data: { chunk: new AIMessageChunk({ content: "b" }) },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "b" }) },
       event: "on_chat_model_stream",
       name: "FakeListChatModel",
       metadata: expect.any(Object),
       run_id: expect.any(String),
       tags: [],
     },
     {
-      data: { chunk: new AIMessageChunk({ content: "c" }) },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "c" }) },
       event: "on_chat_model_stream",
       name: "FakeListChatModel",
       metadata: expect.any(Object),
       run_id: expect.any(String),
       tags: [],
     },
     {
-      data: { output: new AIMessageChunk({ content: "abc" }) },
+      data: { output: new AIMessageChunk({ id: anyString, content: "abc" }) },
       event: "on_chat_model_end",
       name: "FakeListChatModel",
       metadata: expect.any(Object),
       run_id: expect.any(String),
       tags: [],
     },
@@ -748,6 +752,9 @@ test("Runnable streamEvents method with chat model chain", async () => {
   for await (const event of eventStream) {
     events.push(event);
   }
+
+  // used here to avoid casting every ID
+  const anyString = expect.any(String) as unknown as string;
   expect(events).toEqual([
     {
       run_id: expect.any(String),
@@ -816,7 +823,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ content: "R", id: anyString }) },
     },
     {
       event: "on_chain_stream",
@@ -826,7 +833,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ content: "R", id: anyString }) },
     },
     {
       event: "on_chat_model_stream",
@@ -839,7 +846,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("O") },
+      data: { chunk: new AIMessageChunk({ content: "O", id: anyString }) },
     },
     {
       event: "on_chain_stream",
@@ -849,7 +856,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("O") },
+      data: { chunk: new AIMessageChunk({ content: "O", id: anyString }) },
     },
     {
       event: "on_chat_model_stream",
@@ -862,7 +869,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("A") },
+      data: { chunk: new AIMessageChunk({ content: "A", id: anyString }) },
     },
     {
       event: "on_chain_stream",
@@ -872,7 +879,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("A") },
+      data: { chunk: new AIMessageChunk({ content: "A", id: anyString }) },
     },
     {
       event: "on_chat_model_stream",
@@ -885,7 +892,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ content: "R", id: anyString }) },
     },
     {
       event: "on_chain_stream",
@@ -895,7 +902,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ content: "R", id: anyString }) },
     },
     {
       event: "on_chat_model_end",
@@ -914,7 +921,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
            [new SystemMessage("You are Godzilla"), new HumanMessage("hello")],
          ],
        },
-        output: new AIMessageChunk("ROAR"),
+        output: new AIMessageChunk({ content: "ROAR", id: anyString }),
      },
    },
    {
@@ -925,9 +932,7 @@ test("Runnable streamEvents method with chat model chain", async () => {
       metadata: {
         foo: "bar",
       },
-      data: {
-        output: new AIMessageChunk("ROAR"),
-      },
+      data: { output: new AIMessageChunk({ content: "ROAR", id: anyString }) },
     },
   ]);
 });
@@ -965,6 +970,10 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
   for await (const event of eventStream) {
     events.push(event);
   }
+
+  // used here to avoid casting every ID
+  const anyString = expect.any(String) as unknown as string;
+
   expect(events).toEqual([
     {
       run_id: expect.any(String),
@@ -1043,7 +1052,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_chat_model_stream",
@@ -1056,7 +1065,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("O") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "O" }) },
     },
     {
       event: "on_chat_model_stream",
@@ -1069,7 +1078,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("A") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "A" }) },
     },
     {
       event: "on_chat_model_stream",
@@ -1082,7 +1091,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("R") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "R" }) },
     },
     {
       event: "on_chat_model_end",
@@ -1101,7 +1110,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
            [new SystemMessage("You are Godzilla"), new HumanMessage("hello")],
          ],
        },
-        output: new AIMessageChunk("ROAR"),
+        output: new AIMessageChunk({ id: anyString, content: "ROAR" }),
      },
    },
    {
@@ -1112,7 +1121,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         foo: "bar",
       },
       name: "RunnableLambda",
-      data: { chunk: new AIMessageChunk("ROAR") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "ROAR" }) },
     },
     {
       event: "on_chain_stream",
@@ -1122,7 +1131,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessageChunk("ROAR") },
+      data: { chunk: new AIMessageChunk({ id: anyString, content: "ROAR" }) },
     },
     {
       event: "on_chain_end",
@@ -1134,7 +1143,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
       },
       data: {
         input: await template.invoke({ question: "hello" }),
-        output: new AIMessageChunk("ROAR"),
+        output: new AIMessageChunk({ id: anyString, content: "ROAR" }),
       },
     },
     {
@@ -1146,7 +1155,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
         foo: "bar",
       },
       data: {
-        output: new AIMessageChunk("ROAR"),
+        output: new AIMessageChunk({ id: anyString, content: "ROAR" }),
       },
     },
   ]);
@@ -1183,6 +1192,8 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
   for await (const event of eventStream) {
     events.push(event);
   }
+
+  const anyString = expect.any(String) as unknown as string;
   expect(events).toEqual([
     {
       run_id: expect.any(String),
@@ -1261,7 +1272,12 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
         ls_stop: undefined,
       },
       name: "my_model",
-      data: { chunk: new AIMessageChunk("You are Godzilla\nhello") },
+      data: {
+        chunk: new AIMessageChunk({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
+      },
     },
     {
       event: "on_chat_model_end",
@@ -1280,7 +1296,10 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
            [new SystemMessage("You are Godzilla"), new HumanMessage("hello")],
          ],
        },
-        output: new AIMessage("You are Godzilla\nhello"),
+        output: new AIMessage({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
      },
    },
    {
@@ -1291,7 +1310,12 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
         foo: "bar",
       },
       name: "RunnableLambda",
-      data: { chunk: new AIMessage("You are Godzilla\nhello") },
+      data: {
+        chunk: new AIMessage({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
+      },
     },
     {
       event: "on_chain_stream",
@@ -1301,7 +1325,12 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
         foo: "bar",
       },
       name: "my_chain",
-      data: { chunk: new AIMessage("You are Godzilla\nhello") },
+      data: {
+        chunk: new AIMessage({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
+      },
     },
     {
       event: "on_chain_end",
@@ -1313,7 +1342,10 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
       },
       data: {
         input: await template.invoke({ question: "hello" }),
-        output: new AIMessage("You are Godzilla\nhello"),
+        output: new AIMessage({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
       },
     },
     {
@@ -1325,7 +1357,10 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
         foo: "bar",
       },
       data: {
-        output: new AIMessage("You are Godzilla\nhello"),
+        output: new AIMessage({
+          id: anyString,
+          content: "You are Godzilla\nhello",
+        }),
       },
     },
   ]);
diff --git a/langchain-core/src/tracers/event_stream.ts b/langchain-core/src/tracers/event_stream.ts
index cf9c148c5275..015faa20e5c8 100644
--- a/langchain-core/src/tracers/event_stream.ts
+++ b/langchain-core/src/tracers/event_stream.ts
@@ -370,7 +370,7 @@ export class EventStreamCallbackHandler extends BaseTracer {
     if (runInfo.runType === "chat_model") {
       eventName = "on_chat_model_stream";
       if (kwargs?.chunk === undefined) {
-        chunk = new AIMessageChunk({ content: token });
+        chunk = new AIMessageChunk({ content: token, id: `run-${run.id}` });
       } else {
         chunk = kwargs.chunk.message;
       }
diff --git a/langchain-core/src/tracers/log_stream.ts b/langchain-core/src/tracers/log_stream.ts
index 004a682dec1b..13b97349b04b 100644
--- a/langchain-core/src/tracers/log_stream.ts
+++ b/langchain-core/src/tracers/log_stream.ts
@@ -457,7 +457,10 @@ export class LogStreamCallbackHandler extends BaseTracer {
       if (isChatGenerationChunk(kwargs?.chunk)) {
         streamedOutputValue = kwargs?.chunk;
       } else {
-        streamedOutputValue = new AIMessageChunk(token);
+        streamedOutputValue = new AIMessageChunk({
+          id: `run-${run.id}`,
+          content: token,
+        });
       }
     } else {
       streamedOutputValue = token;
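---

Note (not part of the patch): a minimal sketch of how the fallback ID introduced
above surfaces to callers. It assumes Jest as the test runner and the
`FakeListChatModel` fake from `@langchain/core/utils/testing` that the tests in
this diff already use; the test name is illustrative. Chunks from a model that
sets no provider message ID should now carry a `run-${runId}` identifier:

import { expect, test } from "@jest/globals";
import { FakeListChatModel } from "@langchain/core/utils/testing";

test("streamed chunks carry a run-scoped fallback ID", async () => {
  const model = new FakeListChatModel({ responses: ["ROAR"] });
  const stream = await model.stream("hello");
  for await (const chunk of stream) {
    // The run UUID is generated per invocation, so a test can only match on
    // the shape of the ID; this is the same reason the tests above compare
    // against `expect.any(String)` instead of a literal value.
    expect(chunk.id).toMatch(/^run-/);
  }
});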