Skip to content

Commit

Permalink
Cleanup types and add Summarization chain tracing
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
  • Loading branch information
OlegIvaniv committed Apr 8, 2024
1 parent 4ca6d84 commit 705652e
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { CombiningOutputParser } from 'langchain/output_parsers';
import { LLMChain } from 'langchain/chains';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { HumanMessage } from '@langchain/core/messages';
import type { CallbackManager } from '@langchain/core/callbacks/manager';
import { getTemplateNoticeField } from '../../../utils/sharedFields';
import {
getOptionalOutputParsers,
Expand Down Expand Up @@ -153,10 +152,6 @@ async function createSimpleLLMChain(
query: string,
prompt: ChatPromptTemplate | PromptTemplate,
): Promise<string[]> {
const parentRunManager = context.getParentRunManager
? (context.getParentRunManager() as { tools: CallbackManager })
: undefined;

const chain = new LLMChain({
llm,
prompt,
Expand Down Expand Up @@ -209,17 +204,9 @@ async function getChain(
);

const chain = prompt.pipe(llm).pipe(combinedOutputParser);
const parentRunManager = context.getParentRunManager
? (context.getParentRunManager() as { tools: CallbackManager })
: undefined;

const response = (await chain
.withConfig({
runName: context.getNode().name,
callbacks: parentRunManager?.tools,
metadata: { execution_id: context.getExecutionId() },
})
.invoke({ query })) as string | string[];
const response = (await chain.withConfig(getTracingConfig(context)).invoke({ query })) as
| string
| string[];

return Array.isArray(response) ? response : [response];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { N8nBinaryLoader } from '../../../../utils/N8nBinaryLoader';
import { getTemplateNoticeField } from '../../../../utils/sharedFields';
import { REFINE_PROMPT_TEMPLATE, DEFAULT_PROMPT_TEMPLATE } from '../prompt';
import { getChainPromptsArgs } from '../helpers';
import { getTracingConfig } from '../../../../utils/tracing';

function getInputs(parameters: IDataObject) {
const chunkingMode = parameters?.chunkingMode;
Expand Down Expand Up @@ -364,7 +365,7 @@ export class ChainSummarizationV2 implements INodeType {
? await documentInput.processItem(item, itemIndex)
: documentInput;

const response = await chain.call({
const response = await chain.withConfig(getTracingConfig(this)).invoke({
input_documents: processedDocuments,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ export class RetrieverWorkflow implements INodeType {
query: string,
config?: CallbackManagerForRetrieverRun,
): Promise<Document[]> {
console.log('🚀 ~ WorkflowRetriever ~ getRelevantDocuments ~ callback:', config);
const source = this.executeFunctions.getNodeParameter('source', itemIndex) as string;

const baseMetadata: IDataObject = {
Expand Down Expand Up @@ -362,9 +361,11 @@ export class RetrieverWorkflow implements INodeType {

let receivedItems: INodeExecutionData[][];
try {
receivedItems = (await this.executeFunctions.executeWorkflow(workflowInfo, items, {
tools: config?.getChild(),
})) as INodeExecutionData[][];
receivedItems = (await this.executeFunctions.executeWorkflow(
workflowInfo,
items,
config?.getChild(),
)) as INodeExecutionData[][];
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,11 @@ export class ToolWorkflow implements INodeType {
let receivedData: INodeExecutionData;
console.log('About to pass runManager to executeWorkflow', runManager?.runId);
try {
receivedData = (await this.executeWorkflow(workflowInfo, items, {
tools: runManager?.getChild(),
})) as INodeExecutionData;
receivedData = (await this.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
)) as INodeExecutionData;
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { formatToOpenAIAssistantTool } from '../../helpers/utils';
import { assistantRLC } from '../descriptions';

import { getConnectedTools } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';

const properties: INodeProperties[] = [
assistantRLC,
Expand Down Expand Up @@ -181,7 +182,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
tools: tools ?? [],
});

const response = await agentExecutor.invoke({
const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
content: input,
signal: this.getExecutionCancelSignal(),
timeout: options.timeout ?? 10000,
Expand Down
9 changes: 4 additions & 5 deletions packages/@n8n/nodes-langchain/utils/tracing.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { BaseCallbackConfig, CallbackManager } from '@langchain/core/callbacks/manager';
import type { BaseCallbackConfig } from '@langchain/core/callbacks/manager';
import type { IExecuteFunctions } from 'n8n-workflow';

interface TracingConfig {
Expand All @@ -9,11 +9,10 @@ export function getTracingConfig(
context: IExecuteFunctions,
config: TracingConfig = {},
): BaseCallbackConfig {
const parentRunManager = context.getParentRunManager?.()
? (context.getParentRunManager() as { tools: CallbackManager })
const parentRunManager = context.getParentCallbackManager
? context.getParentCallbackManager()
: undefined;

console.log('🚀 ~ parentRunManager:', parentRunManager);
return {
runName: `[${context.getWorkflow().name}] ${context.getNode().name}`,
metadata: {
Expand All @@ -22,6 +21,6 @@ export function getTracingConfig(
node: context.getNode().name,
...(config.additionalMetadata ?? {}),
},
callbacks: parentRunManager?.tools,
callbacks: parentRunManager,
};
}
5 changes: 3 additions & 2 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type {
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
CallbackManager,
} from 'n8n-workflow';
import {
ApplicationError,
Expand Down Expand Up @@ -754,7 +755,7 @@ async function executeWorkflow(
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
tools?: IDataObject;
parentCallbackManager?: CallbackManager;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const internalHooks = Container.get(InternalHooks);
Expand Down Expand Up @@ -816,7 +817,7 @@ async function executeWorkflow(
workflowData,
);
additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.tools = options.tools;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;

// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
Expand Down
9 changes: 4 additions & 5 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import type {
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
CallbackManager,
} from 'n8n-workflow';
import {
ExpressionError,
Expand Down Expand Up @@ -3487,15 +3488,15 @@ export function getExecuteFunctions(
async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
tools?: IDataObject,
parentCallbackManager?: CallbackManager,
): Promise<any> {
return await additionalData
.executeWorkflow(workflowInfo, additionalData, {
parentWorkflowId: workflow.id?.toString(),
inputData,
parentWorkflowSettings: workflow.settings,
node,
tools,
parentCallbackManager,
})
.then(
async (result) =>
Expand Down Expand Up @@ -3721,9 +3722,7 @@ export function getExecuteFunctions(
msg,
});
},
getParentRunManager: () => {
return additionalData.tools;
},
getParentCallbackManager: () => additionalData.parentCallbackManager,
};
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/workflow/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"recast": "0.21.5",
"title-case": "3.0.3",
"transliteration": "2.3.5",
"xml2js": "0.6.2"
"xml2js": "0.6.2",
"@langchain/core": "0.1.41"
}
}
11 changes: 7 additions & 4 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type { WorkflowHooks } from './WorkflowHooks';
import type { NodeOperationError } from './errors/node-operation.error';
import type { NodeApiError } from './errors/node-api.error';
import type { AxiosProxyConfig } from 'axios';
import type { CallbackManager as CallbackManagerLC } from '@langchain/core/callbacks/manager';

export interface IAdditionalCredentialOptions {
oauth2?: IOAuth2Options;
Expand Down Expand Up @@ -842,7 +843,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
tools?: IDataObject,
parentCallbackManager?: CallbackManager,
): Promise<any>;
getInputConnectionData(
inputName: ConnectionTypes,
Expand Down Expand Up @@ -883,7 +884,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
copyInputItems(items: INodeExecutionData[], properties: string[]): IDataObject[];
};

getParentRunManager(): IDataObject | undefined;
getParentCallbackManager(): CallbackManager | undefined;
};

export interface IExecuteSingleFunctions extends BaseExecutionFunctions {
Expand Down Expand Up @@ -2031,7 +2032,7 @@ export interface IWorkflowExecuteAdditionalData {
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: any;
parentWorkflowSettings?: IWorkflowSettings;
tools?: IDataObject;
parentCallbackManager?: CallbackManager;
},
) => Promise<any>;
executionId?: string;
Expand Down Expand Up @@ -2063,7 +2064,7 @@ export interface IWorkflowExecuteAdditionalData {
nodeType?: string;
},
) => Promise<void>;
tools?: IDataObject;
parentCallbackManager?: CallbackManager;
}

export type WorkflowExecuteMode =
Expand Down Expand Up @@ -2588,3 +2589,5 @@ export type BannerName =
export type Functionality = 'regular' | 'configuration-node' | 'pairedItem';

export type Result<T, E> = { ok: true; result: T } | { ok: false; error: E };

export type CallbackManager = CallbackManagerLC;
14 changes: 6 additions & 8 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 705652e

Please sign in to comment.