Skip to content

Commit

Permalink
Merge pull request #1200 from FlowiseAI/feature/OpenAI-Assistant
Browse files Browse the repository at this point in the history
Feature/open ai assistant
  • Loading branch information
HenryHengZJ authored Nov 9, 2023
2 parents be19cc6 + 4d2ef9b commit 3502ee7
Show file tree
Hide file tree
Showing 29 changed files with 774 additions and 138 deletions.
229 changes: 160 additions & 69 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams, IUsedTool } from '../../../src/Interface'
import OpenAI from 'openai'
import { DataSource } from 'typeorm'
import { getCredentialData, getCredentialParam, getUserHome } from '../../../src/utils'
import { MessageContentImageFile, MessageContentText } from 'openai/resources/beta/threads/messages/messages'
import * as fsDefault from 'node:fs'
import * as path from 'node:path'
import fetch from 'node-fetch'
import { flatten } from 'lodash'
import { zodToJsonSchema } from 'zod-to-json-schema'

class OpenAIAssistant_Agents implements INode {
label: string
Expand Down Expand Up @@ -33,6 +35,12 @@ class OpenAIAssistant_Agents implements INode {
name: 'selectedAssistant',
type: 'asyncOptions',
loadMethod: 'listAssistants'
},
{
label: 'Allowed Tools',
name: 'tools',
type: 'Tool',
list: true
}
]
}
Expand Down Expand Up @@ -78,19 +86,28 @@ class OpenAIAssistant_Agents implements INode {
id: selectedAssistantId
})

if (!assistant) throw new Error(`Assistant ${selectedAssistantId} not found`)
if (!assistant) {
options.logger.error(`Assistant ${selectedAssistantId} not found`)
return
}

if (!sessionId && options.chatId) {
const chatmsg = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
chatId: options.chatId
})
if (!chatmsg) throw new Error(`Chat Message with Chat Id: ${options.chatId} not found`)
if (!chatmsg) {
options.logger.error(`Chat Message with Chat Id: ${options.chatId} not found`)
return
}
sessionId = chatmsg.sessionId
}

const credentialData = await getCredentialData(assistant.credential ?? '', options)
const openAIApiKey = getCredentialParam('openAIApiKey', credentialData, nodeData)
if (!openAIApiKey) throw new Error(`OpenAI ApiKey not found`)
if (!openAIApiKey) {
options.logger.error(`OpenAI ApiKey not found`)
return
}

const openai = new OpenAI({ apiKey: openAIApiKey })
options.logger.info(`Clearing OpenAI Thread ${sessionId}`)
Expand All @@ -102,6 +119,9 @@ class OpenAIAssistant_Agents implements INode {
const selectedAssistantId = nodeData.inputs?.selectedAssistant as string
const appDataSource = options.appDataSource as DataSource
const databaseEntities = options.databaseEntities as IDatabaseEntity
let tools = nodeData.inputs?.tools
tools = flatten(tools)
const formattedTools = tools?.map((tool: any) => formatToOpenAIAssistantTool(tool)) ?? []

const assistant = await appDataSource.getRepository(databaseEntities['Assistant']).findOneBy({
id: selectedAssistantId
Expand All @@ -116,83 +136,143 @@ class OpenAIAssistant_Agents implements INode {
const openai = new OpenAI({ apiKey: openAIApiKey })

// Retrieve assistant
const assistantDetails = JSON.parse(assistant.details)
const openAIAssistantId = assistantDetails.id
const retrievedAssistant = await openai.beta.assistants.retrieve(openAIAssistantId)

const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
chatId: options.chatId
})
try {
const assistantDetails = JSON.parse(assistant.details)
const openAIAssistantId = assistantDetails.id
const retrievedAssistant = await openai.beta.assistants.retrieve(openAIAssistantId)

let threadId = ''
if (!chatmessage) {
const thread = await openai.beta.threads.create({})
threadId = thread.id
} else {
const thread = await openai.beta.threads.retrieve(chatmessage.sessionId)
threadId = thread.id
}

// Add message to thread
await openai.beta.threads.messages.create(threadId, {
role: 'user',
content: input
})

// Run assistant thread
const runThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id
})
if (formattedTools.length) {
await openai.beta.assistants.update(openAIAssistantId, { tools: formattedTools })
}

const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => {
const timeout = setInterval(async () => {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status
if (state === 'completed') {
clearInterval(timeout)
resolve(run)
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
clearInterval(timeout)
reject(new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
}
}, 500)
const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
chatId: options.chatId
})
}

// Polling run status
await promise(threadId, runThread.id)
let threadId = ''
if (!chatmessage) {
const thread = await openai.beta.threads.create({})
threadId = thread.id
} else {
const thread = await openai.beta.threads.retrieve(chatmessage.sessionId)
threadId = thread.id
}

// List messages
const messages = await openai.beta.threads.messages.list(threadId)
const messageData = messages.data ?? []
const assistantMessages = messageData.filter((msg) => msg.role === 'assistant')
if (!assistantMessages.length) return ''
// Add message to thread
await openai.beta.threads.messages.create(threadId, {
role: 'user',
content: input
})

