Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB Atlas Integration - Chat Memory and Vector Store #1224

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/components/credentials/MongoDBUrlApi.credential.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { INodeParams, INodeCredential } from '../src/Interface'

/**
 * Credential definition for MongoDB Atlas.
 *
 * Exposes one string input, `mongoDBConnectUrl`, which holds the Atlas
 * connection URL.
 */
class MongoDBUrlApi implements INodeCredential {
    label: string = 'MongoDB ATLAS'
    name: string = 'mongoDBUrlApi'
    version: number = 1.0
    // Declared as part of the credential shape; this class never assigns it.
    description: string
    inputs: INodeParams[] = [
        {
            label: 'ATLAS Connection URL',
            name: 'mongoDBConnectUrl',
            type: 'string',
            placeholder: 'mongodb+srv://myDatabaseUser:D1fficultP%40ssw0rd@cluster0.example.mongodb.net/?retryWrites=true&w=majority'
        }
    ]
}

// CommonJS export: exposes the credential class under the `credClass` key.
module.exports = { credClass: MongoDBUrlApi }
146 changes: 146 additions & 0 deletions packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
import { BaseMessage, mapStoredMessageToChatMessage } from 'langchain/schema'
import { MongoClient } from 'mongodb'

/**
 * Node definition for a chat memory whose history is stored in a MongoDB
 * Atlas collection.
 *
 * The constructor only declares metadata (labels, credential and input
 * parameters); the memory object itself is built by `initializeMongoDB`.
 */
class MongoDB_Memory implements INode {
    label: string
    name: string
    version: number
    description: string
    type: string
    icon: string
    category: string
    baseClasses: string[]
    credential: INodeParams
    inputs: INodeParams[]

    constructor() {
        // Parameter descriptors are built first; the property assignments below
        // keep the same order as before so the object's key order is unchanged.
        const credentialParam: INodeParams = {
            label: 'Connect Credential',
            name: 'credential',
            type: 'credential',
            credentialNames: ['mongoDBUrlApi']
        }
        const databaseParam: INodeParams = {
            label: 'Database',
            name: 'databaseName',
            placeholder: '<DB_NAME>',
            type: 'string'
        }
        const collectionParam: INodeParams = {
            label: 'Collection Name',
            name: 'collectionName',
            placeholder: '<COLLECTION_NAME>',
            type: 'string'
        }
        const sessionIdParam: INodeParams = {
            label: 'Session Id',
            name: 'sessionId',
            type: 'string',
            description: 'If not specified, the first CHAT_MESSAGE_ID will be used as sessionId',
            default: '',
            additionalParams: true,
            optional: true
        }
        const memoryKeyParam: INodeParams = {
            label: 'Memory Key',
            name: 'memoryKey',
            type: 'string',
            default: 'chat_history',
            additionalParams: true
        }

        this.label = 'MongoDB Atlas Chat Memory'
        this.name = 'MongoDBAtlasChatMemory'
        this.version = 1.0
        this.type = 'MongoDBAtlasChatMemory'
        this.icon = 'mongodb.png'
        this.category = 'Memory'
        this.description = 'Stores the conversation in MongoDB Atlas'
        this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]
        this.credential = credentialParam
        this.inputs = [databaseParam, collectionParam, sessionIdParam, memoryKeyParam]
    }

    /**
     * Builds the memory object for this node.
     *
     * @param nodeData - node configuration (inputs and credential reference)
     * @param _ - unused
     * @param options - runtime options forwarded to `initializeMongoDB`
     * @returns whatever `initializeMongoDB` resolves to
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
        return initializeMongoDB(nodeData, options)
    }

    /**
     * Clears the stored history of the configured session and logs before and
     * after doing so.
     *
     * @param nodeData - node configuration; `inputs.sessionId` is used for the log label
     * @param options - runtime options; `chatId` is the fallback label and `logger` receives the messages
     */
    async clearSessionMemory(nodeData: INodeData, options: ICommonObject): Promise<void> {
        const memory = await initializeMongoDB(nodeData, options)

        // The configured session id wins; the chat id is only used when it is empty.
        const configuredSessionId = nodeData.inputs?.sessionId as string
        const fallbackChatId = options?.chatId as string
        const sessionLabel = configuredSessionId || fallbackChatId

        options.logger.info(`Clearing MongoDB memory session ${sessionLabel}`)
        await memory.clear()
        options.logger.info(`Successfully cleared MongoDB memory session ${sessionLabel}`)
    }
}

