feat(integration): implementation #2191

Merged: 8 commits, Feb 15, 2024

Changes from all commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -79,3 +79,5 @@ paulgraham.py
supabase/seed-airwallex.sql
airwallexpayouts.py
application.log
+backend/celerybeat-schedule.db

6 changes: 5 additions & 1 deletion Pipfile
@@ -47,8 +47,12 @@ watchdog = "*"
langchain-community = "*"
langchain-openai = "*"
pydantic-settings = "*"
-unstructured = {extras = ["all-docs"], version = "*"}
+langfuse = "*"
+pandasai = "*"
+colorlog = "*"
+psycopg2-binary = "*"
+psycopg2 = "*"
+unstructured = {extras = ["all-docs"], version = "*"}

[dev-packages]
black = "*"
365 changes: 246 additions & 119 deletions Pipfile.lock

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions backend/logger.py
@@ -1,18 +1,35 @@
import logging
from logging.handlers import RotatingFileHandler

from colorlog import (
    ColoredFormatter,
)  # You need to install this package: pip install colorlog


def get_logger(logger_name, log_level=logging.INFO, log_file="application.log"):
    logger = logging.getLogger(logger_name)
    logger.setLevel(log_level)
    logger.propagate = False  # Prevent log propagation to avoid double logging

    formatter = logging.Formatter(
-       "%(asctime)s [%(levelname)s] %(name)s [%(lineno)d]: %(message)s"
+       "[%(levelname)s] %(name)s [%(filename)s:%(lineno)d]: %(message)s"
    )

    color_formatter = ColoredFormatter(
        "%(log_color)s[%(levelname)s]%(reset)s %(name)s [%(filename)s:%(lineno)d]: %(message)s",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
        reset=True,
        style="%",
    )

    console_handler = logging.StreamHandler()
-   console_handler.setFormatter(formatter)
+   console_handler.setFormatter(color_formatter)

    file_handler = RotatingFileHandler(
        log_file, maxBytes=5000000, backupCount=5
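For reference, a minimal usage sketch of the updated logger; the log messages are illustrative, and get_logger is the function shown in the diff above:

import logging

from logger import get_logger

logger = get_logger(__name__, log_level=logging.DEBUG)
logger.info("Service started")          # green [INFO] on the console
logger.warning("Disk usage above 80%")  # yellow [WARNING]
logger.error("Upload failed")           # red [ERROR], also written to application.log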
44 changes: 44 additions & 0 deletions backend/manage_services.sh
@@ -0,0 +1,44 @@
#!/bin/bash

SESSION_NAME="my_services"

start_services() {
    # Create a new tmux session
    tmux new-session -d -s $SESSION_NAME

    # Split the window into panes for each service
    tmux split-window -h
    tmux split-window -v
    tmux select-pane -t 0
    tmux split-window -v

    # Start each service in its pane
    tmux send-keys -t $SESSION_NAME:0.0 'echo "Starting backend-core..."; pipenv run uvicorn main:app --reload --host 0.0.0.0 --port 5050 --workers 6' C-m
    tmux send-keys -t $SESSION_NAME:0.1 'echo "Starting worker..."; pipenv run celery -A celery_worker worker -l info' C-m
    tmux send-keys -t $SESSION_NAME:0.2 'echo "Starting beat..."; pipenv run celery -A celery_worker beat -l info' C-m
    tmux send-keys -t $SESSION_NAME:0.3 'echo "Starting flower..."; pipenv run celery -A celery_worker flower -l info --port=5555' C-m

    echo "Services started in tmux session '$SESSION_NAME'"
    echo "Use 'tmux attach-session -t $SESSION_NAME' to view logs"
}

stop_services() {
    # Kill the tmux session
    tmux kill-session -t $SESSION_NAME
    echo "Services stopped"
}

view_logs() {
    # Attach to the tmux session to view logs
    tmux attach-session -t $SESSION_NAME
}

if [ "$1" == "start" ]; then
    start_services
elif [ "$1" == "stop" ]; then
    stop_services
elif [ "$1" == "logs" ]; then
    view_logs
else
    echo "Usage: $0 {start|stop|logs}"
fi
Comment on lines +36 to +44 (Collaborator):

Nitpick: a case statement would be more readable than the if/elif chain:

case "$1" in
    start) start_services ;;
    stop)  stop_services ;;
    logs)  view_logs ;;
    *)     echo "Usage: $0 {start|stop|logs}" ;;
esac

2 changes: 1 addition & 1 deletion backend/modules/brain/entity/integration_brain.py
@@ -12,7 +12,7 @@ class IntegrationDescriptionEntity(BaseModel):


class IntegrationEntity(BaseModel):
-   id: str
+   id: int
    user_id: str
    brain_id: str
    integration_id: str
62 changes: 62 additions & 0 deletions backend/modules/brain/integrations/GPT4/Brain.py
@@ -0,0 +1,62 @@
import json
from typing import AsyncIterable
from uuid import UUID

from langchain_community.chat_models import ChatLiteLLM
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from modules.brain.knowledge_brain_qa import KnowledgeBrainQA
from modules.chat.dto.chats import ChatQuestion


class GPT4Brain(KnowledgeBrainQA):
    """This is the GPT-4 brain class. It is a KnowledgeBrainQA that answers
    directly with GPT-4 through ChatLiteLLM, without retrieving documents.

    Args:
        KnowledgeBrainQA (_type_): A brain that stores the knowledge internally
    """

    def __init__(
        self,
        **kwargs,
    ):
        super().__init__(
            **kwargs,
        )

    def get_chain(self):
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "You are GPT-4 powered by Quivr. You are an assistant."),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{question}"),
            ]
        )

        chain = prompt | ChatLiteLLM(
            model="gpt-4-0125-preview", max_tokens=self.max_tokens
        )

        return chain

    async def generate_stream(
        self, chat_id: UUID, question: ChatQuestion, save_answer: bool = True
    ) -> AsyncIterable:
        conversational_qa_chain = self.get_chain()
        transformed_history, streamed_chat_history = (
            self.initialize_streamed_chat_history(chat_id, question)
        )
        response_tokens = []

        async for chunk in conversational_qa_chain.astream(
            {
                "question": question.question,
                "chat_history": transformed_history,
            }
        ):
            response_tokens.append(chunk.content)
            streamed_chat_history.assistant = chunk.content
            yield f"data: {json.dumps(streamed_chat_history.dict())}"

        self.save_answer(question, response_tokens, streamed_chat_history, save_answer)
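The streaming logic above is plain LCEL: a chat prompt piped into ChatLiteLLM and consumed with astream. A minimal standalone sketch of the same pattern, assuming LiteLLM is configured with an OpenAI key; the sample question and max_tokens value are illustrative:

import asyncio

from langchain_community.chat_models import ChatLiteLLM
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Same prompt | model composition as GPT4Brain.get_chain
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are GPT-4 powered by Quivr. You are an assistant."),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}"),
    ]
)
chain = prompt | ChatLiteLLM(model="gpt-4-0125-preview", max_tokens=256)


async def main():
    # astream yields message chunks; each chunk.content is the next token(s)
    async for chunk in chain.astream({"question": "Say hello.", "chat_history": []}):
        print(chunk.content, end="", flush=True)


asyncio.run(main())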
Empty file.
100 changes: 100 additions & 0 deletions backend/modules/brain/integrations/SQL/Brain.py
@@ -0,0 +1,100 @@
import json
from typing import AsyncIterable
from uuid import UUID

from langchain_community.chat_models import ChatLiteLLM
from langchain_community.utilities import SQLDatabase
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from modules.brain.integrations.SQL.SQL_connector import SQLConnector
from modules.brain.knowledge_brain_qa import KnowledgeBrainQA
from modules.brain.repository.integration_brains import IntegrationBrain
from modules.chat.dto.chats import ChatQuestion


class SQLBrain(KnowledgeBrainQA, IntegrationBrain):
    """This is the SQL brain class. It is a KnowledgeBrainQA that generates a
    SQL query from the user's question, runs it against the connected
    database, and phrases the result in natural language.

    Args:
        KnowledgeBrainQA (_type_): A brain that stores the knowledge internally
    """

    uri: str = None
    db: SQLDatabase = None
    sql_connector: SQLConnector = None

    def __init__(
        self,
        **kwargs,
    ):
        super().__init__(
            **kwargs,
        )
        self.sql_connector = SQLConnector(self.brain_id, self.user_id)

    def get_schema(self, _):
        return self.db.get_table_info()

    def run_query(self, query):
        return self.db.run(query)

    def get_chain(self):
        template = """Based on the table schema below, write a SQL query that would answer the user's question:
{schema}

Question: {question}
SQL Query:"""
        prompt = ChatPromptTemplate.from_template(template)

        self.db = SQLDatabase.from_uri(self.sql_connector.credentials["uri"])

        model = ChatLiteLLM(model=self.model)

        sql_response = (
            RunnablePassthrough.assign(schema=self.get_schema)
            | prompt
            | model.bind(stop=["\nSQLResult:"])
            | StrOutputParser()
        )

        template = """Based on the table schema below, question, SQL query, and SQL response, write a natural language response and the query that was used to generate it:
{schema}

Question: {question}
SQL Query: {query}
SQL Response: {response}"""
        prompt_response = ChatPromptTemplate.from_template(template)

        full_chain = (
            RunnablePassthrough.assign(query=sql_response).assign(
                schema=self.get_schema,
                response=lambda x: self.db.run(x["query"]),
            )
            | prompt_response
            | model
        )

        return full_chain

    async def generate_stream(
        self, chat_id: UUID, question: ChatQuestion, save_answer: bool = True
    ) -> AsyncIterable:
        conversational_qa_chain = self.get_chain()
        transformed_history, streamed_chat_history = (
            self.initialize_streamed_chat_history(chat_id, question)
        )
        response_tokens = []

        async for chunk in conversational_qa_chain.astream(
            {
                "question": question.question,
            }
        ):
            response_tokens.append(chunk.content)
            streamed_chat_history.assistant = chunk.content
            yield f"data: {json.dumps(streamed_chat_history.dict())}"

        self.save_answer(question, response_tokens, streamed_chat_history, save_answer)
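get_chain composes two stages: the first turns schema plus question into a SQL query, the second executes that query and phrases the result in natural language. A standalone sketch of the same two-stage pattern; the SQLite file and model name are assumptions for illustration:

from langchain_community.chat_models import ChatLiteLLM
from langchain_community.utilities import SQLDatabase
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

db = SQLDatabase.from_uri("sqlite:///example.db")  # assumed local database
model = ChatLiteLLM(model="gpt-3.5-turbo")  # illustrative model name

# Stage 1: schema + question -> SQL query
sql_prompt = ChatPromptTemplate.from_template(
    "Based on the table schema below, write a SQL query that would answer "
    "the user's question:\n{schema}\n\nQuestion: {question}\nSQL Query:"
)
write_query = (
    RunnablePassthrough.assign(schema=lambda _: db.get_table_info())
    | sql_prompt
    | model.bind(stop=["\nSQLResult:"])
    | StrOutputParser()
)

# Stage 2: execute the generated query, then answer in plain English
answer_prompt = ChatPromptTemplate.from_template(
    "Given the question, the SQL query, and its result, answer the question:\n"
    "Question: {question}\nSQL Query: {query}\nSQL Result: {response}"
)
full_chain = (
    RunnablePassthrough.assign(query=write_query).assign(
        response=lambda x: db.run(x["query"])
    )
    | answer_prompt
    | model
)

print(full_chain.invoke({"question": "How many tables are there?"}).content)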
41 changes: 41 additions & 0 deletions backend/modules/brain/integrations/SQL/SQL_connector.py
@@ -0,0 +1,41 @@
from logger import get_logger
from modules.brain.entity.integration_brain import IntegrationEntity
from modules.brain.repository.integration_brains import IntegrationBrain
from modules.knowledge.repository.knowledge_interface import KnowledgeInterface
from modules.knowledge.service.knowledge_service import KnowledgeService

logger = get_logger(__name__)


class SQLConnector(IntegrationBrain):
    """A class to interact with an SQL database"""

    credentials: dict[str, str] = None
    integration_details: IntegrationEntity = None
    brain_id: str = None
    user_id: str = None
    knowledge_service: KnowledgeInterface

    def __init__(self, brain_id: str, user_id: str):
        super().__init__()
        self.brain_id = brain_id
        self.user_id = user_id
        self._load_credentials()
        self.knowledge_service = KnowledgeService()

    def _load_credentials(self) -> None:
        """Load the SQL connection credentials"""
        self.integration_details = self.get_integration_brain(
            self.brain_id, self.user_id
        )
        if self.credentials is None:
            logger.info("Loading SQL credentials")
            self.integration_details.credentials = {
                "uri": self.integration_details.settings.get("uri", "")
            }
            self.update_integration_brain(
                self.brain_id, self.user_id, self.integration_details
            )
            self.credentials = self.integration_details.credentials
        else:  # pragma: no cover
            self.credentials = self.integration_details.credentials
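For illustration, a short sketch of how SQLBrain consumes the connector; the IDs are placeholders:

# Hypothetical usage: the connector resolves the URI stored in the
# integration's settings; SQLBrain hands it to SQLDatabase.from_uri.
from langchain_community.utilities import SQLDatabase
from modules.brain.integrations.SQL.SQL_connector import SQLConnector

connector = SQLConnector(brain_id="<brain-uuid>", user_id="<user-uuid>")
db = SQLDatabase.from_uri(connector.credentials["uri"])
print(db.get_table_info())  # the schema later injected into the text-to-SQL prompt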
Empty file.