Refactor Chunk API #2855

Merged · 5 commits · Oct 16, 2024
190 changes: 104 additions & 86 deletions api/apps/sdk/doc.py
@@ -119,13 +119,11 @@ def update_doc(tenant_id, dataset_id, document_id):
if informs:
e, file = FileService.get_by_id(informs[0].file_id)
FileService.update_by_id(file.id, {"name": req["name"]})
if "parser_config" in req:
DocumentService.update_parser_config(doc.id, req["parser_config"])
if "parser_method" in req:
if doc.parser_id.lower() == req["parser_method"].lower():
if "parser_config" in req:
if req["parser_config"] == doc.parser_config:
return get_result(retcode=RetCode.SUCCESS)
else:
return get_result(retcode=RetCode.SUCCESS)
return get_result()

if doc.type == FileType.VISUAL or re.search(
r"\.(ppt|pptx|pages)$", doc.name):
@@ -146,8 +144,6 @@ def update_doc(tenant_id, dataset_id, document_id):
return get_error_data_result(retmsg="Tenant not found!")
ELASTICSEARCH.deleteByQuery(
Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id))
if "parser_config" in req:
DocumentService.update_parser_config(doc.id, req["parser_config"])

return get_result()
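
As a point of reference, here is a minimal client sketch for the revised document-update flow above. The update route itself is not visible in this hunk, so the URL path, base URL, and Bearer-token auth are assumptions; only the request-body fields (name, parser_method, parser_config) come from the handler.

import requests

# All of the following values are placeholders / assumptions for illustration.
BASE_URL = "http://localhost:9380/api/v1"           # assumed mount point of the SDK blueprint
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # @token_required is assumed to expect a Bearer token
dataset_id, document_id = "DATASET_ID", "DOCUMENT_ID"

# Hypothetical update route; the diff only shows the handler body.
resp = requests.put(
    f"{BASE_URL}/dataset/{dataset_id}/document/{document_id}",
    headers=HEADERS,
    json={
        "name": "renamed.pdf",
        "parser_method": "naive",                   # compared case-insensitively against doc.parser_id
        "parser_config": {"chunk_token_num": 128},
    },
)
print(resp.status_code, resp.json())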

@@ -258,6 +254,8 @@ def parse(tenant_id,dataset_id):
if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
return get_error_data_result(retmsg=f"You don't own the dataset {dataset_id}.")
req = request.json
if not req.get("document_ids"):
return get_error_data_result("`document_ids` is required")
for id in req["document_ids"]:
if not DocumentService.query(id=id,kb_id=dataset_id):
return get_error_data_result(retmsg=f"You don't own the document {id}.")
@@ -283,9 +281,14 @@ def stop_parsing(tenant_id,dataset_id):
if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
return get_error_data_result(retmsg=f"You don't own the dataset {dataset_id}.")
req = request.json
if not req.get("document_ids"):
return get_error_data_result("`document_ids` is required")
for id in req["document_ids"]:
if not DocumentService.query(id=id,kb_id=dataset_id):
doc = DocumentService.query(id=id, kb_id=dataset_id)
if not doc:
return get_error_data_result(retmsg=f"You don't own the document {id}.")
if doc[0].progress == 100.0 or doc[0].progress == 0.0:
return get_error_data_result("Can't stop parsing document with progress at 0 or 100")
info = {"run": "2", "progress": 0}
DocumentService.update_by_id(id, info)
# if str(req["run"]) == TaskStatus.CANCEL.value:
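
A sketch of how a client would exercise the new `document_ids` validation in parse/stop_parsing. The exact routes for these two handlers are not shown in the diff, so the paths below are placeholders; the required body field and the new progress check (a document at 0% or 100% progress cannot be stopped) come from the code above.

import requests

BASE_URL = "http://localhost:9380/api/v1"           # assumed
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed Bearer-token auth
dataset_id = "DATASET_ID"
payload = {"document_ids": ["DOCUMENT_ID"]}         # omitting this now returns "`document_ids` is required"

# Hypothetical routes; only the handler bodies appear in this hunk.
start = requests.post(f"{BASE_URL}/dataset/{dataset_id}/chunk", headers=HEADERS, json=payload)
stop = requests.delete(f"{BASE_URL}/dataset/{dataset_id}/chunk", headers=HEADERS, json=payload)
print(start.json(), stop.json())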
@@ -297,7 +300,7 @@ def stop_parsing(tenant_id,dataset_id):

@manager.route('/dataset/<dataset_id>/document/<document_id>/chunk', methods=['GET'])
@token_required
def list_chunk(tenant_id,dataset_id,document_id):
def list_chunks(tenant_id,dataset_id,document_id):
if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
return get_error_data_result(retmsg=f"You don't own the dataset {dataset_id}.")
doc=DocumentService.query(id=document_id, kb_id=dataset_id)
@@ -309,57 +312,58 @@ def list_chunk(tenant_id,dataset_id,document_id):
page = int(req.get("offset", 1))
size = int(req.get("limit", 30))
question = req.get("keywords", "")
try:
query = {
"doc_ids": [doc_id], "page": page, "size": size, "question": question, "sort": True
query = {
"doc_ids": [doc_id], "page": page, "size": size, "question": question, "sort": True
}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
res = {"total": sres.total, "chunks": [], "doc": doc.to_dict()}
origin_chunks = []
sign = 0
for id in sres.ids:
d = {
"chunk_id": id,
"content_with_weight": rmSpace(sres.highlight[id]) if question and id in sres.highlight else sres.field[
id].get(
"content_with_weight", ""),
"doc_id": sres.field[id]["doc_id"],
"docnm_kwd": sres.field[id]["docnm_kwd"],
"important_kwd": sres.field[id].get("important_kwd", []),
"img_id": sres.field[id].get("img_id", ""),
"available_int": sres.field[id].get("available_int", 1),
"positions": sres.field[id].get("position_int", "").split("\t")
}
if "available_int" in req:
query["available_int"] = int(req["available_int"])
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
res = {"total": sres.total, "chunks": [], "doc": doc.to_dict()}

origin_chunks = []
for id in sres.ids:
d = {
"chunk_id": id,
"content_with_weight": rmSpace(sres.highlight[id]) if question and id in sres.highlight else sres.field[
id].get(
"content_with_weight", ""),
"doc_id": sres.field[id]["doc_id"],
"docnm_kwd": sres.field[id]["docnm_kwd"],
"important_kwd": sres.field[id].get("important_kwd", []),
"img_id": sres.field[id].get("img_id", ""),
"available_int": sres.field[id].get("available_int", 1),
"positions": sres.field[id].get("position_int", "").split("\t")
}
if len(d["positions"]) % 5 == 0:
poss = []
for i in range(0, len(d["positions"]), 5):
poss.append([float(d["positions"][i]), float(d["positions"][i + 1]), float(d["positions"][i + 2]),
float(d["positions"][i + 3]), float(d["positions"][i + 4])])
d["positions"] = poss

origin_chunks.append(d)
##rename keys
for chunk in origin_chunks:
key_mapping = {
"chunk_id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
"img_id": "image_id",
}
renamed_chunk = {}
for key, value in chunk.items():
new_key = key_mapping.get(key, key)
renamed_chunk[new_key] = value
res["chunks"].append(renamed_chunk)
return get_result(data=res)
except Exception as e:
if str(e).find("not_found") > 0:
return get_result(retmsg=f'No chunk found!',
retcode=RetCode.DATA_ERROR)
return server_error_response(e)
if len(d["positions"]) % 5 == 0:
poss = []
for i in range(0, len(d["positions"]), 5):
poss.append([float(d["positions"][i]), float(d["positions"][i + 1]), float(d["positions"][i + 2]),
float(d["positions"][i + 3]), float(d["positions"][i + 4])])
d["positions"] = poss

origin_chunks.append(d)
if req.get("id"):
if req.get("id") == id:
origin_chunks.clear()
origin_chunks.append(d)
sign = 1
break
if req.get("id"):
if sign == 0:
return get_error_data_result(f"Can't find this chunk {req.get('id')}")
for chunk in origin_chunks:
key_mapping = {
"chunk_id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
"img_id": "image_id",
}
renamed_chunk = {}
for key, value in chunk.items():
new_key = key_mapping.get(key, key)
renamed_chunk[new_key] = value
res["chunks"].append(renamed_chunk)
return get_result(data=res)
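
For reference, a minimal sketch of calling the renamed list_chunks endpoint. The route and the response-key renaming (chunk_id → id, content_with_weight → content, doc_id → document_id, important_kwd → important_keywords, img_id → image_id) are taken from the diff; the base URL, Bearer-token auth, and the assumption that GET parameters arrive as a query string are illustrative.

import requests

BASE_URL = "http://localhost:9380/api/v1"           # assumed
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed Bearer-token auth
dataset_id, document_id = "DATASET_ID", "DOCUMENT_ID"

resp = requests.get(
    f"{BASE_URL}/dataset/{dataset_id}/document/{document_id}/chunk",
    headers=HEADERS,
    params={
        "offset": 1,             # page number, defaults to 1
        "limit": 30,             # page size, defaults to 30
        "keywords": "contract",  # full-text filter passed to the retriever
        # "id": "CHUNK_ID",      # optional: return only this chunk; errors if it is not found
    },
)
# Assuming get_result() wraps the payload under "data".
data = resp.json()["data"]
for chunk in data["chunks"]:
    print(chunk["id"], chunk["content"][:80], chunk["important_keywords"])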



@manager.route('/dataset/<dataset_id>/document/<document_id>/chunk', methods=['POST'])
@@ -374,15 +378,18 @@ def create(tenant_id,dataset_id,document_id):
req = request.json
if not req.get("content"):
return get_error_data_result(retmsg="`content` is required")
if "important_keywords" in req:
if type(req["important_keywords"]) != list:
return get_error_data_result("`important_keywords` is required to be a list")
md5 = hashlib.md5()
md5.update((req["content"] + document_id).encode("utf-8"))

chunk_id = md5.hexdigest()
d = {"id": chunk_id, "content_ltks": rag_tokenizer.tokenize(req["content"]),
"content_with_weight": req["content"]}
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
d["important_kwd"] = req.get("important_kwd", [])
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req.get("important_kwd", [])))
d["important_kwd"] = req.get("important_keywords", [])
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req.get("important_keywords", [])))
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
d["kb_id"] = [doc.kb_id]
@@ -432,12 +439,12 @@ def rm_chunk(tenant_id,dataset_id,document_id):
req = request.json
if not req.get("chunk_ids"):
return get_error_data_result("`chunk_ids` is required")
query = {
"doc_ids": [doc.id], "page": 1, "size": 1024, "question": "", "sort": True}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
for chunk_id in req.get("chunk_ids"):
res = ELASTICSEARCH.get(
chunk_id, search.index_name(
tenant_id))
if not res.get("found"):
return server_error_response(f"Chunk {chunk_id} not found")
if chunk_id not in sres.ids:
return get_error_data_result(f"Chunk {chunk_id} not found")
if not ELASTICSEARCH.deleteByQuery(
Q("ids", values=req["chunk_ids"]), search.index_name(tenant_id)):
return get_error_data_result(retmsg="Index updating failure")
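
A sketch of deleting chunks against the rewritten rm_chunk check, which now verifies each id against a retriever search over the document instead of a per-id Elasticsearch GET. The HTTP method and base URL are assumptions (the route decorator sits outside this hunk); the `chunk_ids` body field comes from the handler.

import requests

BASE_URL = "http://localhost:9380/api/v1"           # assumed
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed Bearer-token auth
dataset_id, document_id = "DATASET_ID", "DOCUMENT_ID"

# Assumed DELETE on the same /chunk route; an id not found for this document now
# returns "Chunk ... not found" instead of a server error.
resp = requests.delete(
    f"{BASE_URL}/dataset/{dataset_id}/document/{document_id}/chunk",
    headers=HEADERS,
    json={"chunk_ids": ["CHUNK_ID_1", "CHUNK_ID_2"]},
)
print(resp.json())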
@@ -451,24 +458,36 @@ def set(tenant_id,dataset_id,document_id,chunk_id):
@manager.route('/dataset/<dataset_id>/document/<document_id>/chunk/<chunk_id>', methods=['PUT'])
@token_required
def set(tenant_id,dataset_id,document_id,chunk_id):
res = ELASTICSEARCH.get(
try:
res = ELASTICSEARCH.get(
chunk_id, search.index_name(
tenant_id))
if not res.get("found"):
return get_error_data_result(f"Chunk {chunk_id} not found")
except Exception as e:
return get_error_data_result(f"Can't find this chunk {chunk_id}")
if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
return get_error_data_result(retmsg=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(retmsg=f"You don't own the document {document_id}.")
doc = doc[0]
query = {
"doc_ids": [document_id], "page": 1, "size": 1024, "question": "", "sort": True
}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
if chunk_id not in sres.ids:
return get_error_data_result(f"You don't own the chunk {chunk_id}")
req = request.json
content=res["_source"].get("content_with_weight")
d = {
"id": chunk_id,
"content_with_weight": req.get("content",res.get["content_with_weight"])}
d["content_ltks"] = rag_tokenizer.tokenize(req["content"])
"content_with_weight": req.get("content",content)}
d["content_ltks"] = rag_tokenizer.tokenize(d["content_with_weight"])
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
d["important_kwd"] = req.get("important_keywords",[])
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req["important_keywords"]))
if "important_keywords" in req:
if type(req["important_keywords"]) != list:
return get_error_data_result("`important_keywords` is required to be a list")
d["important_kwd"] = req.get("important_keywords")
d["important_tks"] = rag_tokenizer.tokenize(" ".join(req["important_keywords"]))
if "available" in req:
d["available_int"] = req["available"]
embd_id = DocumentService.get_embd_id(document_id)
@@ -478,41 +497,40 @@ def set(tenant_id,dataset_id,document_id,chunk_id):
arr = [
t for t in re.split(
r"[\n\t]",
req["content"]) if len(t) > 1]
d["content_with_weight"]) if len(t) > 1]
if len(arr) != 2:
return get_error_data_result(
retmsg="Q&A must be separated by TAB/ENTER key.")
q, a = rmPrefix(arr[0]), rmPrefix(arr[1])
d = beAdoc(d, arr[0], arr[1], not any(
[rag_tokenizer.is_chinese(t) for t in q + a]))

v, c = embd_mdl.encode([doc.name, req["content"]])
v, c = embd_mdl.encode([doc.name, d["content_with_weight"]])
v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1]
d["q_%d_vec" % len(v)] = v.tolist()
ELASTICSEARCH.upsert([d], search.index_name(tenant_id))
return get_result()
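
For reference, a sketch of updating a chunk via the PUT route above. Per the handler, every body field is optional (content falls back to the stored content_with_weight, important_keywords must be a list, and available is stored as available_int); the base URL and Bearer-token auth are assumptions.

import requests

BASE_URL = "http://localhost:9380/api/v1"           # assumed
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed Bearer-token auth
dataset_id, document_id, chunk_id = "DATASET_ID", "DOCUMENT_ID", "CHUNK_ID"

resp = requests.put(
    f"{BASE_URL}/dataset/{dataset_id}/document/{document_id}/chunk/{chunk_id}",
    headers=HEADERS,
    json={
        "content": "Updated chunk text.",   # optional; defaults to the stored content
        "important_keywords": ["updated"],  # optional; must be a list
        "available": 1,                     # optional; stored as available_int
    },
)
print(resp.json())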



@manager.route('/retrieval', methods=['GET'])
@manager.route('/retrieval', methods=['POST'])
@token_required
def retrieval_test(tenant_id):
req = request.args
req_json = request.json
if not req_json.get("datasets"):
req = request.json
if not req.get("datasets"):
return get_error_data_result("`datasets` is required.")
for id in req_json.get("datasets"):
kb_id = req["datasets"]
if isinstance(kb_id, str): kb_id = [kb_id]
for id in kb_id:
if not KnowledgebaseService.query(id=id,tenant_id=tenant_id):
return get_error_data_result(f"You don't own the dataset {id}.")
if "question" not in req_json:
if "question" not in req:
return get_error_data_result("`question` is required.")
page = int(req.get("offset", 1))
size = int(req.get("limit", 30))
question = req_json["question"]
kb_id = req_json["datasets"]
if isinstance(kb_id, str): kb_id = [kb_id]
doc_ids = req_json.get("documents", [])
similarity_threshold = float(req.get("similarity_threshold", 0.0))
question = req["question"]
doc_ids = req.get("documents", [])
similarity_threshold = float(req.get("similarity_threshold", 0.2))
vector_similarity_weight = float(req.get("vector_similarity_weight", 0.3))
top = int(req.get("top_k", 1024))
if req.get("highlight")=="False" or req.get("highlight")=="false":
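
Finally, a sketch of the retrieval endpoint, which this PR switches from GET to POST and reads entirely from the JSON body (`datasets` may be a single id or a list, and the default similarity_threshold is now 0.2). The base URL and auth header are assumptions.

import requests

BASE_URL = "http://localhost:9380/api/v1"           # assumed
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed Bearer-token auth

resp = requests.post(
    f"{BASE_URL}/retrieval",
    headers=HEADERS,
    json={
        "datasets": "DATASET_ID",        # a single id or a list of ids
        "question": "What are the payment terms?",
        "documents": ["DOCUMENT_ID"],    # optional restriction to specific documents
        "offset": 1,
        "limit": 30,
        "similarity_threshold": 0.2,     # new default
        "vector_similarity_weight": 0.3,
        "top_k": 1024,
        "highlight": "true",
    },
)
print(resp.json())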