Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Improve Langsmith traces for AI executions #9081

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
type IExecuteFunctions,
type INodeExecutionData,
NodeConnectionType,
NodeOperationError,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';

import { initializeAgentExecutorWithOptions } from 'langchain/agents';
import type { BaseChatMemory } from '@langchain/community/memory/chat_memory';
Expand All @@ -16,13 +12,13 @@ import {
getOptionalOutputParsers,
getConnectedTools,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

export async function conversationalAgentExecute(
this: IExecuteFunctions,
nodeVersion: number,
): Promise<INodeExecutionData[][]> {
this.logger.verbose('Executing Conversational Agent');

const model = await this.getInputConnectionData(NodeConnectionType.AiLanguageModel, 0);

if (!isChatInstance(model)) {
Expand Down Expand Up @@ -104,7 +100,9 @@ export async function conversationalAgentExecute(
input = (await prompt.invoke({ input })).value;
}

let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });

if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
getOptionalOutputParsers,
getPromptInputByType,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

export async function openAiFunctionsAgentExecute(
this: IExecuteFunctions,
Expand Down Expand Up @@ -104,7 +105,9 @@ export async function openAiFunctionsAgentExecute(
input = (await prompt.invoke({ input })).value;
}

let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });

if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
getOptionalOutputParsers,
getPromptInputByType,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

export async function planAndExecuteAgentExecute(
this: IExecuteFunctions,
Expand Down Expand Up @@ -79,7 +80,9 @@ export async function planAndExecuteAgentExecute(
input = (await prompt.invoke({ input })).value;
}

let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });

if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
getPromptInputByType,
isChatInstance,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

export async function reActAgentAgentExecute(
this: IExecuteFunctions,
Expand Down Expand Up @@ -100,7 +101,10 @@ export async function reActAgentAgentExecute(
input = (await prompt.invoke({ input })).value;
}

let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });

if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import type { DataSource } from '@n8n/typeorm';

import { getPromptInputByType, serializeChatHistory } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
import { getSqliteDataSource } from './other/handlers/sqlite';
import { getPostgresDataSource } from './other/handlers/postgres';
import { SQL_PREFIX, SQL_SUFFIX } from './other/prompts';
Expand Down Expand Up @@ -126,16 +127,16 @@

