Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

langchain[patch],core[patch]: Fix agent executor stream event behavior #5614

Merged
merged 5 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ export abstract class Runnable<
): AsyncGenerator<StreamEvent> {
const eventStreamer = new EventStreamCallbackHandler({
...streamOptions,
autoClose: true,
autoClose: false,
});
const config = ensureConfig(options);
const runId = config.runId ?? uuidv4();
Expand All @@ -843,14 +843,18 @@ export abstract class Runnable<
// add each chunk to the output stream
const outerThis = this;
async function consumeRunnableStream() {
const runnableStream = await outerThis.stream(input, config);
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
try {
const runnableStream = await outerThis.stream(input, config);
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
}
} finally {
await eventStreamer.finish();
}
}
const runnableStreamConsumePromise = consumeRunnableStream();
Expand Down
15 changes: 9 additions & 6 deletions langchain-core/src/runnables/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,13 @@ export class RemoteRunnable<
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
_coerceToDict(input, "input"),
config.runId,
undefined,
undefined,
undefined,
undefined,
options?.runName
config.runName
);
delete config.runId;
let finalOutput: RunOutput | undefined;
let finalOutputSupported = true;
try {
Expand Down Expand Up @@ -476,12 +477,13 @@ export class RemoteRunnable<
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
_coerceToDict(input, "input"),
config.runId,
undefined,
undefined,
undefined,
undefined,
options?.runName
config.runName
);
delete config.runId;
// The type is in camelCase but the API only accepts snake_case.
const camelCaseStreamOptions = {
include_names: streamOptions?.includeNames,
Expand Down Expand Up @@ -547,12 +549,13 @@ export class RemoteRunnable<
const runManager = await callbackManager_?.handleChainStart(
outerThis.toJSON(),
_coerceToDict(input, "input"),
config.runId,
undefined,
undefined,
undefined,
undefined,
options?.runName
config.runName
);
delete config.runId;
// The type is in camelCase but the API only accepts snake_case.
const camelCaseStreamOptions = {
include_names: streamOptions?.includeNames,
Expand Down
27 changes: 10 additions & 17 deletions langchain-core/src/tracers/event_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ export class EventStreamCallbackHandler extends BaseTracer {

protected excludeTags?: string[];

protected rootId?: string;

private runInfoMap: Map<string, RunInfo> = new Map();

private tappedPromises: Map<string, Promise<void>> = new Map();
Expand Down Expand Up @@ -238,7 +236,10 @@ export class EventStreamCallbackHandler extends BaseTracer {
return;
}
const runInfo = this.runInfoMap.get(runId);
// run has finished, don't issue any stream events
// Run has finished, don't issue any stream events.
// An example of this is for runnables that use the default
// implementation of .stream(), which delegates to .invoke()
// and calls .onChainEnd() before passing it to the iterator.
if (runInfo === undefined) {
yield firstChunk.value;
return;
Expand Down Expand Up @@ -286,7 +287,7 @@ export class EventStreamCallbackHandler extends BaseTracer {
} finally {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
tappedPromiseResolver!();
// Don't delete from the map to keep track of which runs have been tapped.
// Don't delete from the promises map to keep track of which runs have been tapped.
}
} else {
// otherwise just pass through
Expand Down Expand Up @@ -607,18 +608,10 @@ export class EventStreamCallbackHandler extends BaseTracer {
);
}

async onRunCreate(run: Run): Promise<void> {
if (this.rootId === undefined) {
this.rootId = run.id;
}
}

async onRunUpdate(run: Run): Promise<void> {
if (run.id === this.rootId && this.autoClose) {
const pendingPromises = [...this.tappedPromises.values()];
void Promise.all(pendingPromises).finally(() => {
void this.writer.close();
});
}
async finish() {
const pendingPromises = [...this.tappedPromises.values()];
void Promise.all(pendingPromises).finally(() => {
void this.writer.close();
});
}
}
10 changes: 6 additions & 4 deletions langchain/src/agents/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,15 @@ export class AgentExecutorIterator
this.runManager = await callbackManager?.handleChainStart(
this.agentExecutor.toJSON(),
this.inputs,
undefined,
this.config?.runId,
undefined,
this.tags ?? this.config?.tags,
this.metadata ?? this.config?.metadata,
this.runName ?? this.config?.runName
);
if (this.config !== undefined) {
delete this.config.runId;
}
}
}

Expand Down Expand Up @@ -234,9 +237,7 @@ export class AgentExecutorIterator
this.intermediateSteps,
runManager
);
if (this.runManager) {
await this.runManager.handleChainEnd(output);
}
await this.runManager?.handleChainEnd(output);
await this.setFinalOutputs(output);
}
}
Expand All @@ -256,6 +257,7 @@ export class AgentExecutorIterator
this.runManager
);
await this.setFinalOutputs(returnedOutput);
await this.runManager?.handleChainEnd(returnedOutput);
return returnedOutput;
}

