Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

difference assistant #2701

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ langfuse = "*"
pandasai = "*"
colorlog = "*"
psycopg2-binary = "*"
psycopg2 = "*"
celery = {extras = ["redis", "sqs"], version = "*"}
unstructured = {extras = ["all-docs"], version = "*"}
unstructured = "*"
llama-parse = "*"
llama-index = "*"
lxml = {extras = ["html_clean"], version = "*"}
Expand All @@ -71,6 +70,8 @@ google-api-python-client = "*"
google-auth-httplib2 = "*"
google-auth-oauthlib = "*"
msal = "*"
llama-index-postprocessor-cohere-rerank = "*"
megaparse = "*"

[dev-packages]
black = "*"
Expand Down
1,487 changes: 612 additions & 875 deletions Pipfile.lock

Large diffs are not rendered by default.

116 changes: 116 additions & 0 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
import asyncio
import os
from datetime import datetime
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import List
from uuid import UUID

from celery.schedules import crontab
from celery_config import celery
from fastapi import UploadFile
from logger import get_logger
from middlewares.auth.auth_bearer import AuthBearer
from models.files import File
from models.settings import get_supabase_client, get_supabase_db
from modules.assistant.dto.inputs import InputAssistant
from modules.assistant.ito.difference_assistant import DifferenceAssistant
from modules.assistant.ito.summary import SummaryAssistant
from modules.brain.integrations.Notion.Notion_connector import NotionConnector
from modules.brain.service.brain_service import BrainService
from modules.brain.service.brain_vector_service import BrainVectorService
from modules.notification.dto.inputs import NotificationUpdatableProperties
from modules.notification.entity.notification import NotificationsStatusEnum
from modules.notification.service.notification_service import NotificationService
from modules.onboarding.service.onboarding_service import OnboardingService
from modules.user.entity.user_identity import UserIdentity
from packages.files.crawl.crawler import CrawlWebsite, slugify
from packages.files.parsers.github import process_github
from packages.files.processors import filter_file
Expand Down Expand Up @@ -302,6 +310,110 @@ def check_if_is_premium_user():
return True


@celery.task(name="process_assistant_task")
def process_assistant_task(
input_in: str,
files_name: List[str],
current_user: dict,
notification_id=None,
) -> None:

logger.debug(f"Input: {input}")
logger.debug(type(input))
_input = InputAssistant.model_validate(input_in)
# _input = InputAssistant(**json.loads(input)) # type: ignore
chloedia marked this conversation as resolved.
Show resolved Hide resolved
# _input = InputAssistant(json.dumps(_input))
_current_user = UserIdentity(**current_user) # type: ignore
try:
files = []
supabase_client = get_supabase_client()

for file_name in files_name:
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)

with NamedTemporaryFile(suffix="_" + tmp_name, delete=False) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()

file_instance = File(
chloedia marked this conversation as resolved.
Show resolved Hide resolved
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
upload_file = UploadFile(
filename=file_instance.file_name,
size=file_instance.file_size,
file=BytesIO(file_instance.bytes_content),
headers='{"content-type": "application/pdf"}', # type : ignore
)
files.append(upload_file)

except Exception as e:
chloedia marked this conversation as resolved.
Show resolved Hide resolved
logger.exception(e)
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"An error occurred while processing the file: {e}",
),
)
return
loop = asyncio.get_event_loop()

asyncio.set_event_loop(asyncio.new_event_loop())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this code ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get a file_descriptor problem if i don't reset the loop


if _input.name.lower() == "summary":
summary_assistant = SummaryAssistant(
input=_input, files=files, current_user=_current_user
)
try:
summary_assistant.check_input()
loop.run_until_complete(summary_assistant.process_assistant())
except ValueError as e:
logger.error(f"ValueError in SummaryAssistant: {e}")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"Error in summary processing: {e}",
),
)
elif _input.name.lower() == "difference":
difference_assistant = DifferenceAssistant(
input=_input, files=files, current_user=_current_user
)
try:
difference_assistant.check_input()
loop.run_until_complete(difference_assistant.process_assistant())
except ValueError as e:
logger.error(f"ValueError in DifferenceAssistant: {e}")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"Error in difference processing: {e}",
),
)
else:
logger.error("Invalid assistant name provided.")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description="Invalid assistant name provided.",
),
)


celery.conf.beat_schedule = {
"remove_onboarding_more_than_x_days_task": {
"task": f"{__name__}.remove_onboarding_more_than_x_days_task",
Expand All @@ -319,4 +431,8 @@ def check_if_is_premium_user():
"task": "check_if_is_premium_user",
"schedule": crontab(minute="*/1", hour="*"),
},
"process_assistant": {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the assistant need to run every minute ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no

"task": "process_assistant_task",
"schedule": crontab(minute="*/1", hour="*"),
},
}
47 changes: 22 additions & 25 deletions backend/modules/assistant/controller/assistant_routes.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from typing import List

from fastapi import APIRouter, Depends, HTTPException, UploadFile
from celery_worker import process_assistant_task
from fastapi import APIRouter, Depends, UploadFile
from logger import get_logger
from middlewares.auth import AuthBearer, get_current_user
from modules.assistant.dto.inputs import InputAssistant
from modules.assistant.dto.outputs import AssistantOutput
from modules.assistant.ito.difference import DifferenceAssistant
from modules.assistant.ito.summary import SummaryAssistant, summary_inputs
from modules.assistant.ito.difference_assistant import difference_inputs
from modules.assistant.ito.summary import summary_inputs
from modules.assistant.service.assistant import Assistant
from modules.notification.service.notification_service import NotificationService
from modules.upload.service.upload_file import upload_file_storage
from modules.user.entity.user_identity import UserIdentity

assistant_router = APIRouter()
logger = get_logger(__name__)

assistant_service = Assistant()
notification_service = NotificationService()


@assistant_router.get(
Expand All @@ -27,10 +31,10 @@ async def list_assistants(
"""

summary = summary_inputs()
# difference = difference_inputs()
difference = difference_inputs()
# crawler = crawler_inputs()
# audio_transcript = audio_transcript_inputs()
return [summary]
return [summary, difference]


@assistant_router.post(
Expand All @@ -40,25 +44,18 @@ async def list_assistants(
)
async def process_assistant(
input: InputAssistant,
files: List[UploadFile] = None,
files: List[UploadFile] = None, # type: ignore
current_user: UserIdentity = Depends(get_current_user),
):
if input.name.lower() == "summary":
summary_assistant = SummaryAssistant(
input=input, files=files, current_user=current_user
)
try:
summary_assistant.check_input()
return await summary_assistant.process_assistant()
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
elif input.name.lower() == "difference":
difference_assistant = DifferenceAssistant(
input=input, files=files, current_user=current_user
)
try:
difference_assistant.check_input()
return await difference_assistant.process_assistant()
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"message": "Assistant not found"}
files_names = []
for file in files:
file_content = await file.read()
upload_file_storage(file_content, str(file.filename), upsert="true")
files_names.append(file.filename)

process_assistant_task.delay(
input_in=input.model_dump_json(),
files_name=files_names,
current_user=current_user.model_dump(),
)
return {"message": "Assistant is working in the back"}
3 changes: 2 additions & 1 deletion backend/modules/assistant/dto/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ class InputAssistant(BaseModel):
@model_validator(mode="before")
@classmethod
def to_py_dict(cls, data):
return json.loads(data)
if isinstance(data, str):
chloedia marked this conversation as resolved.
Show resolved Hide resolved
return json.loads(data)
Loading
Loading