API: start parsing #1377

Merged
merged 6 commits on Jul 11, 2024
187 changes: 169 additions & 18 deletions api/apps/dataset_api.py
@@ -18,30 +18,35 @@
import warnings
from io import BytesIO

from elasticsearch_dsl import Q
from flask import request, send_file
from flask_login import login_required, current_user
from httpx import HTTPError
from minio import S3Error

from api.contants import NAME_LENGTH_LIMIT
from api.db import FileType, ParserType, FileSource
from api.db import FileType, ParserType, FileSource, TaskStatus
from api.db import StatusEnum
from api.db.db_models import File
from api.db.db_models import File, Task
from api.db.services import duplicate_name
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.task_service import TaskService
from api.db.services.user_service import TenantService
from api.settings import RetCode
from api.utils import get_uuid
from api.utils.api_utils import construct_json_result, construct_error_response
from api.utils.api_utils import construct_result, validate_request
from api.utils.file_utils import filename_type, thumbnail
from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture
from rag.nlp import search
from rag.utils.es_conn import ELASTICSEARCH
from rag.utils.minio_conn import MINIO

MAXIMUM_OF_UPLOADING_FILES = 256


# ------------------------------ create a dataset ---------------------------------------

@manager.route("/", methods=["POST"])
@@ -116,6 +121,7 @@ def create_dataset():
except Exception as e:
return construct_error_response(e)


# -----------------------------list datasets-------------------------------------------------------

@manager.route("/", methods=["GET"])
@@ -135,6 +141,7 @@ def list_datasets():
except HTTPError as http_err:
return construct_json_result(http_err)


# ---------------------------------delete a dataset ----------------------------

@manager.route("/<dataset_id>", methods=["DELETE"])
@@ -162,13 +169,15 @@ def remove_dataset(dataset_id):

# delete the dataset
if not KnowledgebaseService.delete_by_id(dataset_id):
return construct_json_result(code=RetCode.DATA_ERROR, message="There was an error during the dataset removal process. "
"Please check the status of the RAGFlow server and try the removal again.")
return construct_json_result(code=RetCode.DATA_ERROR,
message="There was an error during the dataset removal process. "
"Please check the status of the RAGFlow server and try the removal again.")
# success
return construct_json_result(code=RetCode.SUCCESS, message=f"Remove dataset: {dataset_id} successfully")
except Exception as e:
return construct_error_response(e)


# ------------------------------ get details of a dataset ----------------------------------------

@manager.route("/<dataset_id>", methods=["GET"])
@@ -182,6 +191,7 @@ def get_dataset(dataset_id):
except Exception as e:
return construct_json_result(e)


# ------------------------------ update a dataset --------------------------------------------

@manager.route("/<dataset_id>", methods=["PUT"])
@@ -209,8 +219,9 @@ def update_dataset(dataset_id):
if name.lower() != dataset.name.lower() \
and len(KnowledgebaseService.query(name=name, tenant_id=current_user.id,
status=StatusEnum.VALID.value)) > 1:
return construct_json_result(code=RetCode.DATA_ERROR, message=f"The name: {name.lower()} is already used by other "
f"datasets. Please choose a different name.")
return construct_json_result(code=RetCode.DATA_ERROR,
message=f"The name: {name.lower()} is already used by other "
f"datasets. Please choose a different name.")

dataset_updating_data = {}
chunk_num = req.get("chunk_num")
@@ -222,17 +233,21 @@
if chunk_num == 0:
dataset_updating_data["embd_id"] = req["embedding_model_id"]
else:
construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
return construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
"dataset, so you cannot change the embedding "
"model.")
# only if chunk_num is 0, the user can update the chunk_method
if req.get("chunk_method"):
if chunk_num == 0:
dataset_updating_data['parser_id'] = req["chunk_method"]
else:
if "chunk_method" in req:
type_value = req["chunk_method"]
if is_illegal_value_for_enum(type_value, ParserType):
return construct_json_result(message=f"Illegal value {type_value} for 'chunk_method' field.",
code=RetCode.DATA_ERROR)
if chunk_num != 0:
return construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document "
"in this dataset, so you cannot "
"change the chunk method.")
dataset_updating_data["parser_id"] = req["chunk_method"]

# convert the photo parameter to avatar
if req.get("photo"):
dataset_updating_data["avatar"] = req["photo"]
@@ -265,6 +280,7 @@ def update_dataset(dataset_id):
except Exception as e:
return construct_error_response(e)


# --------------------------------content management ----------------------------------------------

# ----------------------------upload files-----------------------------------------------------
@@ -339,9 +355,10 @@ def upload_documents(dataset_id):
location += "_"

blob = file.read()

# warn if the uploaded content is empty
if blob == b'':
warnings.warn(f"[WARNING]: The file {filename} is empty.")
warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")

MINIO.put(dataset_id, location, blob)

@@ -453,6 +470,7 @@ def list_documents(dataset_id):
except Exception as e:
return construct_error_response(e)


# ----------------------------update: enable rename-----------------------------------------------------
@manager.route("/<dataset_id>/documents/<document_id>", methods=["PUT"])
@login_required
@@ -555,6 +573,7 @@ def update_document(dataset_id, document_id):
def is_illegal_value_for_enum(value, enum_class):
return value not in enum_class.__members__.values()


# ----------------------------download a file-----------------------------------------------------
@manager.route("/<dataset_id>/documents/<document_id>", methods=["GET"])
@login_required
@@ -563,7 +582,8 @@ def download_document(dataset_id, document_id):
# check whether the dataset exists
exist, _ = KnowledgebaseService.get_by_id(dataset_id)
if not exist:
return construct_json_result(code=RetCode.DATA_ERROR, message=f"This dataset '{dataset_id}' cannot be found!")
return construct_json_result(code=RetCode.DATA_ERROR,
message=f"This dataset '{dataset_id}' cannot be found!")

# check whether the document exists
exist, document = DocumentService.get_by_id(document_id)
@@ -591,8 +611,142 @@ def download_document(dataset_id, document_id):
except Exception as e:
return construct_error_response(e)

# ----------------------------start parsing-----------------------------------------------------

# ----------------------------start parsing a document-----------------------------------------------------
# helper that dispatches a document to the chunker matching its parser name
def dummy(prog=None, msg=""):
pass


def doc_parse(binary, doc_name, parser_name, tenant_id):
match parser_name:
case "book":
book.chunk(doc_name, binary=binary, callback=dummy)
case "laws":
laws.chunk(doc_name, binary=binary, callback=dummy)
case "manual":
manual.chunk(doc_name, binary=binary, callback=dummy)
case "naive":
# the default mode, shown as "General" in the front end
naive.chunk(doc_name, binary=binary, callback=dummy)
case "one":
one.chunk(doc_name, binary=binary, callback=dummy)
case "paper":
paper.chunk(doc_name, binary=binary, callback=dummy)
case "picture":
picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese", callback=dummy)
case "presentation":
presentation.chunk(doc_name, binary=binary, callback=dummy)
case "qa":
qa.chunk(doc_name, binary=binary, callback=dummy)
case "resume":
resume.chunk(doc_name, binary=binary, callback=dummy)
case "table":
table.chunk(doc_name, binary=binary, callback=dummy)
case _:
return False

return True


@manager.route("/<dataset_id>/documents/<document_id>/status", methods=["POST"])
@login_required
def parse_document(dataset_id, document_id):
try:
# validate the dataset
exist, _ = KnowledgebaseService.get_by_id(dataset_id)
if not exist:
return construct_json_result(code=RetCode.DATA_ERROR,
message=f"This dataset '{dataset_id}' cannot be found!")
message = ""
res = get_message_during_parsing_document(document_id, message)
if isinstance(res, str):
message += res
return construct_json_result(code=RetCode.SUCCESS, message=message)
else:
return res

except Exception as e:
return construct_error_response(e)


# ----------------------------start parsing documents-----------------------------------------------------
@manager.route("/<dataset_id>/documents/status", methods=["POST"])
@login_required
def parse_documents(dataset_id):
doc_ids = request.json["doc_ids"]
try:
exist, _ = KnowledgebaseService.get_by_id(dataset_id)
if not exist:
return construct_json_result(code=RetCode.DATA_ERROR,
message=f"This dataset '{dataset_id}' cannot be found!")

def process(doc_ids):
message = ""
# accumulate per-document messages; return early on an error response
for id in doc_ids:
res = get_message_during_parsing_document(id, message)
if isinstance(res, str):
message += res
else:
return res
return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)

# parse the given documents, or every document in the dataset when none are specified
if doc_ids:
return process(doc_ids)
else:
# documents inside the dataset
docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
True, "")
doc_ids = [doc["id"] for doc in docs]
return process(doc_ids)

except Exception as e:
return construct_error_response(e)


# helper that starts parsing one document and returns a status message string,
# or an error response on failure
def get_message_during_parsing_document(id, message):
try:
# check whether the document exists
exist, document = DocumentService.get_by_id(id)
if not exist:
return construct_json_result(message=f"This document '{id}' cannot be found!",
code=RetCode.ARGUMENT_ERROR)

tenant_id = DocumentService.get_tenant_id(id)
if not tenant_id:
return construct_json_result(message="Tenant not found!", code=RetCode.AUTHENTICATION_ERROR)

info = {"run": "1", "progress": 0}
info["progress_msg"] = ""
info["chunk_num"] = 0
info["token_num"] = 0

DocumentService.update_by_id(id, info)

ELASTICSEARCH.deleteByQuery(Q("match", doc_id=id), idxnm=search.index_name(tenant_id))

_, doc_attributes = DocumentService.get_by_id(id)
doc_attributes = doc_attributes.to_dict()
doc_id = doc_attributes["id"]

bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
binary = MINIO.get(bucket, doc_name)
parser_name = doc_attributes["parser_id"]
if binary:
res = doc_parse(binary, doc_name, parser_name, tenant_id)
if res is False:
message += f"The parser id: {parser_name} of the document {doc_id} is not supported; "
else:
message += f"Empty data in the document: {doc_name}; "
# failed in parsing
if doc_attributes["status"] == TaskStatus.FAIL.value:
message += f"Failed in parsing the document: {doc_id}; "
return message
except Exception as e:
return construct_error_response(e)
# ----------------------------stop parsing-----------------------------------------------------

# ----------------------------show the status of the file-----------------------------------------------------
@@ -610,6 +764,3 @@ def download_document(dataset_id, document_id):
# ----------------------------get a specific chunk-----------------------------------------------------

# ----------------------------retrieval test-----------------------------------------------------
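
For reference, a minimal sketch of how the two parsing endpoints introduced above might be exercised over plain HTTP. This is an illustration only: the base URL, port, token, and IDs below are placeholder assumptions, not values taken from this PR.

import requests

# Hypothetical values -- substitute your RAGFlow host, API token, and real IDs.
BASE_URL = "http://127.0.0.1:9380/api/v1/dataset"
HEADERS = {"Authorization": "Bearer <YOUR_API_TOKEN>"}
DATASET_ID = "<dataset_id>"
DOCUMENT_ID = "<document_id>"

# start parsing a single document
res = requests.post(f"{BASE_URL}/{DATASET_ID}/documents/{DOCUMENT_ID}/status", headers=HEADERS)
print(res.json())

# start parsing a batch; an empty "doc_ids" list makes parse_documents()
# fall back to parsing every document in the dataset
res = requests.post(f"{BASE_URL}/{DATASET_ID}/documents/status",
                    headers=HEADERS, json={"doc_ids": [DOCUMENT_ID]})
print(res.json())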



12 changes: 12 additions & 0 deletions sdk/python/ragflow/ragflow.py
@@ -142,7 +142,19 @@ def download_file(self, dataset_id, document_id):
with open(file_path, "wb") as file:
file.write(content)
return {"code": RetCode.SUCCESS, "data": content}

# ----------------------------start parsing-----------------------------------------------------
def start_parsing_document(self, dataset_id, document_id):
endpoint = f"{self.dataset_url}/{dataset_id}/documents/{document_id}/status"
res = requests.post(endpoint, headers=self.authorization_header)

return res.json()

def start_parsing_documents(self, dataset_id, doc_ids=None):
endpoint = f"{self.dataset_url}/{dataset_id}/documents/status"
res = requests.post(endpoint, headers=self.authorization_header, json={"doc_ids": doc_ids})

return res.json()

# ----------------------------stop parsing-----------------------------------------------------

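A corresponding usage sketch for the two new SDK methods, assuming a reachable RAGFlow server and that the client is constructed with a user key and base URL (an assumption here); the credentials and IDs are placeholders:

from ragflow.ragflow import RAGFlow

# Hypothetical credentials -- replace with a real API key and server address.
ragflow = RAGFlow("<user_key>", "http://127.0.0.1:9380")

# parse one document
print(ragflow.start_parsing_document("<dataset_id>", "<document_id>"))

# parse a batch; doc_ids=None asks the server to parse every document in the dataset
print(ragflow.start_parsing_documents("<dataset_id>", doc_ids=["<doc_id_1>", "<doc_id_2>"]))
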
3 changes: 3 additions & 0 deletions sdk/python/test/test_data/lol.txt
@@ -0,0 +1,3 @@
llll
ooooo
llll