forked from FlowiseAI/Flowise
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request FlowiseAI#1224 from vinodkiran/FEATURE/mongodb
MongoDB Atlas Integration - Chat Memory and Vector Store
- Loading branch information
Showing
10 changed files
with
417 additions
and
2 deletions.
There are no files selected for viewing
25 changes: 25 additions & 0 deletions
25
packages/components/credentials/MongoDBUrlApi.credential.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import { INodeParams, INodeCredential } from '../src/Interface' | ||
|
||
class MongoDBUrlApi implements INodeCredential { | ||
label: string | ||
name: string | ||
version: number | ||
description: string | ||
inputs: INodeParams[] | ||
|
||
constructor() { | ||
this.label = 'MongoDB ATLAS' | ||
this.name = 'mongoDBUrlApi' | ||
this.version = 1.0 | ||
this.inputs = [ | ||
{ | ||
label: 'ATLAS Connection URL', | ||
name: 'mongoDBConnectUrl', | ||
type: 'string', | ||
placeholder: 'mongodb+srv://myDatabaseUser:D1fficultP%40ssw0rd@cluster0.example.mongodb.net/?retryWrites=true&w=majority' | ||
} | ||
] | ||
} | ||
} | ||
|
||
module.exports = { credClass: MongoDBUrlApi } |
146 changes: 146 additions & 0 deletions
146
packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' | ||
import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' | ||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory' | ||
import { BaseMessage, mapStoredMessageToChatMessage } from 'langchain/schema' | ||
import { MongoClient } from 'mongodb' | ||
|
||
class MongoDB_Memory implements INode { | ||
label: string | ||
name: string | ||
version: number | ||
description: string | ||
type: string | ||
icon: string | ||
category: string | ||
baseClasses: string[] | ||
credential: INodeParams | ||
inputs: INodeParams[] | ||
|
||
constructor() { | ||
this.label = 'MongoDB Atlas Chat Memory' | ||
this.name = 'MongoDBAtlasChatMemory' | ||
this.version = 1.0 | ||
this.type = 'MongoDBAtlasChatMemory' | ||
this.icon = 'mongodb.png' | ||
this.category = 'Memory' | ||
this.description = 'Stores the conversation in MongoDB Atlas' | ||
this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)] | ||
this.credential = { | ||
label: 'Connect Credential', | ||
name: 'credential', | ||
type: 'credential', | ||
credentialNames: ['mongoDBUrlApi'] | ||
} | ||
this.inputs = [ | ||
{ | ||
label: 'Database', | ||
name: 'databaseName', | ||
placeholder: '<DB_NAME>', | ||
type: 'string' | ||
}, | ||
{ | ||
label: 'Collection Name', | ||
name: 'collectionName', | ||
placeholder: '<COLLECTION_NAME>', | ||
type: 'string' | ||
}, | ||
{ | ||
label: 'Session Id', | ||
name: 'sessionId', | ||
type: 'string', | ||
description: 'If not specified, the first CHAT_MESSAGE_ID will be used as sessionId', | ||
default: '', | ||
additionalParams: true, | ||
optional: true | ||
}, | ||
{ | ||
label: 'Memory Key', | ||
name: 'memoryKey', | ||
type: 'string', | ||
default: 'chat_history', | ||
additionalParams: true | ||
} | ||
] | ||
} | ||
|
||
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> { | ||
return initializeMongoDB(nodeData, options) | ||
} | ||
|
||
async clearSessionMemory(nodeData: INodeData, options: ICommonObject): Promise<void> { | ||
const mongodbMemory = await initializeMongoDB(nodeData, options) | ||
const sessionId = nodeData.inputs?.sessionId as string | ||
const chatId = options?.chatId as string | ||
options.logger.info(`Clearing MongoDB memory session ${sessionId ? sessionId : chatId}`) | ||
await mongodbMemory.clear() | ||
options.logger.info(`Successfully cleared MongoDB memory session ${sessionId ? sessionId : chatId}`) | ||
} | ||
} | ||
|
||
const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => { | ||
const databaseName = nodeData.inputs?.databaseName as string | ||
const collectionName = nodeData.inputs?.collectionName as string | ||
const sessionId = nodeData.inputs?.sessionId as string | ||
const memoryKey = nodeData.inputs?.memoryKey as string | ||
const chatId = options?.chatId as string | ||
|
||
let isSessionIdUsingChatMessageId = false | ||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true | ||
|
||
const credentialData = await getCredentialData(nodeData.credential ?? '', options) | ||
let mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) | ||
|
||
const client = new MongoClient(mongoDBConnectUrl) | ||
await client.connect() | ||
const collection = client.db(databaseName).collection(collectionName) | ||
|
||
const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ | ||
collection, | ||
sessionId: sessionId ? sessionId : chatId | ||
}) | ||
|
||
mongoDBChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => { | ||
const document = await collection.findOne({ | ||
sessionId: (mongoDBChatMessageHistory as any).sessionId | ||
}) | ||
const messages = document?.messages || [] | ||
return messages.map(mapStoredMessageToChatMessage) | ||
} | ||
|
||
mongoDBChatMessageHistory.addMessage = async (message: BaseMessage): Promise<void> => { | ||
const messages = [message].map((msg) => msg.toDict()) | ||
await collection.updateOne( | ||
{ sessionId: (mongoDBChatMessageHistory as any).sessionId }, | ||
{ | ||
$push: { messages: { $each: messages } } | ||
}, | ||
{ upsert: true } | ||
) | ||
} | ||
|
||
mongoDBChatMessageHistory.clear = async (): Promise<void> => { | ||
await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }) | ||
} | ||
|
||
return new BufferMemoryExtended({ | ||
memoryKey, | ||
chatHistory: mongoDBChatMessageHistory, | ||
returnMessages: true, | ||
isSessionIdUsingChatMessageId | ||
}) | ||
} | ||
|
||
interface BufferMemoryExtendedInput { | ||
isSessionIdUsingChatMessageId: boolean | ||
} | ||
|
||
class BufferMemoryExtended extends BufferMemory { | ||
isSessionIdUsingChatMessageId? = false | ||
|
||
constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) { | ||
super(fields) | ||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId | ||
} | ||
} | ||
|
||
module.exports = { nodeClass: MongoDB_Memory } |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
145 changes: 145 additions & 0 deletions
145
packages/components/nodes/vectorstores/MongoDB/MongoDBSearchBase.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
import { | ||
getBaseClasses, | ||
getCredentialData, | ||
getCredentialParam, | ||
ICommonObject, | ||
INodeData, | ||
INodeOutputsValue, | ||
INodeParams | ||
} from '../../../src' | ||
|
||
import { Embeddings } from 'langchain/embeddings/base' | ||
import { VectorStore } from 'langchain/vectorstores/base' | ||
import { Document } from 'langchain/document' | ||
import { MongoDBAtlasVectorSearch } from 'langchain/vectorstores/mongodb_atlas' | ||
import { Collection, MongoClient } from 'mongodb' | ||
|
||
export abstract class MongoDBSearchBase { | ||
label: string | ||
name: string | ||
version: number | ||
description: string | ||
type: string | ||
icon: string | ||
category: string | ||
baseClasses: string[] | ||
inputs: INodeParams[] | ||
credential: INodeParams | ||
outputs: INodeOutputsValue[] | ||
mongoClient: MongoClient | ||
|
||
protected constructor() { | ||
this.type = 'MongoDB Atlas' | ||
this.icon = 'mongodb.png' | ||
this.category = 'Vector Stores' | ||
this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever'] | ||
this.credential = { | ||
label: 'Connect Credential', | ||
name: 'credential', | ||
type: 'credential', | ||
credentialNames: ['mongoDBUrlApi'] | ||
} | ||
this.inputs = [ | ||
{ | ||
label: 'Embeddings', | ||
name: 'embeddings', | ||
type: 'Embeddings' | ||
}, | ||
{ | ||
label: 'Database', | ||
name: 'databaseName', | ||
placeholder: '<DB_NAME>', | ||
type: 'string' | ||
}, | ||
{ | ||
label: 'Collection Name', | ||
name: 'collectionName', | ||
placeholder: '<COLLECTION_NAME>', | ||
type: 'string' | ||
}, | ||
{ | ||
label: 'Index Name', | ||
name: 'indexName', | ||
placeholder: '<VECTOR_INDEX_NAME>', | ||
type: 'string' | ||
}, | ||
{ | ||
label: 'Content Field', | ||
name: 'textKey', | ||
description: 'Name of the field (column) that contains the actual content', | ||
type: 'string', | ||
default: 'text', | ||
additionalParams: true, | ||
optional: true | ||
}, | ||
{ | ||
label: 'Embedded Field', | ||
name: 'embeddingKey', | ||
description: 'Name of the field (column) that contains the Embedding', | ||
type: 'string', | ||
default: 'embedding', | ||
additionalParams: true, | ||
optional: true | ||
}, | ||
{ | ||
label: 'Top K', | ||
name: 'topK', | ||
description: 'Number of top results to fetch. Default to 4', | ||
placeholder: '4', | ||
type: 'number', | ||
additionalParams: true, | ||
optional: true | ||
} | ||
] | ||
this.outputs = [ | ||
{ | ||
label: 'MongoDB Retriever', | ||
name: 'retriever', | ||
baseClasses: this.baseClasses | ||
}, | ||
{ | ||
label: 'MongoDB Vector Store', | ||
name: 'vectorStore', | ||
baseClasses: [this.type, ...getBaseClasses(MongoDBAtlasVectorSearch)] | ||
} | ||
] | ||
} | ||
|
||
abstract constructVectorStore( | ||
embeddings: Embeddings, | ||
collection: Collection, | ||
indexName: string, | ||
textKey: string, | ||
embeddingKey: string, | ||
docs: Document<Record<string, any>>[] | undefined | ||
): Promise<VectorStore> | ||
|
||
async init(nodeData: INodeData, _: string, options: ICommonObject, docs: Document<Record<string, any>>[] | undefined): Promise<any> { | ||
const credentialData = await getCredentialData(nodeData.credential ?? '', options) | ||
const databaseName = nodeData.inputs?.databaseName as string | ||
const collectionName = nodeData.inputs?.collectionName as string | ||
const indexName = nodeData.inputs?.indexName as string | ||
let textKey = nodeData.inputs?.textKey as string | ||
let embeddingKey = nodeData.inputs?.embeddingKey as string | ||
const embeddings = nodeData.inputs?.embeddings as Embeddings | ||
const topK = nodeData.inputs?.topK as string | ||
const k = topK ? parseFloat(topK) : 4 | ||
const output = nodeData.outputs?.output as string | ||
|
||
let mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) | ||
|
||
this.mongoClient = new MongoClient(mongoDBConnectUrl) | ||
const collection = this.mongoClient.db(databaseName).collection(collectionName) | ||
if (!textKey || textKey === '') textKey = 'text' | ||
if (!embeddingKey || embeddingKey === '') embeddingKey = 'embedding' | ||
const vectorStore = await this.constructVectorStore(embeddings, collection, indexName, textKey, embeddingKey, docs) | ||
|
||
if (output === 'retriever') { | ||
return vectorStore.asRetriever(k) | ||
} else if (output === 'vectorStore') { | ||
;(vectorStore as any).k = k | ||
return vectorStore | ||
} | ||
return vectorStore | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
packages/components/nodes/vectorstores/MongoDB/MongoDB_Existing.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import { Collection } from 'mongodb' | ||
import { MongoDBAtlasVectorSearch } from 'langchain/vectorstores/mongodb_atlas' | ||
import { Embeddings } from 'langchain/embeddings/base' | ||
import { VectorStore } from 'langchain/vectorstores/base' | ||
import { Document } from 'langchain/document' | ||
import { MongoDBSearchBase } from './MongoDBSearchBase' | ||
import { ICommonObject, INode, INodeData } from '../../../src/Interface' | ||
|
||
class MongoDBExisting_VectorStores extends MongoDBSearchBase implements INode { | ||
constructor() { | ||
super() | ||
this.label = 'MongoDB Atlas Load Existing Index' | ||
this.name = 'MongoDBIndex' | ||
this.version = 1.0 | ||
this.description = 'Load existing data from MongoDB Atlas (i.e: Document has been upserted)' | ||
} | ||
|
||
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> { | ||
return super.init(nodeData, _, options, undefined) | ||
} | ||
|
||
async constructVectorStore( | ||
embeddings: Embeddings, | ||
collection: Collection, | ||
indexName: string, | ||
textKey: string, | ||
embeddingKey: string, | ||
_: Document<Record<string, any>>[] | undefined | ||
): Promise<VectorStore> { | ||
return new MongoDBAtlasVectorSearch(embeddings, { | ||
collection: collection, | ||
indexName: indexName, | ||
textKey: textKey, | ||
embeddingKey: embeddingKey | ||
}) | ||
} | ||
} | ||
|
||
module.exports = { nodeClass: MongoDBExisting_VectorStores } |
Oops, something went wrong.