/**
 * Connected MongoDB clients keyed by connection URL.
 *
 * `initializeMongoDB` runs on every node init and every memory clear. Without
 * this cache each call opened a new client that was never closed. Cached
 * clients are kept for the lifetime of the process.
 */
const mongoClientCache = new Map<string, Promise<MongoClient>>()

/**
 * Returns a connected client for `url`, opening a connection only the first
 * time a given URL is requested.
 */
const getConnectedMongoClient = (url: string): Promise<MongoClient> => {
    const cached = mongoClientCache.get(url)
    if (cached) return cached

    const client = new MongoClient(url)
    const pending = client.connect().then(() => client)
    mongoClientCache.set(url, pending)
    // Evict failed attempts so a later call retries instead of replaying the rejection.
    void pending.catch(() => {
        if (mongoClientCache.get(url) === pending) mongoClientCache.delete(url)
    })
    return pending
}

/**
 * Builds a `BufferMemoryExtended` whose chat history is stored in the
 * configured MongoDB collection.
 *
 * @param nodeData - node configuration; reads `databaseName`, `collectionName`,
 *                   `sessionId` and `memoryKey` from `inputs`, plus the credential reference
 * @param options - runtime options; `chatId` is used as the session id when
 *                  no `sessionId` input is set
 * @returns a memory object with `returnMessages: true`
 * @throws Error when the credential does not provide `mongoDBConnectUrl`
 */
const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
    const databaseName = nodeData.inputs?.databaseName as string
    const collectionName = nodeData.inputs?.collectionName as string
    const sessionId = nodeData.inputs?.sessionId as string
    const memoryKey = nodeData.inputs?.memoryKey as string
    const chatId = options?.chatId as string

    // True only when the chat id stands in for a missing session id.
    const isSessionIdUsingChatMessageId = !sessionId && Boolean(chatId)

    const credentialData = await getCredentialData(nodeData.credential ?? '', options)
    const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData)
    if (!mongoDBConnectUrl) {
        throw new Error('MongoDB Atlas connection URL is missing from the selected credential')
    }

    const client = await getConnectedMongoClient(mongoDBConnectUrl)
    const collection = client.db(databaseName).collection(collectionName)

    const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({
        collection,
        sessionId: sessionId || chatId
    })

    // Read the session id at call time so a later reassignment of `sessionId`
    // on the history object is honoured by the overrides below.
    const currentSessionId = (): string => (mongoDBChatMessageHistory as any).sessionId

    // The three overrides address one document per session through its
    // `sessionId` field.
    // NOTE(review): presumably these replace the library's default lookup strategy; confirm.
    mongoDBChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => {
        const document = await collection.findOne({ sessionId: currentSessionId() })
        const messages = document?.messages || []
        return messages.map(mapStoredMessageToChatMessage)
    }

    mongoDBChatMessageHistory.addMessage = async (message: BaseMessage): Promise<void> => {
        const messages = [message].map((msg) => msg.toDict())
        // Upsert so the first message of a session creates its document.
        await collection.updateOne(
            { sessionId: currentSessionId() },
            {
                $push: { messages: { $each: messages } }
            },
            { upsert: true }
        )
    }

    mongoDBChatMessageHistory.clear = async (): Promise<void> => {
        await collection.deleteOne({ sessionId: currentSessionId() })
    }

    return new BufferMemoryExtended({
        memoryKey,
        chatHistory: mongoDBChatMessageHistory,
        returnMessages: true,
        isSessionIdUsingChatMessageId
    })
}

/** Extra constructor option accepted by `BufferMemoryExtended`. */
interface BufferMemoryExtendedInput {
    isSessionIdUsingChatMessageId: boolean
}

/**
 * `BufferMemory` that additionally records whether its session id was taken
 * from the chat id rather than from an explicit session id input.
 */
class BufferMemoryExtended extends BufferMemory {
    isSessionIdUsingChatMessageId? = false

    /**
     * @param fields - the regular `BufferMemory` options plus an optional
     *                 `isSessionIdUsingChatMessageId` flag (treated as `false` when omitted)
     */
    constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) {
        super(fields)
        // Keep the declared `false` default when the flag is omitted instead of
        // overwriting it with `undefined`.
        this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId ?? false
    }
}

