core[minor]: Adds streamEvents method to runnables #4349

Merged · 9 commits · Feb 11, 2024
@@ -1,2 +1,2 @@
 label: "Cookbook"
-position: 2
+position: 5
@@ -1,2 +1,2 @@
 label: "How to"
-position: 1
+position: 4
2 changes: 1 addition & 1 deletion docs/core_docs/docs/expression_language/interface.mdx
@@ -1,5 +1,5 @@
 ---
-sidebar_position: 0
+sidebar_position: 2
 ---

 import CodeBlock from "@theme/CodeBlock";
16 changes: 16 additions & 0 deletions docs/core_docs/docs/expression_language/streaming.mdx
@@ -0,0 +1,16 @@
---
sidebar_position: 3
---

# Streaming With LangChain

Streaming is critical in making applications based on LLMs feel responsive to end-users.

Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

- `.stream()`: a default implementation of streaming that streams the final output from the chain.
- `streamEvents()` and `streamLog()`: these provide a way to stream both intermediate steps and final output from the chain.

Let’s take a look at both approaches!
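For context, here is a minimal sketch of the first approach: streaming a chat model's final output with `.stream()`. It assumes `@langchain/openai` is installed and `OPENAI_API_KEY` is set in the environment; it is an illustration, not part of this PR's diff.

```typescript
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ temperature: 0 });

// .stream() yields chunks of the final output as the model generates them.
const stream = await model.stream("Write a haiku about streaming.");
for await (const chunk of stream) {
  console.log(chunk.content);
}
```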
2 changes: 1 addition & 1 deletion docs/core_docs/docs/expression_language/why.mdx
@@ -1,5 +1,5 @@
 ---
-sidebar_position: 0
+sidebar_position: 1
 ---

 # Why use LCEL?
26 changes: 25 additions & 1 deletion docs/core_docs/docs/modules/agents/how_to/streaming.mdx
@@ -1,5 +1,6 @@
 import CodeBlock from "@theme/CodeBlock";
 import StreamIntermediateStepsExample from "@examples/agents/stream_intermediate_steps.ts";
+import StreamEventsExample from "@examples/agents/stream_events.ts";
 import StreamLogExample from "@examples/agents/stream_log.ts";

 # Streaming
@@ -27,7 +28,30 @@
You can see that we get back a bunch of different information. There are two ways to use it:
1. By using the AgentAction or observation directly
2. By using the messages object

-## Streaming tokens
+## Custom streaming with events

Use the `streamEvents` API in case the default behavior of `.stream()` does not work for your application (e.g., if you need to stream individual tokens from the agent, or to surface steps occurring within tools).

:::warning
This is a beta API, meaning that some details might change slightly in the future based on usage. You can pass a `version` parameter to tweak the behavior.
:::

Let’s use this API to stream the following events:

1. Agent Start with inputs
2. Tool Start with inputs
3. Tool End with outputs
4. Stream the agent's final answer token by token
5. Agent End with outputs

<CodeBlock language="typescript">{StreamEventsExample}</CodeBlock>
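Each event yielded in the example above has roughly the shape sketched below. The field names are inferred from the example code later in this diff; this is an orientation aid, not the library's exact exported type.

```typescript
// A rough sketch of a v1 stream event — not the exact type exported by @langchain/core.
interface StreamEventSketch {
  event: string; // e.g. "on_chain_start", "on_llm_stream", "on_tool_end"
  name: string; // the runnable's name, e.g. "Agent" (set via .withConfig({ runName: "Agent" }))
  data: {
    input?: unknown; // populated on *_start events
    output?: unknown; // populated on *_end events
    chunk?: unknown; // populated on *_stream events, e.g. LLM message chunks
  };
}
```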

## Other approaches

### `streamLog`

You can also use the `streamLog` API. This API produces a granular log of all events that occur during execution. The log format is based on the [JSONPatch](https://jsonpatch.com/) standard.
It’s granular, but requires effort to parse. For this reason, we created the `streamEvents` API as an easier alternative.

In addition to streaming the final result, you can also stream tokens from each individual step. This will require more complex parsing of the logs.
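For a sense of the format, each streamed chunk is a list of JSONPatch operations, roughly like the sketch below. The exact paths depend on your chain's structure; this is an illustration, not output captured from this PR.

```typescript
// Illustrative only: one streamed chunk, expressed as JSONPatch-style operations.
// The paths (e.g. "/logs/ChatOpenAI/streamed_output_str/-") vary with your chain.
const exampleChunk = {
  ops: [
    {
      op: "add",
      path: "/logs/ChatOpenAI/streamed_output_str/-",
      value: "Hello",
    },
  ],
};
```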

81 changes: 81 additions & 0 deletions examples/src/agents/stream_events.ts
@@ -0,0 +1,81 @@
import { TavilySearchResults } from "@langchain/community/tools/tavily_search";
import { ChatOpenAI } from "@langchain/openai";
import type { ChatPromptTemplate } from "@langchain/core/prompts";

import { pull } from "langchain/hub";
import { AgentExecutor, createOpenAIFunctionsAgent } from "langchain/agents";

// Define the tools the agent will have access to.
const tools = [new TavilySearchResults({})];

const llm = new ChatOpenAI({
modelName: "gpt-3.5-turbo-1106",
temperature: 0,
streaming: true,
});

// Get the prompt to use - you can modify this!
// If you want to see the prompt in full, you can view it at:
// https://smith.langchain.com/hub/hwchase17/openai-functions-agent
const prompt = await pull<ChatPromptTemplate>(
"hwchase17/openai-functions-agent"
);

const agent = await createOpenAIFunctionsAgent({
llm,
tools,
prompt,
});

const agentExecutor = new AgentExecutor({
agent,
tools,
}).withConfig({ runName: "Agent" });

const eventStream = await agentExecutor.streamEvents(
{
input: "what is the weather in SF",
},
{ version: "v1" }
);

for await (const event of eventStream) {
const eventType = event.event;
if (eventType === "on_chain_start") {
// Was assigned when creating the agent with `.withConfig({"runName": "Agent"})` above
if (event.name === "Agent") {
console.log("\n-----");
console.log(
`Starting agent: ${event.name} with input: ${JSON.stringify(
event.data.input
)}`
);
}
} else if (eventType === "on_chain_end") {
// Was assigned when creating the agent with `.withConfig({"runName": "Agent"})` above
if (event.name === "Agent") {
console.log("\n-----");
console.log(`Finished agent: ${event.name}\n`);
console.log(`Agent output was: ${event.data.output}`);
console.log("\n-----");
}
} else if (eventType === "on_llm_stream") {
const content = event.data?.chunk?.message?.content;
// Empty content in the context of OpenAI means
// that the model is asking for a tool to be invoked via function call.
// So we only print non-empty content
if (content !== undefined && content !== "") {
console.log(`| ${content}`);
}
} else if (eventType === "on_tool_start") {
console.log("\n-----");
console.log(
`Starting tool: ${event.name} with inputs: ${event.data.input}`
);
} else if (eventType === "on_tool_end") {
console.log("\n-----");
console.log(`Finished tool: ${event.name}\n`);
console.log(`Tool output was: ${event.data.output}`);
console.log("\n-----");
}
}
9 changes: 9 additions & 0 deletions examples/src/agents/stream_log.ts
@@ -36,7 +36,16 @@ const logStream = await agentExecutor.streamLog({
   input: "what is the weather in SF",
 });

+// You can optionally aggregate the final state using the .concat() method
+// as shown below.
+let finalState;
+
 for await (const chunk of logStream) {
+  if (!finalState) {
+    finalState = chunk;
+  } else {
+    finalState = finalState.concat(chunk);
+  }
   console.log(JSON.stringify(chunk, null, 2));
 }
 /*
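As a hedged follow-up, you could read the aggregated result once the loop finishes. This assumes the aggregated `RunLog` exposes a `state` object with `final_output`; verify against your installed `@langchain/core` version before relying on it.

```typescript
// Assumption: the aggregated RunLog exposes `state.final_output` in this
// version of @langchain/core — check your version before relying on it.
if (finalState) {
  console.log(JSON.stringify(finalState.state?.final_output, null, 2));
}
```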
3 changes: 2 additions & 1 deletion langchain-core/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@langchain/core",
-  "version": "0.1.26",
+  "version": "0.1.27",
   "description": "Core LangChain.js abstractions and schemas",
   "type": "module",
   "engines": {
@@ -27,6 +27,7 @@
     "prepack": "yarn build",
     "release": "release-it --only-version --config .release-it.json",
     "test": "NODE_OPTIONS=--experimental-vm-modules jest --testPathIgnorePatterns=\\.int\\.test.ts --testTimeout 30000 --maxWorkers=50%",
+    "test:integration": "NODE_OPTIONS=--experimental-vm-modules jest --testPathPattern=\\.int\\.test.ts --testTimeout 100000 --maxWorkers=50%",
     "test:watch": "NODE_OPTIONS=--experimental-vm-modules jest --watch --testPathIgnorePatterns=\\.int\\.test.ts",
     "test:single": "NODE_OPTIONS=--experimental-vm-modules yarn run jest --config jest.config.cjs --testTimeout 100000",
     "format": "prettier --config .prettierrc --write \"src\"",