From 1129dbe4cd75e1077c8d51563c9e60203b68bbb9 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Wed, 3 Apr 2024 18:00:52 +0200 Subject: [PATCH 1/4] WIP: Passing through Run callbacks Signed-off-by: Oleg Ivaniv --- .../agents/ConversationalAgent/execute.ts | 25 +++++++++++++------ .../nodes/chains/ChainLLM/ChainLlm.node.ts | 24 +++++++++++++++--- .../tools/ToolWorkflow/ToolWorkflow.node.ts | 15 ++++++++--- .../cli/src/WorkflowExecuteAdditionalData.ts | 2 ++ packages/core/src/NodeExecuteFunctions.ts | 5 ++++ packages/workflow/src/Interfaces.ts | 5 ++++ 6 files changed, 61 insertions(+), 15 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts index ea148c493cbb7..44ff6d31e9ab8 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts @@ -1,15 +1,12 @@ -import { - type IExecuteFunctions, - type INodeExecutionData, - NodeConnectionType, - NodeOperationError, -} from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; import { initializeAgentExecutorWithOptions } from 'langchain/agents'; import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import type { BaseOutputParser } from '@langchain/core/output_parsers'; import { PromptTemplate } from '@langchain/core/prompts'; import { CombiningOutputParser } from 'langchain/output_parsers'; +import type { CallbackManager } from '@langchain/core/callbacks/manager'; import { isChatInstance, getPromptInputByType, @@ -22,7 +19,12 @@ export async function conversationalAgentExecute( nodeVersion: number, ): Promise { this.logger.verbose('Executing Conversational Agent'); - + const parentRunManager = this.getParentRunManager + ? (this.getParentRunManager() as { tools: CallbackManager }) + : undefined; + if (parentRunManager) { + console.log(parentRunManager.tools); + } const model = await this.getInputConnectionData(NodeConnectionType.AiLanguageModel, 0); if (!isChatInstance(model)) { @@ -57,6 +59,7 @@ export async function conversationalAgentExecute( systemMessage: options.systemMessage, humanMessage: options.humanMessage, }, + // callbacks: parentRunManager?.tools ? [parentRunManager.tools] : undefined, }); const returnData: INodeExecutionData[] = []; @@ -104,7 +107,13 @@ export async function conversationalAgentExecute( input = (await prompt.invoke({ input })).value; } - let response = await agentExecutor.call({ input, outputParsers }); + let response = await agentExecutor + .withConfig({ + runName: this.getNode().name, + metadata: { execution_id: this.getExecutionId() }, + callbacks: parentRunManager?.tools, + }) + .invoke({ input, outputParsers }); if (outputParser) { response = { output: await outputParser.parse(response.output as string) }; diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts index 3aba05397e92a..84246a1cf2641 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts @@ -21,6 +21,7 @@ import { CombiningOutputParser } from 'langchain/output_parsers'; import { LLMChain } from 'langchain/chains'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import { HumanMessage } from '@langchain/core/messages'; +import type { CallbackManager } from '@langchain/core/callbacks/manager'; import { getTemplateNoticeField } from '../../../utils/sharedFields'; import { getOptionalOutputParsers, @@ -151,12 +152,20 @@ async function createSimpleLLMChain( query: string, prompt: ChatPromptTemplate | PromptTemplate, ): Promise { + const parentRunManager = context.getParentRunManager + ? (context.getParentRunManager() as { tools: CallbackManager }) + : undefined; + const chain = new LLMChain({ llm, prompt, + }).withConfig({ + runName: context.getNode().name, + callbacks: parentRunManager?.tools, + metadata: { execution_id: context.getExecutionId() }, }); - const response = (await chain.call({ + const response = (await chain.invoke({ query, signal: context.getExecutionCancelSignal(), })) as string[]; @@ -203,8 +212,17 @@ async function getChain( ); const chain = prompt.pipe(llm).pipe(combinedOutputParser); - - const response = (await chain.invoke({ query })) as string | string[]; + const parentRunManager = context.getParentRunManager + ? (context.getParentRunManager() as { tools: CallbackManager }) + : undefined; + + const response = (await chain + .withConfig({ + runName: context.getNode().name, + callbacks: parentRunManager?.tools, + metadata: { execution_id: context.getExecutionId() }, + }) + .invoke({ query })) as string | string[]; return Array.isArray(response) ? response : [response]; } diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts index 373fd82077aea..cec18a5dc9474 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts @@ -16,6 +16,7 @@ import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode'; import { DynamicTool } from '@langchain/core/tools'; import get from 'lodash/get'; import isObject from 'lodash/isObject'; +import type { CallbackManagerForToolRun } from '@langchain/core/callbacks/manager'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; export class ToolWorkflow implements INodeType { @@ -320,7 +321,10 @@ export class ToolWorkflow implements INodeType { const name = this.getNodeParameter('name', itemIndex) as string; const description = this.getNodeParameter('description', itemIndex) as string; - const runFunction = async (query: string): Promise => { + const runFunction = async ( + query: string, + runManager?: CallbackManagerForToolRun, + ): Promise => { const source = this.getNodeParameter('source', itemIndex) as string; const responsePropertyName = this.getNodeParameter( 'responsePropertyName', @@ -384,8 +388,11 @@ export class ToolWorkflow implements INodeType { const items = [newItem] as INodeExecutionData[]; let receivedData: INodeExecutionData; + console.log('About to pass runManager to executeWorkflow', runManager?.runId); try { - receivedData = (await this.executeWorkflow(workflowInfo, items)) as INodeExecutionData; + receivedData = (await this.executeWorkflow(workflowInfo, items, { + tools: runManager?.getChild(), + })) as INodeExecutionData; } catch (error) { // Make sure a valid error gets returned that can by json-serialized else it will // not show up in the frontend @@ -413,13 +420,13 @@ export class ToolWorkflow implements INodeType { name, description, - func: async (query: string): Promise => { + func: async (query: string, runManager?: CallbackManagerForToolRun): Promise => { const { index } = this.addInputData(NodeConnectionType.AiTool, [[{ json: { query } }]]); let response: string = ''; let executionError: ExecutionError | undefined; try { - response = await runFunction(query); + response = await runFunction(query, runManager); } catch (error) { // TODO: Do some more testing. Issues here should actually fail the workflow // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 5b38259dced02..04c709f2a902e 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -754,6 +754,7 @@ async function executeWorkflow( loadedWorkflowData?: IWorkflowBase; loadedRunData?: IWorkflowExecutionDataProcess; parentWorkflowSettings?: IWorkflowSettings; + tools?: IDataObject; }, ): Promise | IWorkflowExecuteProcess> { const internalHooks = Container.get(InternalHooks); @@ -815,6 +816,7 @@ async function executeWorkflow( workflowData, ); additionalDataIntegrated.executionId = executionId; + additionalDataIntegrated.tools = options.tools; // Make sure we pass on the original executeWorkflow function we received // This one already contains changes to talk to parent process diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 51f66c1574572..c66a97890a89a 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -3482,6 +3482,7 @@ export function getExecuteFunctions( async executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], + tools?: IDataObject, ): Promise { return await additionalData .executeWorkflow(workflowInfo, additionalData, { @@ -3489,6 +3490,7 @@ export function getExecuteFunctions( inputData, parentWorkflowSettings: workflow.settings, node, + tools, }) .then( async (result) => @@ -3714,6 +3716,9 @@ export function getExecuteFunctions( msg, }); }, + getParentRunManager: () => { + return additionalData.tools; + }, }; })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 58a8515aaed7e..ffcbe770d2f42 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -863,6 +863,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], + tools?: IDataObject, ): Promise; getInputConnectionData( inputName: ConnectionTypes, @@ -902,6 +903,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise; copyInputItems(items: INodeExecutionData[], properties: string[]): IDataObject[]; }; + + getParentRunManager(): IDataObject | undefined; }; export interface IExecuteSingleFunctions extends BaseExecutionFunctions { @@ -2049,6 +2052,7 @@ export interface IWorkflowExecuteAdditionalData { loadedWorkflowData?: IWorkflowBase; loadedRunData?: any; parentWorkflowSettings?: IWorkflowSettings; + tools?: IDataObject; }, ) => Promise; executionId?: string; @@ -2080,6 +2084,7 @@ export interface IWorkflowExecuteAdditionalData { nodeType?: string; }, ) => Promise; + tools?: IDataObject; } export type WorkflowExecuteMode = From 0099088c6d48cc3e1e396c58817dce0a153a53f1 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Thu, 4 Apr 2024 16:18:07 +0200 Subject: [PATCH 2/4] Add tracing helper and clean-up previous code Signed-off-by: Oleg Ivaniv --- .../agents/ConversationalAgent/execute.ts | 15 ++--------- .../agents/OpenAiFunctionsAgent/execute.ts | 5 +++- .../agents/PlanAndExecuteAgent/execute.ts | 5 +++- .../agents/Agent/agents/ReActAgent/execute.ts | 6 ++++- .../agents/Agent/agents/SqlAgent/execute.ts | 3 ++- .../OpenAiAssistant/OpenAiAssistant.node.ts | 3 ++- .../nodes/chains/ChainLLM/ChainLlm.node.ts | 7 ++--- .../ChainRetrievalQA/ChainRetrievalQa.node.ts | 3 ++- .../RetrieverWorkflow.node.ts | 14 ++++++---- .../@n8n/nodes-langchain/utils/tracing.ts | 27 +++++++++++++++++++ 10 files changed, 59 insertions(+), 29 deletions(-) create mode 100644 packages/@n8n/nodes-langchain/utils/tracing.ts diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts index 44ff6d31e9ab8..67ebec8469bb6 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts @@ -6,25 +6,19 @@ import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import type { BaseOutputParser } from '@langchain/core/output_parsers'; import { PromptTemplate } from '@langchain/core/prompts'; import { CombiningOutputParser } from 'langchain/output_parsers'; -import type { CallbackManager } from '@langchain/core/callbacks/manager'; import { isChatInstance, getPromptInputByType, getOptionalOutputParsers, getConnectedTools, } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function conversationalAgentExecute( this: IExecuteFunctions, nodeVersion: number, ): Promise { this.logger.verbose('Executing Conversational Agent'); - const parentRunManager = this.getParentRunManager - ? (this.getParentRunManager() as { tools: CallbackManager }) - : undefined; - if (parentRunManager) { - console.log(parentRunManager.tools); - } const model = await this.getInputConnectionData(NodeConnectionType.AiLanguageModel, 0); if (!isChatInstance(model)) { @@ -59,7 +53,6 @@ export async function conversationalAgentExecute( systemMessage: options.systemMessage, humanMessage: options.humanMessage, }, - // callbacks: parentRunManager?.tools ? [parentRunManager.tools] : undefined, }); const returnData: INodeExecutionData[] = []; @@ -108,11 +101,7 @@ export async function conversationalAgentExecute( } let response = await agentExecutor - .withConfig({ - runName: this.getNode().name, - metadata: { execution_id: this.getExecutionId() }, - callbacks: parentRunManager?.tools, - }) + .withConfig(getTracingConfig(this)) .invoke({ input, outputParsers }); if (outputParser) { diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts index b2cc7b68a0b4a..7f9ea2040a782 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts @@ -17,6 +17,7 @@ import { getOptionalOutputParsers, getPromptInputByType, } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function openAiFunctionsAgentExecute( this: IExecuteFunctions, @@ -104,7 +105,9 @@ export async function openAiFunctionsAgentExecute( input = (await prompt.invoke({ input })).value; } - let response = await agentExecutor.call({ input, outputParsers }); + let response = await agentExecutor + .withConfig(getTracingConfig(this)) + .invoke({ input, outputParsers }); if (outputParser) { response = { output: await outputParser.parse(response.output as string) }; diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts index 3912dffadf01b..8c4a9667e077f 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts @@ -15,6 +15,7 @@ import { getOptionalOutputParsers, getPromptInputByType, } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function planAndExecuteAgentExecute( this: IExecuteFunctions, @@ -79,7 +80,9 @@ export async function planAndExecuteAgentExecute( input = (await prompt.invoke({ input })).value; } - let response = await agentExecutor.call({ input, outputParsers }); + let response = await agentExecutor + .withConfig(getTracingConfig(this)) + .invoke({ input, outputParsers }); if (outputParser) { response = { output: await outputParser.parse(response.output as string) }; diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts index e8f5ea0b5d910..94359aa47f0b2 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts @@ -17,6 +17,7 @@ import { getPromptInputByType, isChatInstance, } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function reActAgentAgentExecute( this: IExecuteFunctions, @@ -100,7 +101,10 @@ export async function reActAgentAgentExecute( input = (await prompt.invoke({ input })).value; } - let response = await agentExecutor.call({ input, outputParsers }); + let response = await agentExecutor + .withConfig(getTracingConfig(this)) + .invoke({ input, outputParsers }); + if (outputParser) { response = { output: await outputParser.parse(response.output as string) }; } diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts index 783a0a86a4bce..e8b989d865bed 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/SqlAgent/execute.ts @@ -14,6 +14,7 @@ import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import type { DataSource } from '@n8n/typeorm'; import { getPromptInputByType, serializeChatHistory } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; import { getSqliteDataSource } from './other/handlers/sqlite'; import { getPostgresDataSource } from './other/handlers/postgres'; import { SQL_PREFIX, SQL_SUFFIX } from './other/prompts'; @@ -126,7 +127,7 @@ export async function sqlAgentAgentExecute( let response: IDataObject; try { - response = await agentExecutor.call({ + response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({ input, signal: this.getExecutionCancelSignal(), chatHistory, diff --git a/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts index 4c7243c7d5957..5dafa6d187296 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts @@ -10,6 +10,7 @@ import type { } from 'n8n-workflow'; import type { OpenAIToolType } from 'langchain/dist/experimental/openai_assistant/schema'; import { getConnectedTools } from '../../../utils/helpers'; +import { getTracingConfig } from '../../../utils/tracing'; import { formatToOpenAIAssistantTool } from './utils'; export class OpenAiAssistant implements INodeType { @@ -373,7 +374,7 @@ export class OpenAiAssistant implements INodeType { tools, }); - const response = await agentExecutor.call({ + const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({ content: input, signal: this.getExecutionCancelSignal(), timeout: options.timeout ?? 10000, diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts index 84246a1cf2641..d01065faf2068 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts @@ -28,6 +28,7 @@ import { getPromptInputByType, isChatInstance, } from '../../../utils/helpers'; +import { getTracingConfig } from '../../../utils/tracing'; interface MessagesTemplate { type: string; @@ -159,11 +160,7 @@ async function createSimpleLLMChain( const chain = new LLMChain({ llm, prompt, - }).withConfig({ - runName: context.getNode().name, - callbacks: parentRunManager?.tools, - metadata: { execution_id: context.getExecutionId() }, - }); + }).withConfig(getTracingConfig(context)); const response = (await chain.invoke({ query, diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts index 07017dcf37471..0652f7cf47174 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts @@ -12,6 +12,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base'; import type { BaseRetriever } from '@langchain/core/retrievers'; import { getTemplateNoticeField } from '../../../utils/sharedFields'; import { getPromptInputByType } from '../../../utils/helpers'; +import { getTracingConfig } from '../../../utils/tracing'; export class ChainRetrievalQa implements INodeType { description: INodeTypeDescription = { @@ -176,7 +177,7 @@ export class ChainRetrievalQa implements INodeType { throw new NodeOperationError(this.getNode(), 'The β€˜queryβ€˜ parameter is empty.'); } - const response = await chain.call({ query }); + const response = await chain.withConfig(getTracingConfig(this)).invoke({ query }); returnData.push({ json: { response } }); } return await this.prepareOutputData(returnData); diff --git a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts index 4f1d288e8308e..acd2780cabde3 100644 --- a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts @@ -16,6 +16,7 @@ import { Document } from '@langchain/core/documents'; import type { SetField, SetNodeOptions } from 'n8n-nodes-base/dist/nodes/Set/v2/helpers/interfaces'; import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode'; +import type { CallbackManagerForRetrieverRun } from '@langchain/core/callbacks/manager'; import { logWrapper } from '../../../utils/logWrapper'; function objectToString(obj: Record | IDataObject, level = 0) { @@ -287,7 +288,11 @@ export class RetrieverWorkflow implements INodeType { this.executeFunctions = executeFunctions; } - async getRelevantDocuments(query: string): Promise { + async _getRelevantDocuments( + query: string, + config?: CallbackManagerForRetrieverRun, + ): Promise { + console.log('πŸš€ ~ WorkflowRetriever ~ getRelevantDocuments ~ callback:', config); const source = this.executeFunctions.getNodeParameter('source', itemIndex) as string; const baseMetadata: IDataObject = { @@ -357,10 +362,9 @@ export class RetrieverWorkflow implements INodeType { let receivedItems: INodeExecutionData[][]; try { - receivedItems = (await this.executeFunctions.executeWorkflow( - workflowInfo, - items, - )) as INodeExecutionData[][]; + receivedItems = (await this.executeFunctions.executeWorkflow(workflowInfo, items, { + tools: config?.getChild(), + })) as INodeExecutionData[][]; } catch (error) { // Make sure a valid error gets returned that can by json-serialized else it will // not show up in the frontend diff --git a/packages/@n8n/nodes-langchain/utils/tracing.ts b/packages/@n8n/nodes-langchain/utils/tracing.ts new file mode 100644 index 0000000000000..4454d28625590 --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/tracing.ts @@ -0,0 +1,27 @@ +import type { BaseCallbackConfig, CallbackManager } from '@langchain/core/callbacks/manager'; +import type { IExecuteFunctions } from 'n8n-workflow'; + +interface TracingConfig { + additionalMetadata?: Record; +} + +export function getTracingConfig( + context: IExecuteFunctions, + config: TracingConfig = {}, +): BaseCallbackConfig { + const parentRunManager = context.getParentRunManager?.() + ? (context.getParentRunManager() as { tools: CallbackManager }) + : undefined; + + console.log('πŸš€ ~ parentRunManager:', parentRunManager); + return { + runName: `[${context.getWorkflow().name}] ${context.getNode().name}`, + metadata: { + execution_id: context.getExecutionId(), + workflow: context.getWorkflow(), + node: context.getNode().name, + ...(config.additionalMetadata ?? {}), + }, + callbacks: parentRunManager?.tools, + }; +} From 705652e5ec76d7fc3ced7110409a5ea41483761b Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Mon, 8 Apr 2024 11:01:04 +0200 Subject: [PATCH 3/4] Cleanup types and add Summarization chain tracing Signed-off-by: Oleg Ivaniv --- .../nodes/chains/ChainLLM/ChainLlm.node.ts | 19 +++---------------- .../V2/ChainSummarizationV2.node.ts | 3 ++- .../RetrieverWorkflow.node.ts | 9 +++++---- .../tools/ToolWorkflow/ToolWorkflow.node.ts | 8 +++++--- .../actions/assistant/message.operation.ts | 3 ++- .../@n8n/nodes-langchain/utils/tracing.ts | 9 ++++----- .../cli/src/WorkflowExecuteAdditionalData.ts | 5 +++-- packages/core/src/NodeExecuteFunctions.ts | 9 ++++----- packages/workflow/package.json | 3 ++- packages/workflow/src/Interfaces.ts | 11 +++++++---- pnpm-lock.yaml | 14 ++++++-------- 11 files changed, 43 insertions(+), 50 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts index d01065faf2068..030eef34a26bc 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts @@ -21,7 +21,6 @@ import { CombiningOutputParser } from 'langchain/output_parsers'; import { LLMChain } from 'langchain/chains'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import { HumanMessage } from '@langchain/core/messages'; -import type { CallbackManager } from '@langchain/core/callbacks/manager'; import { getTemplateNoticeField } from '../../../utils/sharedFields'; import { getOptionalOutputParsers, @@ -153,10 +152,6 @@ async function createSimpleLLMChain( query: string, prompt: ChatPromptTemplate | PromptTemplate, ): Promise { - const parentRunManager = context.getParentRunManager - ? (context.getParentRunManager() as { tools: CallbackManager }) - : undefined; - const chain = new LLMChain({ llm, prompt, @@ -209,17 +204,9 @@ async function getChain( ); const chain = prompt.pipe(llm).pipe(combinedOutputParser); - const parentRunManager = context.getParentRunManager - ? (context.getParentRunManager() as { tools: CallbackManager }) - : undefined; - - const response = (await chain - .withConfig({ - runName: context.getNode().name, - callbacks: parentRunManager?.tools, - metadata: { execution_id: context.getExecutionId() }, - }) - .invoke({ query })) as string | string[]; + const response = (await chain.withConfig(getTracingConfig(context)).invoke({ query })) as + | string + | string[]; return Array.isArray(response) ? response : [response]; } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts index c17b2edb0819f..30cab761c1638 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts @@ -18,6 +18,7 @@ import { N8nBinaryLoader } from '../../../../utils/N8nBinaryLoader'; import { getTemplateNoticeField } from '../../../../utils/sharedFields'; import { REFINE_PROMPT_TEMPLATE, DEFAULT_PROMPT_TEMPLATE } from '../prompt'; import { getChainPromptsArgs } from '../helpers'; +import { getTracingConfig } from '../../../../utils/tracing'; function getInputs(parameters: IDataObject) { const chunkingMode = parameters?.chunkingMode; @@ -364,7 +365,7 @@ export class ChainSummarizationV2 implements INodeType { ? await documentInput.processItem(item, itemIndex) : documentInput; - const response = await chain.call({ + const response = await chain.withConfig(getTracingConfig(this)).invoke({ input_documents: processedDocuments, }); diff --git a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts index acd2780cabde3..5b2d852178932 100644 --- a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts @@ -292,7 +292,6 @@ export class RetrieverWorkflow implements INodeType { query: string, config?: CallbackManagerForRetrieverRun, ): Promise { - console.log('πŸš€ ~ WorkflowRetriever ~ getRelevantDocuments ~ callback:', config); const source = this.executeFunctions.getNodeParameter('source', itemIndex) as string; const baseMetadata: IDataObject = { @@ -362,9 +361,11 @@ export class RetrieverWorkflow implements INodeType { let receivedItems: INodeExecutionData[][]; try { - receivedItems = (await this.executeFunctions.executeWorkflow(workflowInfo, items, { - tools: config?.getChild(), - })) as INodeExecutionData[][]; + receivedItems = (await this.executeFunctions.executeWorkflow( + workflowInfo, + items, + config?.getChild(), + )) as INodeExecutionData[][]; } catch (error) { // Make sure a valid error gets returned that can by json-serialized else it will // not show up in the frontend diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts index cec18a5dc9474..eea75056848f0 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts @@ -390,9 +390,11 @@ export class ToolWorkflow implements INodeType { let receivedData: INodeExecutionData; console.log('About to pass runManager to executeWorkflow', runManager?.runId); try { - receivedData = (await this.executeWorkflow(workflowInfo, items, { - tools: runManager?.getChild(), - })) as INodeExecutionData; + receivedData = (await this.executeWorkflow( + workflowInfo, + items, + runManager?.getChild(), + )) as INodeExecutionData; } catch (error) { // Make sure a valid error gets returned that can by json-serialized else it will // not show up in the frontend diff --git a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts index 6d7e669e3923b..f2e994d7eaf31 100644 --- a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts +++ b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts @@ -11,6 +11,7 @@ import { formatToOpenAIAssistantTool } from '../../helpers/utils'; import { assistantRLC } from '../descriptions'; import { getConnectedTools } from '../../../../../utils/helpers'; +import { getTracingConfig } from '../../../../../utils/tracing'; const properties: INodeProperties[] = [ assistantRLC, @@ -181,7 +182,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise | IWorkflowExecuteProcess> { const internalHooks = Container.get(InternalHooks); @@ -816,7 +817,7 @@ async function executeWorkflow( workflowData, ); additionalDataIntegrated.executionId = executionId; - additionalDataIntegrated.tools = options.tools; + additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager; // Make sure we pass on the original executeWorkflow function we received // This one already contains changes to talk to parent process diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index b812ce2b6270d..23b54432c84c1 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -98,6 +98,7 @@ import type { Workflow, WorkflowActivateMode, WorkflowExecuteMode, + CallbackManager, } from 'n8n-workflow'; import { ExpressionError, @@ -3487,7 +3488,7 @@ export function getExecuteFunctions( async executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], - tools?: IDataObject, + parentCallbackManager?: CallbackManager, ): Promise { return await additionalData .executeWorkflow(workflowInfo, additionalData, { @@ -3495,7 +3496,7 @@ export function getExecuteFunctions( inputData, parentWorkflowSettings: workflow.settings, node, - tools, + parentCallbackManager, }) .then( async (result) => @@ -3721,9 +3722,7 @@ export function getExecuteFunctions( msg, }); }, - getParentRunManager: () => { - return additionalData.tools; - }, + getParentCallbackManager: () => additionalData.parentCallbackManager, }; })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } diff --git a/packages/workflow/package.json b/packages/workflow/package.json index 448a8adbe95b5..3ea3049aeb5f1 100644 --- a/packages/workflow/package.json +++ b/packages/workflow/package.json @@ -65,6 +65,7 @@ "recast": "0.21.5", "title-case": "3.0.3", "transliteration": "2.3.5", - "xml2js": "0.6.2" + "xml2js": "0.6.2", + "@langchain/core": "0.1.41" } } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index cbe717af28ce0..2c6f3741f5de4 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -19,6 +19,7 @@ import type { WorkflowHooks } from './WorkflowHooks'; import type { NodeOperationError } from './errors/node-operation.error'; import type { NodeApiError } from './errors/node-api.error'; import type { AxiosProxyConfig } from 'axios'; +import type { CallbackManager as CallbackManagerLC } from '@langchain/core/callbacks/manager'; export interface IAdditionalCredentialOptions { oauth2?: IOAuth2Options; @@ -842,7 +843,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], - tools?: IDataObject, + parentCallbackManager?: CallbackManager, ): Promise; getInputConnectionData( inputName: ConnectionTypes, @@ -883,7 +884,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & copyInputItems(items: INodeExecutionData[], properties: string[]): IDataObject[]; }; - getParentRunManager(): IDataObject | undefined; + getParentCallbackManager(): CallbackManager | undefined; }; export interface IExecuteSingleFunctions extends BaseExecutionFunctions { @@ -2031,7 +2032,7 @@ export interface IWorkflowExecuteAdditionalData { loadedWorkflowData?: IWorkflowBase; loadedRunData?: any; parentWorkflowSettings?: IWorkflowSettings; - tools?: IDataObject; + parentCallbackManager?: CallbackManager; }, ) => Promise; executionId?: string; @@ -2063,7 +2064,7 @@ export interface IWorkflowExecuteAdditionalData { nodeType?: string; }, ) => Promise; - tools?: IDataObject; + parentCallbackManager?: CallbackManager; } export type WorkflowExecuteMode = @@ -2588,3 +2589,5 @@ export type BannerName = export type Functionality = 'regular' | 'configuration-node' | 'pairedItem'; export type Result = { ok: true; result: T } | { ok: false; error: E }; + +export type CallbackManager = CallbackManagerLC; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a7abbcb9eee3d..4f2d107c13456 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1515,6 +1515,9 @@ importers: packages/workflow: dependencies: + '@langchain/core': + specifier: 0.1.41 + version: 0.1.41 '@n8n/tournament': specifier: 1.0.2 version: 1.0.2 @@ -8985,7 +8988,7 @@ packages: ts-dedent: 2.2.0 type-fest: 2.19.0 vue: 3.4.21(typescript@5.4.2) - vue-component-type-helpers: 2.0.7 + vue-component-type-helpers: 2.0.11 transitivePeerDependencies: - encoding - supports-color @@ -12804,11 +12807,6 @@ packages: dependencies: safe-buffer: 5.2.1 - /content-type@1.0.4: - resolution: {integrity: sha512-hIP3EEPs8tB9AT1L+NUqtwOAps4mk2Zob89MWXMHjHWg9milF/j4osnnQLXBCBFBk/tvIG/tUc9mOUJiPBhPXA==} - engines: {node: '>= 0.6'} - dev: false - /content-type@1.0.5: resolution: {integrity: sha512-nTjqfcBFEipKdXCv4YDQWCfmcLZKm81ldF0pAopTvyrFGVbcR6P/VAAd5G7N+0tTr8QqiU0tFadD6FK4NtJwOA==} engines: {node: '>= 0.6'} @@ -25663,8 +25661,8 @@ packages: resolution: {integrity: sha512-0vOfAtI67UjeO1G6UiX5Kd76CqaQ67wrRZiOe7UAb9Jm6GzlUr/fC7CV90XfwapJRjpCMaZFhv1V0ajWRmE9Dg==} dev: true - /vue-component-type-helpers@2.0.7: - resolution: {integrity: sha512-7e12Evdll7JcTIocojgnCgwocX4WzIYStGClBQ+QuWPinZo/vQolv2EMq4a3lg16TKfwWafLimG77bxb56UauA==} + /vue-component-type-helpers@2.0.11: + resolution: {integrity: sha512-8aluKz5oVC8PvVQAYgyIefOlqzKVmAOTCx2imbrFBVLbF7mnJvyMsE2A7rqX/4f4uT6ee9o8u3GcoRpUWc0xsw==} dev: true /vue-demi@0.14.5(vue@3.4.21): From 3275d37a10fa96e6a070cea0ce5fa1555e2722ec Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Mon, 8 Apr 2024 16:09:09 +0200 Subject: [PATCH 4/4] Remove console.log Signed-off-by: Oleg Ivaniv --- .../nodes/tools/ToolWorkflow/ToolWorkflow.node.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts index eea75056848f0..5130cf7d2924d 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts @@ -388,7 +388,6 @@ export class ToolWorkflow implements INodeType { const items = [newItem] as INodeExecutionData[]; let receivedData: INodeExecutionData; - console.log('About to pass runManager to executeWorkflow', runManager?.runId); try { receivedData = (await this.executeWorkflow( workflowInfo,