// CommonJS export: exposes the memory node class under the `nodeClass` key.
module.exports = { nodeClass: MongoDB_Memory }
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ElasicsearchUpsert_VectorStores extends ElasticSearchBase implements INode
delete d.metadata.loc
})
// end of workaround
return super.init(nodeData, _, options, flattenDocs)
return super.init(nodeData, _, options, finalDocs)
}
}

Expand Down
145 changes: 145 additions & 0 deletions packages/components/nodes/vectorstores/MongoDB/MongoDBSearchBase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import {
getBaseClasses,
getCredentialData,
getCredentialParam,
ICommonObject,
INodeData,
INodeOutputsValue,
INodeParams
} from '../../../src'

import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base'
import { Document } from 'langchain/document'
import { MongoDBAtlasVectorSearch } from 'langchain/vectorstores/mongodb_atlas'
import { Collection, MongoClient } from 'mongodb'

/**
 * Shared base for the MongoDB Atlas vector store nodes.
 *
 * Declares the common metadata (credential, inputs, outputs) and implements
 * `init`, which resolves the collection and delegates creation of the store
 * to the subclass through `constructVectorStore`.
 */
export abstract class MongoDBSearchBase {
    label: string
    name: string
    version: number
    description: string
    type: string
    icon: string
    category: string
    baseClasses: string[]
    inputs: INodeParams[]
    credential: INodeParams
    outputs: INodeOutputsValue[]
    // Client created by the most recent `init` call.
    mongoClient: MongoClient

    protected constructor() {
        this.type = 'MongoDB Atlas'
        this.icon = 'mongodb.png'
        this.category = 'Vector Stores'
        this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever']
        this.credential = {
            label: 'Connect Credential',
            name: 'credential',
            type: 'credential',
            credentialNames: ['mongoDBUrlApi']
        }
        this.inputs = [
            {
                label: 'Embeddings',
                name: 'embeddings',
                type: 'Embeddings'
            },
            {
                label: 'Database',
                name: 'databaseName',
                placeholder: '<DB_NAME>',
                type: 'string'
            },
            {
                label: 'Collection Name',
                name: 'collectionName',
                placeholder: '<COLLECTION_NAME>',
                type: 'string'
            },
            {
                label: 'Index Name',
                name: 'indexName',
                placeholder: '<VECTOR_INDEX_NAME>',
                type: 'string'
            },
            {
                label: 'Content Field',
                name: 'textKey',
                description: 'Name of the field (column) that contains the actual content',
                type: 'string',
                default: 'text',
                additionalParams: true,
                optional: true
            },
            {
                label: 'Embedded Field',
                name: 'embeddingKey',
                description: 'Name of the field (column) that contains the Embedding',
                type: 'string',
                default: 'embedding',
                additionalParams: true,
                optional: true
            },
            {
                label: 'Top K',
                name: 'topK',
                description: 'Number of top results to fetch. Default to 4',
                placeholder: '4',
                type: 'number',
                additionalParams: true,
                optional: true
            }
        ]
        this.outputs = [
            {
                label: 'MongoDB Retriever',
                name: 'retriever',
                baseClasses: this.baseClasses
            },
            {
                label: 'MongoDB Vector Store',
                name: 'vectorStore',
                baseClasses: [this.type, ...getBaseClasses(MongoDBAtlasVectorSearch)]
            }
        ]
    }

    /**
     * Creates the vector store for the resolved collection.
     *
     * @param embeddings - embeddings instance taken from the node inputs
     * @param collection - collection resolved from the database and collection name inputs
     * @param indexName - value of the `indexName` input
     * @param textKey - content field name (`'text'` when the input is empty)
     * @param embeddingKey - embedding field name (`'embedding'` when the input is empty)
     * @param docs - documents passed to `init` by the subclass, or `undefined`
     */
    abstract constructVectorStore(
        embeddings: Embeddings,
        collection: Collection,
        indexName: string,
        textKey: string,
        embeddingKey: string,
        docs: Document<Record<string, any>>[] | undefined
    ): Promise<VectorStore>