let response: IDataObject;
try {
response = await agentExecutor.call({
response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
input,
signal: this.getExecutionCancelSignal(),
chatHistory,
});
} catch (error) {
if ((error.message as IDataObject)?.output) {

Check warning on line 136 in packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts

View workflow job for this annotation

GitHub Actions / Lint changes

Unsafe member access .message on an `any` value
response = error.message as IDataObject;

Check warning on line 137 in packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts

View workflow job for this annotation

GitHub Actions / Lint changes

Unsafe member access .message on an `any` value
} else {
throw new NodeOperationError(this.getNode(), error.message as string, { itemIndex: i });

Check warning on line 139 in packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts

View workflow job for this annotation

GitHub Actions / Lint changes

Unsafe member access .message on an `any` value
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
} from 'n8n-workflow';
import type { OpenAIToolType } from 'langchain/dist/experimental/openai_assistant/schema';
import { getConnectedTools } from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';
import { formatToOpenAIAssistantTool } from './utils';

export class OpenAiAssistant implements INodeType {
Expand Down Expand Up @@ -373,7 +374,7 @@ export class OpenAiAssistant implements INodeType {
tools,
});

const response = await agentExecutor.call({
const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
content: input,
signal: this.getExecutionCancelSignal(),
timeout: options.timeout ?? 10000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
getPromptInputByType,
isChatInstance,
} from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';

interface MessagesTemplate {
type: string;
Expand Down Expand Up @@ -154,9 +155,9 @@ async function createSimpleLLMChain(
const chain = new LLMChain({
llm,
prompt,
});
}).withConfig(getTracingConfig(context));

const response = (await chain.call({
const response = (await chain.invoke({
query,
signal: context.getExecutionCancelSignal(),
})) as string[];
Expand Down Expand Up @@ -203,8 +204,9 @@ async function getChain(
);

const chain = prompt.pipe(llm).pipe(combinedOutputParser);

const response = (await chain.invoke({ query })) as string | string[];
const response = (await chain.withConfig(getTracingConfig(context)).invoke({ query })) as
| string
| string[];

return Array.isArray(response) ? response : [response];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { BaseRetriever } from '@langchain/core/retrievers';
import { getTemplateNoticeField } from '../../../utils/sharedFields';
import { getPromptInputByType } from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';

export class ChainRetrievalQa implements INodeType {
description: INodeTypeDescription = {
Expand Down Expand Up @@ -176,7 +177,7 @@ export class ChainRetrievalQa implements INodeType {
throw new NodeOperationError(this.getNode(), 'The ‘query‘ parameter is empty.');
}

const response = await chain.call({ query });
const response = await chain.withConfig(getTracingConfig(this)).invoke({ query });
returnData.push({ json: { response } });
}
return await this.prepareOutputData(returnData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { N8nBinaryLoader } from '../../../../utils/N8nBinaryLoader';
import { getTemplateNoticeField } from '../../../../utils/sharedFields';
import { REFINE_PROMPT_TEMPLATE, DEFAULT_PROMPT_TEMPLATE } from '../prompt';
import { getChainPromptsArgs } from '../helpers';
import { getTracingConfig } from '../../../../utils/tracing';

function getInputs(parameters: IDataObject) {
const chunkingMode = parameters?.chunkingMode;
Expand Down Expand Up @@ -364,7 +365,7 @@ export class ChainSummarizationV2 implements INodeType {
? await documentInput.processItem(item, itemIndex)
: documentInput;

const response = await chain.call({
const response = await chain.withConfig(getTracingConfig(this)).invoke({
input_documents: processedDocuments,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Document } from '@langchain/core/documents';

import type { SetField, SetNodeOptions } from 'n8n-nodes-base/dist/nodes/Set/v2/helpers/interfaces';
import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import type { CallbackManagerForRetrieverRun } from '@langchain/core/callbacks/manager';
import { logWrapper } from '../../../utils/logWrapper';

function objectToString(obj: Record<string, string> | IDataObject, level = 0) {
Expand Down Expand Up @@ -287,7 +288,10 @@ export class RetrieverWorkflow implements INodeType {
this.executeFunctions = executeFunctions;
}

async getRelevantDocuments(query: string): Promise<Document[]> {
async _getRelevantDocuments(
query: string,
config?: CallbackManagerForRetrieverRun,
): Promise<Document[]> {
const source = this.executeFunctions.getNodeParameter('source', itemIndex) as string;

const baseMetadata: IDataObject = {
Expand Down Expand Up @@ -360,6 +364,7 @@ export class RetrieverWorkflow implements INodeType {
receivedItems = (await this.executeFunctions.executeWorkflow(
workflowInfo,
items,
config?.getChild(),
)) as INodeExecutionData[][];
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import { DynamicTool } from '@langchain/core/tools';
import get from 'lodash/get';
import isObject from 'lodash/isObject';
import type { CallbackManagerForToolRun } from '@langchain/core/callbacks/manager';
import { getConnectionHintNoticeField } from '../../../utils/sharedFields';

export class ToolWorkflow implements INodeType {
Expand Down Expand Up @@ -320,7 +321,10 @@ export class ToolWorkflow implements INodeType {
const name = this.getNodeParameter('name', itemIndex) as string;
const description = this.getNodeParameter('description', itemIndex) as string;

const runFunction = async (query: string): Promise<string> => {
const runFunction = async (
query: string,
runManager?: CallbackManagerForToolRun,
): Promise<string> => {
const source = this.getNodeParameter('source', itemIndex) as string;
const responsePropertyName = this.getNodeParameter(
'responsePropertyName',
Expand Down Expand Up @@ -385,7 +389,11 @@ export class ToolWorkflow implements INodeType {

let receivedData: INodeExecutionData;
try {
receivedData = (await this.executeWorkflow(workflowInfo, items)) as INodeExecutionData;
receivedData = (await this.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
)) as INodeExecutionData;
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
Expand Down Expand Up @@ -413,13 +421,13 @@ export class ToolWorkflow implements INodeType {
name,
description,

func: async (query: string): Promise<string> => {
func: async (query: string, runManager?: CallbackManagerForToolRun): Promise<string> => {
const { index } = this.addInputData(NodeConnectionType.AiTool, [[{ json: { query } }]]);

let response: string = '';
let executionError: ExecutionError | undefined;
try {
response = await runFunction(query);
response = await runFunction(query, runManager);
} catch (error) {
// TODO: Do some more testing. Issues here should actually fail the workflow
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { formatToOpenAIAssistantTool } from '../../helpers/utils';
import { assistantRLC } from '../descriptions';

import { getConnectedTools } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

const properties: INodeProperties[] = [
assistantRLC,
Expand Down Expand Up @@ -181,7 +182,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
tools: tools ?? [],
});

const response = await agentExecutor.invoke({
const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
content: input,
signal: this.getExecutionCancelSignal(),
timeout: options.timeout ?? 10000,
Expand Down
26 changes: 26 additions & 0 deletions packages/@n8n/nodes-langchain/utils/tracing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { BaseCallbackConfig } from '@langchain/core/callbacks/manager';
import type { IExecuteFunctions } from 'n8n-workflow';

interface TracingConfig {
additionalMetadata?: Record<string, unknown>;
}

export function getTracingConfig(
context: IExecuteFunctions,
config: TracingConfig = {},
): BaseCallbackConfig {
const parentRunManager = context.getParentCallbackManager
? context.getParentCallbackManager()
: undefined;

return {
runName: `[${context.getWorkflow().name}] ${context.getNode().name}`,
metadata: {
execution_id: context.getExecutionId(),
workflow: context.getWorkflow(),
node: context.getNode().name,
...(config.additionalMetadata ?? {}),
},
callbacks: parentRunManager,
};
}
3 changes: 3 additions & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type {
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
CallbackManager,
} from 'n8n-workflow';
import {
ApplicationError,
Expand Down Expand Up @@ -754,6 +755,7 @@ async function executeWorkflow(
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const internalHooks = Container.get(InternalHooks);
Expand Down Expand Up @@ -815,6 +817,7 @@ async function executeWorkflow(
workflowData,
);
additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;

// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import type {
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
CallbackManager,
} from 'n8n-workflow';
import {
ExpressionError,
Expand Down Expand Up @@ -3487,13 +3488,15 @@ export function getExecuteFunctions(
async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
): Promise<any> {
return await additionalData
.executeWorkflow(workflowInfo, additionalData, {
parentWorkflowId: workflow.id?.toString(),
inputData,
parentWorkflowSettings: workflow.settings,
node,
parentCallbackManager,
})
.then(
async (result) =>
Expand Down Expand Up @@ -3719,6 +3722,7 @@ export function getExecuteFunctions(
msg,
});
},
getParentCallbackManager: () => additionalData.parentCallbackManager,
};
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/workflow/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"recast": "0.21.5",
"title-case": "3.0.3",
"transliteration": "2.3.5",
"xml2js": "0.6.2"
"xml2js": "0.6.2",
"@langchain/core": "0.1.41"
}
}
Loading
Loading