let returnVal = ''
for (let i = 0; i < assistantMessages[0].content.length; i += 1) {
if (assistantMessages[0].content[i].type === 'text') {
const content = assistantMessages[0].content[i] as MessageContentText
returnVal += content.text.value
// Run assistant thread
const runThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id
})

//TODO: handle annotations
} else {
const content = assistantMessages[0].content[i] as MessageContentImageFile
const fileId = content.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', `${fileObj.filename}.png`)
const usedTools: IUsedTool[] = []

const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => {
const timeout = setInterval(async () => {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status
if (state === 'completed') {
clearInterval(timeout)
resolve(state)
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
clearInterval(timeout)
const actions: ICommonObject[] = []
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
const args = JSON.parse(functionCall.arguments)
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
})

const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue
const toolOutput = await tool.call(actions[i].toolInput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
}

if (submitToolOutputs.length) {
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
tool_outputs: submitToolOutputs
})
resolve(state)
} else {
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}. submit_tool_outputs.tool_calls are empty`
)
)
}
}
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
clearInterval(timeout)
reject(new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
}
}, 500)
})
}

await downloadFile(fileObj, filePath, openAIApiKey)
// Polling run status
let state = await promise(threadId, runThread.id)
while (state === 'requires_action') {
state = await promise(threadId, runThread.id)
}

const bitmap = fsDefault.readFileSync(filePath)
const base64String = Buffer.from(bitmap).toString('base64')
// List messages
const messages = await openai.beta.threads.messages.list(threadId)
const messageData = messages.data ?? []
const assistantMessages = messageData.filter((msg) => msg.role === 'assistant')
if (!assistantMessages.length) return ''

let returnVal = ''
for (let i = 0; i < assistantMessages[0].content.length; i += 1) {
if (assistantMessages[0].content[i].type === 'text') {
const content = assistantMessages[0].content[i] as MessageContentText
returnVal += content.text.value

//TODO: handle annotations
} else {
const content = assistantMessages[0].content[i] as MessageContentImageFile
const fileId = content.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', `${fileObj.filename}.png`)

await downloadFile(fileObj, filePath, openAIApiKey)

const bitmap = fsDefault.readFileSync(filePath)
const base64String = Buffer.from(bitmap).toString('base64')

const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
returnVal += imgHTML
}
}

const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
returnVal += imgHTML
return {
text: returnVal,
usedTools,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThread.id, messages: messageData }
}
} catch (error) {
throw new Error(error)
}

return { text: returnVal, assistant: { assistantId: openAIAssistantId, threadId, runId: runThread.id, messages: messageData } }
}
}

Expand Down Expand Up @@ -221,4 +301,15 @@ const downloadFile = async (fileObj: any, filePath: string, openAIApiKey: string
}
}

/**
 * Converts a Flowise tool node into the function-tool shape expected by the
 * OpenAI Assistants API (`assistants.update({ tools: [...] })`).
 * The tool's zod schema is serialized to JSON Schema for the `parameters` field.
 */
const formatToOpenAIAssistantTool = (tool: any): OpenAI.Beta.AssistantCreateParams.AssistantToolsFunction => {
    const functionSpec = {
        name: tool.name,
        description: tool.description,
        parameters: zodToJsonSchema(tool.schema)
    }
    return {
        type: 'function',
        function: functionSpec
    }
}

module.exports = { nodeClass: OpenAIAssistant_Agents }
3 changes: 2 additions & 1 deletion packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
"srt-parser-2": "^1.2.3",
"vm2": "^3.9.19",
"weaviate-ts-client": "^1.1.0",
"ws": "^8.9.0"
"ws": "^8.9.0",
"zod-to-json-schema": "^3.21.4"
},
"devDependencies": {
"@types/gulp": "4.0.9",
Expand Down
6 changes: 6 additions & 0 deletions packages/components/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ export interface IMessage {
type: MessageType
}

/**
 * Record of one tool invocation performed while resolving an assistant run,
 * collected so callers can report which tools were used and with what data.
 */
export interface IUsedTool {
    /** Name of the invoked tool (matches the function-call name from the run) */
    tool: string
    /** Arguments the tool was called with, parsed from the function-call arguments JSON */
    toolInput: object
    /** Value returned by the tool's `call()` */
    toolOutput: string | object
}

/**
* Classes
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"description": "Agent optimized for vector retrieval during conversation and answering questions based on previous dialogue.",
"badge": "POPULAR",
"nodes": [
{
"width": 300,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"description": "Text file QnA using conversational retrieval QA chain",
"badge": "POPULAR",
"nodes": [
{
"width": 300,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"description": "Flowise Docs Github QnA using conversational retrieval QA chain",
"badge": "POPULAR",
"nodes": [
{
"width": 300,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"description": "Return response as a list (array) instead of a string/text",
"badge": "NEW",
"nodes": [
{
"width": 300,
Expand Down
Loading

0 comments on commit 3502ee7

Please sign in to comment.