    /**
     * Resolves credential and inputs, builds the vector store and returns it
     * in the form selected by `nodeData.outputs.output`.
     *
     * @param nodeData - node configuration (inputs, outputs, credential reference)
     * @param _ - unused
     * @param options - runtime options forwarded to `getCredentialData`
     * @param docs - documents handed through to `constructVectorStore`
     * @returns a retriever limited to `k` results when the output is
     *          `'retriever'`, otherwise the vector store itself
     * @throws Error when the credential does not provide `mongoDBConnectUrl`
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject, docs: Document<Record<string, any>>[] | undefined): Promise<any> {
        const credentialData = await getCredentialData(nodeData.credential ?? '', options)
        const databaseName = nodeData.inputs?.databaseName as string
        const collectionName = nodeData.inputs?.collectionName as string
        const indexName = nodeData.inputs?.indexName as string
        const textKey = (nodeData.inputs?.textKey as string) || 'text'
        const embeddingKey = (nodeData.inputs?.embeddingKey as string) || 'embedding'
        const embeddings = nodeData.inputs?.embeddings as Embeddings
        const topK = nodeData.inputs?.topK as string
        const output = nodeData.outputs?.output as string

        // Top K is a result count: drop any fractional part and fall back to the
        // documented default of 4 for missing, non-numeric or non-positive input.
        const parsedTopK = Math.floor(parseFloat(String(topK)))
        const k = Number.isFinite(parsedTopK) && parsedTopK > 0 ? parsedTopK : 4

        const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData)
        if (!mongoDBConnectUrl) {
            throw new Error('MongoDB Atlas connection URL is missing from the selected credential')
        }

        // No explicit connect() call is made here.
        // NOTE(review): relies on the driver connecting on first operation; confirm for the pinned driver version.
        this.mongoClient = new MongoClient(mongoDBConnectUrl)
        const collection = this.mongoClient.db(databaseName).collection(collectionName)
        const vectorStore = await this.constructVectorStore(embeddings, collection, indexName, textKey, embeddingKey, docs)

        if (output === 'retriever') {
            return vectorStore.asRetriever(k)
        } else if (output === 'vectorStore') {
            // Attach k to the store instance; presumably read by whatever consumes the store.
            ;(vectorStore as any).k = k
            return vectorStore
        }
        return vectorStore
    }
}
39 changes: 39 additions & 0 deletions packages/components/nodes/vectorstores/MongoDB/MongoDB_Existing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Collection } from 'mongodb'
import { MongoDBAtlasVectorSearch } from 'langchain/vectorstores/mongodb_atlas'
import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base'
import { Document } from 'langchain/document'
import { MongoDBSearchBase } from './MongoDBSearchBase'
import { ICommonObject, INode, INodeData } from '../../../src/Interface'

/**
 * Vector store node that attaches to an existing MongoDB Atlas index without
 * inserting any documents.
 */
class MongoDBExisting_VectorStores extends MongoDBSearchBase implements INode {
    constructor() {
        super()
        // Node-specific metadata, assigned in the same order as before.
        Object.assign(this, {
            label: 'MongoDB Atlas Load Existing Index',
            name: 'MongoDBIndex',
            version: 1.0,
            description: 'Load existing data from MongoDB Atlas (i.e: Document has been upserted)'
        })
    }

    /**
     * Delegates to the base `init` with no documents.
     *
     * @param nodeData - node configuration
     * @param _ - passed through to the base implementation
     * @param options - runtime options
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
        const noDocuments = undefined
        return super.init(nodeData, _, options, noDocuments)
    }

    /**
     * Wraps the given collection in a `MongoDBAtlasVectorSearch`.
     *
     * @param embeddings - embeddings instance handed to the store
     * @param collection - collection backing the store
     * @param indexName - index name passed to the store
     * @param textKey - content field name
     * @param embeddingKey - embedding field name
     * @param _ - unused; no documents are inserted by this node
     */
    async constructVectorStore(
        embeddings: Embeddings,
        collection: Collection,
        indexName: string,
        textKey: string,
        embeddingKey: string,
        _: Document<Record<string, any>>[] | undefined
    ): Promise<VectorStore> {
        const storeArgs = { collection, indexName, textKey, embeddingKey }
        return new MongoDBAtlasVectorSearch(embeddings, storeArgs)
    }
}

// CommonJS export: exposes the vector store node class under the `nodeClass` key.
module.exports = { nodeClass: MongoDBExisting_VectorStores }
Loading