diff --git a/lib/api-routes/chat-routes.js b/lib/api-routes/chat-routes.js index 4595c696..fec09bff 100644 --- a/lib/api-routes/chat-routes.js +++ b/lib/api-routes/chat-routes.js @@ -36,340 +36,350 @@ function getDateValue(str) { async function init(args) { const { server, call, CORS_CONFIG } = args; - server.route({ - method: 'POST', - path: '/v1/chat/{account}', - - async handler(request, h) { - try { - // throws if account does not exist - let accountObject = new Account({ redis, account: request.params.account, call, secret: await getSecret() }); - let accountData = await accountObject.loadAccountData(); - - let documentStoreEnabled = await settings.get('documentStoreEnabled'); - let documentStoreGenerateEmbeddings = (await settings.get('documentStoreGenerateEmbeddings')) || false; - let hasOpenAiAPIKey = !!(await settings.get('openAiAPIKey')); - - if (!documentStoreEnabled) { - let error = Boom.boomify(new Error('Document store is not enabled'), { statusCode: 403 }); - error.output.payload.code = 'DocumentStoreDisabled'; - throw error; - } - - if (!documentStoreGenerateEmbeddings) { - let error = Boom.boomify(new Error('Chat is not enabled'), { statusCode: 403 }); - error.output.payload.code = 'ChatDisabled'; - throw error; - } + async function processChatRequest(opts) { + let { account, question, index, client } = opts; - if (!hasOpenAiAPIKey) { - let error = Boom.boomify(new Error('OpenAI API key not set'), { statusCode: 403 }); - error.output.payload.code = 'ApiKeyNotSet'; - throw error; - } + // throws if account does not exist + let accountObject = new Account({ redis, account, call, secret: await getSecret() }); + let accountData = await accountObject.loadAccountData(); - let processPipeline = []; + let documentStoreEnabled = await settings.get('documentStoreEnabled'); + let documentStoreGenerateEmbeddings = (await settings.get('documentStoreGenerateEmbeddings')) || false; + let hasOpenAiAPIKey = !!(await settings.get('openAiAPIKey')); - const esClient = await h.getESClient(request.logger); - const { index, client } = esClient; - - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'Received a question', - question: request.payload.question - }); + if (!documentStoreEnabled) { + let error = Boom.boomify(new Error('Document store is not enabled'), { statusCode: 403 }); + error.output.payload.code = 'DocumentStoreDisabled'; + throw error; + } - // Step 1. Embeddings for the request + if (!documentStoreGenerateEmbeddings) { + let error = Boom.boomify(new Error('Chat is not enabled'), { statusCode: 403 }); + error.output.payload.code = 'ChatDisabled'; + throw error; + } - let embeddingsResult; - try { - embeddingsResult = await call({ - cmd: 'generateChunkEmbeddings', - data: { - message: request.payload.question, - account: request.params.account - } - }); - } catch (err) { - err.processPipeline = processPipeline; - throw err; - } + if (!hasOpenAiAPIKey) { + let error = Boom.boomify(new Error('OpenAI API key not set'), { statusCode: 403 }); + error.output.payload.code = 'ApiKeyNotSet'; + throw error; + } - if (!embeddingsResult || !embeddingsResult.embedding) { - let error = new Error('Failed to generate embeddings for query'); - error.processPipeline = processPipeline; - throw error; - } + let processPipeline = []; - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'Generated embeddings for vector search', - model: embeddingsResult?.model, - tokens: embeddingsResult?.usage?.total_tokens - }); + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'Received a question', + question + }); - // Step 2. define sorting options + // Step 1. Embeddings for the request - let sortingResponse; - try { - sortingResponse = await call({ - cmd: 'questionQuery', - data: { - question: request.payload.question, - account: request.params.account - }, - timeout: 3 * 60 * 1000 - }); - } catch (err) { - err.processPipeline = processPipeline; - throw err; + let embeddingsResult; + try { + embeddingsResult = await call({ + cmd: 'generateChunkEmbeddings', + data: { + message: question, + account } + }); + } catch (err) { + err.processPipeline = processPipeline; + throw err; + } - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'Retrieved the sorting options', - ordering: sortingResponse?.ordering, - startTime: sortingResponse?.start_time, - endTime: sortingResponse?.end_time, - model: sortingResponse?.model, - tokens: sortingResponse.tokens - }); + if (!embeddingsResult || !embeddingsResult.embedding) { + let error = new Error('Failed to generate embeddings for query'); + error.processPipeline = processPipeline; + throw error; + } - // Step 3. find matching vectors + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'Generated embeddings for vector search', + model: embeddingsResult?.model, + tokens: embeddingsResult?.usage?.total_tokens + }); + + // Step 2. define sorting options + + let sortingResponse; + try { + sortingResponse = await call({ + cmd: 'questionQuery', + data: { + question, + account + }, + timeout: 3 * 60 * 1000 + }); + } catch (err) { + err.processPipeline = processPipeline; + throw err; + } - const vectorsFilter = { - bool: { - must: [ - { - term: { - account: request.params.account - } - } - ] + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'Retrieved the sorting options', + ordering: sortingResponse?.ordering, + startTime: sortingResponse?.start_time, + endTime: sortingResponse?.end_time, + model: sortingResponse?.model, + tokens: sortingResponse.tokens + }); + + // Step 3. find matching vectors + + const vectorsFilter = { + bool: { + must: [ + { + term: { + account + } } - }; - - let startDate, endDate; - if (sortingResponse?.start_time) { - startDate = getDateValue(sortingResponse?.start_time); - } - if (sortingResponse?.end_time) { - endDate = getDateValue(sortingResponse?.end_time); - } - if (startDate && endDate && startDate.getTime() === endDate.getTime()) { - // use next day value - endDate = new Date(endDate.getTime() + 24 * 3600 * 1000); - } - let dateMatch = {}; - if (startDate) { - dateMatch.gte = startDate; - } - if (endDate) { - dateMatch.lte = endDate; - } - if (Object.keys(dateMatch).length) { - vectorsFilter.bool.must.push({ - range: { date: dateMatch } - }); - } + ] + } + }; - let knnResult; - try { - knnResult = await client.knnSearch({ - index: `${index}.embeddings`, - knn: { - field: 'embeddings', + let startDate, endDate; + if (sortingResponse?.start_time) { + startDate = getDateValue(sortingResponse?.start_time); + } + if (sortingResponse?.end_time) { + endDate = getDateValue(sortingResponse?.end_time); + } + if (startDate && endDate && startDate.getTime() === endDate.getTime()) { + // use next day value + endDate = new Date(endDate.getTime() + 24 * 3600 * 1000); + } + let dateMatch = {}; + if (startDate) { + dateMatch.gte = startDate; + } + if (endDate) { + dateMatch.lte = endDate; + } + if (Object.keys(dateMatch).length) { + vectorsFilter.bool.must.push({ + range: { date: dateMatch } + }); + } - query_vector: embeddingsResult.embedding, - k: 10, - num_candidates: 100 - }, + let knnResult; + try { + knnResult = await client.knnSearch({ + index: `${index}.embeddings`, + knn: { + field: 'embeddings', - filter: vectorsFilter, + query_vector: embeddingsResult.embedding, + k: 10, + num_candidates: 100 + }, - fields: ['id', 'account', 'chunk', 'messageId', 'chunkNr', 'date', 'created'] - }); - } catch (err) { - err.processPipeline = processPipeline; - throw err; - } + filter: vectorsFilter, - if (!knnResult?.hits?.hits?.length) { - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'No matching vectors found from the database', - filter: vectorsFilter?.bool?.must - }); - return { - success: true, - answer: null, - processPipeline - }; - } + fields: ['id', 'account', 'chunk', 'messageId', 'chunkNr', 'date', 'created'] + }); + } catch (err) { + err.processPipeline = processPipeline; + throw err; + } - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'Retrieved matching vectors from the database', - matches: knnResult?.hits?.hits?.length || 0, - filter: vectorsFilter?.bool?.must - }); + if (!knnResult?.hits?.hits?.length) { + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'No matching vectors found from the database', + filter: vectorsFilter?.bool?.must + }); + return { + success: true, + answer: null, + processPipeline + }; + } - let results = []; - knnResult.hits.hits - .map(entry => { - let headerPos = entry._source.chunk.indexOf('\n\n'); - return { - account: entry._source.account, - messageId: entry._source.messageId, - chunkNr: entry._source.chunkNr, - header: entry._source.chunk.substring(0, headerPos), - body: entry._source.chunk.substring(headerPos + 2), - date: entry._source.date, - created: entry._source.created - }; - }) - .forEach(entry => { - let existing = results.find(elm => elm.messageId === entry.messageId); - if (!existing) { - results.push({ - messageId: entry.messageId, - header: `${entry.header}\nMessage-ID: ${entry.messageId}`, - chunks: [{ chunkNr: entry.chunkNr, body: entry.body }], - date: new Date(entry.date || entry.created) - }); - } else { - existing.chunks.push({ chunkNr: entry.chunkNr, body: entry.body }); - } + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'Retrieved matching vectors from the database', + matches: knnResult?.hits?.hits?.length || 0, + filter: vectorsFilter?.bool?.must + }); + + let results = []; + knnResult.hits.hits + .map(entry => { + let headerPos = entry._source.chunk.indexOf('\n\n'); + return { + account: entry._source.account, + messageId: entry._source.messageId, + chunkNr: entry._source.chunkNr, + header: entry._source.chunk.substring(0, headerPos), + body: entry._source.chunk.substring(headerPos + 2), + date: entry._source.date, + created: entry._source.created + }; + }) + .forEach(entry => { + let existing = results.find(elm => elm.messageId === entry.messageId); + if (!existing) { + results.push({ + messageId: entry.messageId, + header: `${entry.header}\nMessage-ID: ${entry.messageId}`, + chunks: [{ chunkNr: entry.chunkNr, body: entry.body }], + date: new Date(entry.date || entry.created) }); - - // sort and slice - switch (sortingResponse?.ordering) { - case 'newer_first': - results = results.sort((a, b) => b.date - a.date); - break; - case 'older_first': - results = results.sort((a, b) => a.date - b.date); - break; + } else { + existing.chunks.push({ chunkNr: entry.chunkNr, body: entry.body }); } - results = results.slice(0, 6); - - let payloadData = results.map((entry, nr) => { - entry.chunks.sort((a, b) => a.chunkNr - b.chunkNr); - return `- EMAIL #${nr + 1}:\n${entry.header}\n\n${entry.chunks - .slice(0, 3) // limit chunks for a single email - .map(chunk => chunk.body) - .join('\n')}`; - }); + }); + + // sort and slice + switch (sortingResponse?.ordering) { + case 'newer_first': + results = results.sort((a, b) => b.date - a.date); + break; + case 'older_first': + results = results.sort((a, b) => a.date - b.date); + break; + } + results = results.slice(0, 6); + + let payloadData = results.map((entry, nr) => { + entry.chunks.sort((a, b) => a.chunkNr - b.chunkNr); + return `- EMAIL #${nr + 1}:\n${entry.header}\n\n${entry.chunks + .slice(0, 3) // limit chunks for a single email + .map(chunk => chunk.body) + .join('\n')}`; + }); + + // Step 4. Send the question with context emails + + let responseData = {}; + + let queryResponse; + try { + queryResponse = await call({ + cmd: 'embeddingsQuery', + data: { + question, + contextChunks: payloadData.join('\n\n'), + account, + userData: { name: accountData.name, email: accountData.email } + }, + timeout: 3 * 60 * 1000 + }); + } catch (err) { + err.processPipeline = processPipeline; + throw err; + } - // Step 4. Send the question with context emails + processPipeline.push({ + timestamp: new Date().toISOString(), + message: 'Retrieved the answer', + messages: queryResponse?.messageId?.length || 0, + model: queryResponse?.model, + tokens: queryResponse.tokens + }); - let responseData = {}; + if (queryResponse?.answer) { + responseData.answer = queryResponse?.answer; + } - let queryResponse; - try { - queryResponse = await call({ - cmd: 'embeddingsQuery', - data: { - question: request.payload.question, - contextChunks: payloadData.join('\n\n'), - account: request.params.account, - userData: { name: accountData.name, email: accountData.email } + if (queryResponse?.messageId) { + let searchQuery = { + bool: { + must: [ + { + term: { + account + } }, - timeout: 3 * 60 * 1000 - }); - } catch (err) { - err.processPipeline = processPipeline; - throw err; + { + terms: { + messageId: queryResponse.messageId + } + } + ] } + }; - processPipeline.push({ - timestamp: new Date().toISOString(), - message: 'Retrieved the answer', - messages: queryResponse?.messageId?.length || 0, - model: queryResponse?.model, - tokens: queryResponse.tokens + // find the originating message this bounce applies for + let searchResult; + try { + searchResult = await client.search({ + index, + size: 20, + from: 0, + query: searchQuery, + sort: { uid: 'desc' }, + _source_excludes: 'headers,text' }); + } catch (err) { + err.processPipeline = processPipeline; + throw err; + } - if (queryResponse?.answer) { - responseData.answer = queryResponse?.answer; - } + if (LOG_VERBOSE) { + console.error(util.inspect({ searchQuery, searchResult }, false, 8, true)); + } - if (queryResponse?.messageId) { - let searchQuery = { - bool: { - must: [ - { - term: { - account: request.params.account - } - }, - { - terms: { - messageId: queryResponse.messageId - } - } - ] + if (searchResult && searchResult.hits && searchResult.hits.hits && searchResult.hits.hits.length) { + let seenIds = new Set(); + responseData.messages = searchResult.hits.hits + .map(message => message._source) + .sort((a, b) => { + if (a.messageSpecialUse === '\\Inbox') { + return -1; } - }; - - // find the originating message this bounce applies for - let searchResult; - try { - searchResult = await client.search({ - index, - size: 20, - from: 0, - query: searchQuery, - sort: { uid: 'desc' }, - _source_excludes: 'headers,text' - }); - } catch (err) { - err.processPipeline = processPipeline; - throw err; - } + if (b.messageSpecialUse === '\\Inbox') { + return 1; + } + return new Date(a.date || a.created) - new Date(b.date || b.created); + }) + .filter(message => { + if (seenIds.has(message.messageId)) { + return false; + } + seenIds.add(message.messageId); + return true; + }) + .sort((a, b) => new Date(b.date) - new Date(a.date)) + .map(message => { + let responseData = {}; + for (let key of ['id', 'path', 'date', 'from', 'to', 'cc', 'bcc', 'subject', 'messageSpecialUse']) { + if (message[key]) { + responseData[key] = message[key]; + } + } + return responseData; + }); + } + } - if (LOG_VERBOSE) { - console.error(util.inspect({ searchQuery, searchResult }, false, 8, true)); - } + return { + success: !!(responseData.answer || responseData.message), + ...responseData, + processPipeline + }; + } - if (searchResult && searchResult.hits && searchResult.hits.hits && searchResult.hits.hits.length) { - let seenIds = new Set(); - responseData.messages = searchResult.hits.hits - .map(message => message._source) - .sort((a, b) => { - if (a.messageSpecialUse === '\\Inbox') { - return -1; - } - if (b.messageSpecialUse === '\\Inbox') { - return 1; - } - return new Date(a.date || a.created) - new Date(b.date || b.created); - }) - .filter(message => { - if (seenIds.has(message.messageId)) { - return false; - } - seenIds.add(message.messageId); - return true; - }) - .sort((a, b) => new Date(b.date) - new Date(a.date)) - .map(message => { - let responseData = {}; - for (let key of ['id', 'path', 'date', 'from', 'to', 'cc', 'bcc', 'subject', 'messageSpecialUse']) { - if (message[key]) { - responseData[key] = message[key]; - } - } - return responseData; - }); - } - } + server.route({ + method: 'POST', + path: '/v1/chat/{account}', - return { - success: !!(responseData.answer || responseData.message), - ...responseData, - processPipeline - }; + async handler(request, h) { + try { + const esClient = await h.getESClient(request.logger); + const { index, client } = esClient; + return await processChatRequest({ + account: request.params.account, + question: request.payload.question, + index, + client + }); } catch (err) { request.logger.error({ msg: 'API request failed', err }); if (Boom.isBoom(err)) { @@ -459,6 +469,56 @@ async function init(args) { } } }); + + server.route({ + method: 'POST', + path: '/admin/config/document-store/chat/test', + async handler(request, h) { + try { + const esClient = await h.getESClient(request.logger); + const { index, client } = esClient; + + return await processChatRequest({ + account: request.payload.account, + question: request.payload.question, + index, + client + }); + } catch (err) { + request.logger.error({ + msg: 'Failed posting request', + command: 'info', + err + }); + return { + success: false, + error: err.message + }; + } + }, + options: { + validate: { + options: { + stripUnknown: true, + abortEarly: false, + convert: true + }, + + failAction, + + payload: Joi.object({ + account: accountIdSchema.required(), + question: Joi.string() + .trim() + .max(1024) + .example('When did Jason last message me?') + .description('Chat message to use') + .label('ChatMessage') + .required() + }) + } + } + }); } module.exports = init; diff --git a/views/config/document-store/chat.hbs b/views/config/document-store/chat.hbs index f221c154..b4faaa1d 100644 --- a/views/config/document-store/chat.hbs +++ b/views/config/document-store/chat.hbs @@ -41,26 +41,46 @@ emails processed when the feature is active. To generate embeddings for previously stored emails, utilize the Flush API method. This will force EmailEngine to re-process all - emails - in the Document Store. + emails in the Document Store. + + + + -