Expand Down
60 changes: 60 additions & 0 deletions langchain/src/agents/tests/create_tool_calling_agent.int.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
import { z } from "zod";
import { test, expect } from "@jest/globals";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { DynamicStructuredTool } from "@langchain/core/tools";
import { TavilySearchResults } from "../../util/testing/tools/tavily_search.js";
import { AgentExecutor, createToolCallingAgent } from "../index.js";

const syntaxErrorTool = new DynamicStructuredTool({
name: "query",
description:
"use this tool to generate and execute a query from a question using the index.",
schema: z.object({
index_name: z.string().describe("The name of the index to query."),
question: z.string().describe("The question to answer."),
}),
func: async (_params) => {
return JSON.stringify({
result: "-ERR Syntax error at offset 19 near Bronx",
query:
'FT.AGGREGATE bites "@Borough:{The Bronx} @Gender:{M}" GROUPBY 0 REDUCE COUNT 0',
});
},
});

const tools = [new TavilySearchResults({ maxResults: 1 })];

test("createToolCallingAgent works", async () => {
Expand Down Expand Up @@ -78,3 +97,44 @@ test("createToolCallingAgent stream events works", async () => {
}
}
});

test("createToolCallingAgent stream events works for multiple turns", async () => {
const prompt = ChatPromptTemplate.fromMessages([
["system", "You are a helpful assistant"],
["placeholder", "{chat_history}"],
["human", "{input}"],
["placeholder", "{agent_scratchpad}"],
]);
const llm = new ChatOpenAI({
modelName: "gpt-4o",
temperature: 0,
});
const agent = await createToolCallingAgent({
llm,
tools: [syntaxErrorTool],
prompt,
});
const agentExecutor = new AgentExecutor({
agent,
tools: [syntaxErrorTool],
maxIterations: 3,
});
const input =
"Generate a query that looks up how many animals have been bitten in the Bronx.";
const eventStream = agentExecutor.streamEvents(
{
input,
},
{
version: "v2",
}
);

for await (const event of eventStream) {
const eventType = event.event;
console.log("Event type: ", eventType);
if (eventType === "on_chat_model_stream") {
console.log("Content: ", event.data);
}
}
});
1 change: 1 addition & 0 deletions langchain/src/document_loaders/web/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export class S3Loader extends BaseDocumentLoader {
const docs = await unstructuredLoader.load();

return docs;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
throw new Error(
`Failed to load file ${filePath} using unstructured loader: ${e.message}`
Expand Down
3 changes: 3 additions & 0 deletions langchain/src/smith/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ export type EvaluatorInputFormatter = ({
}) => EvaluatorInputs;

export type DynamicRunEvaluatorParams<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Input extends Record<string, any> = Record<string, unknown>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Prediction extends Record<string, any> = Record<string, unknown>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Reference extends Record<string, any> = Record<string, unknown>
> = {
input: Input;
Expand Down
Loading