Skip to content

Commit

Permalink
core[patch]: Fix parent run tracking when streaming runnable lambdas (#…
Browse files Browse the repository at this point in the history
…5993)

* Fix parent run tracking when streaming runnable lambdas

* lint

* Fix test
  • Loading branch information
jacoblee93 authored Jul 7, 2024
1 parent 177edaf commit afe5572
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 32 deletions.
26 changes: 13 additions & 13 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2318,15 +2318,19 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
}
}
}
const childConfig = patchConfig(config, {
callbacks: runManager?.getChild(),
recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1,
});
const output = await new Promise<RunOutput | Runnable>(
(resolve, reject) => {
void AsyncLocalStorageProviderSingleton.getInstance().run(
config,
childConfig,
async () => {
try {
const res = await this.func(finalChunk as RunInput, {
...config,
config,
...childConfig,
config: childConfig,
});
resolve(res);
} catch (e) {
Expand All @@ -2340,23 +2344,19 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
if (config?.recursionLimit === 0) {
throw new Error("Recursion limit reached.");
}
const stream = await output.stream(
finalChunk as RunInput,
patchConfig(config, {
callbacks: runManager?.getChild(),
recursionLimit:
(config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1,
})
);
const stream = await output.stream(finalChunk as RunInput, childConfig);
for await (const chunk of stream) {
yield chunk;
}
} else if (isAsyncIterable(output)) {
for await (const chunk of consumeAsyncIterableInContext(config, output)) {
for await (const chunk of consumeAsyncIterableInContext(
childConfig,
output
)) {
yield chunk as RunOutput;
}
} else if (isIterableIterator(output)) {
for (const chunk of consumeIteratorInContext(config, output)) {
for (const chunk of consumeIteratorInContext(childConfig, output)) {
yield chunk as RunOutput;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
event: "on_chat_model_start",
name: "my_model",
run_id: expect.any(String),
tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]),
tags: expect.arrayContaining(["my_model", "my_chain"]),
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1027,7 +1027,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
{
event: "on_chat_model_stream",
run_id: expect.any(String),
tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]),
tags: expect.arrayContaining(["my_chain", "my_model"]),
metadata: {
a: "b",
foo: "bar",
Expand All @@ -1040,7 +1040,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
{
event: "on_chat_model_stream",
run_id: expect.any(String),
tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]),
tags: expect.arrayContaining(["my_chain", "my_model"]),
metadata: {
a: "b",
foo: "bar",
Expand All @@ -1053,7 +1053,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
{
event: "on_chat_model_stream",
run_id: expect.any(String),
tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]),
tags: expect.arrayContaining(["my_chain", "my_model"]),
metadata: {
a: "b",
foo: "bar",
Expand All @@ -1066,7 +1066,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
{
event: "on_chat_model_stream",
run_id: expect.any(String),
tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]),
tags: expect.arrayContaining(["my_chain", "my_model"]),
metadata: {
a: "b",
foo: "bar",
Expand All @@ -1080,7 +1080,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s
event: "on_chat_model_end",
name: "my_model",
run_id: expect.any(String),
tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]),
tags: expect.arrayContaining(["my_model", "my_chain"]),
metadata: {
foo: "bar",
a: "b",
Expand Down Expand Up @@ -1227,7 +1227,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
event: "on_chat_model_start",
name: "my_model",
run_id: expect.any(String),
tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]),
tags: expect.arrayContaining(["my_model", "my_chain"]),
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1245,7 +1245,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
{
event: "on_chat_model_stream",
run_id: expect.any(String),
tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]),
tags: expect.arrayContaining(["my_chain", "my_model"]),
metadata: {
a: "b",
foo: "bar",
Expand All @@ -1259,7 +1259,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one
event: "on_chat_model_end",
name: "my_model",
run_id: expect.any(String),
tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]),
tags: expect.arrayContaining(["my_model", "my_chain"]),
metadata: {
foo: "bar",
a: "b",
Expand Down Expand Up @@ -1417,7 +1417,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
},
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
run_id: expect.any(String),
metadata: {
foo: "bar",
Expand All @@ -1433,7 +1433,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1448,7 +1448,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1463,7 +1463,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1478,7 +1478,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1504,7 +1504,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand Down Expand Up @@ -1654,7 +1654,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str
},
},
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
run_id: expect.any(String),
metadata: {
foo: "bar",
Expand All @@ -1670,7 +1670,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand All @@ -1696,7 +1696,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str
},
run_id: expect.any(String),
name: "my_model",
tags: ["seq:step:2", "my_model", "my_chain"],
tags: ["my_model", "my_chain"],
metadata: {
foo: "bar",
a: "b",
Expand Down
10 changes: 9 additions & 1 deletion langchain-core/src/singletons/tests/async_local_storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { test, expect } from "@jest/globals";
import { v4 } from "uuid";
import { AsyncLocalStorage } from "node:async_hooks";
import { AsyncLocalStorageProviderSingleton } from "../index.js";
import { RunnableLambda } from "../../runnables/base.js";
import { FakeListChatModel } from "../../utils/testing/index.js";
import { getCallbackManagerForConfig } from "../../runnables/config.js";

test("Config should be automatically populated after setting global async local storage", async () => {
const inner = RunnableLambda.from((_, config) => config);
Expand Down Expand Up @@ -136,10 +138,16 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const asyncLocalStorage = AsyncLocalStorageProviderSingleton.getInstance();
const chat = new FakeListChatModel({
responses: ["Hello"],
});
const outerRunId = v4();
const myFunc = async (input: string) => {
const outerCallbackManager = await getCallbackManagerForConfig(
asyncLocalStorage.getStore()
);
expect(outerCallbackManager?.getParentRunId()).toEqual(outerRunId);
for await (const _ of await chat.stream(input)) {
// no-op
}
Expand All @@ -150,8 +158,8 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a
const events = [];
for await (const event of myNestedLambda.streamEvents("hello", {
version: "v1",
runId: outerRunId,
})) {
console.log(event);
events.push(event);
}
const chatModelStreamEvent = events.find((event) => {
Expand Down

0 comments on commit afe5572

Please sign in to comment.