Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add agent api #1888

Merged
merged 33 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
69a1b56
Update wikipedia.py
guoyuhao2330 Jul 15, 2024
6bbb1df
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 15, 2024
4e3067a
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 17, 2024
bc8d5ad
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 17, 2024
678994d
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 17, 2024
268c0c7
Update requirements.txt
guoyuhao2330 Jul 17, 2024
757ca5b
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 18, 2024
dfe71cb
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 18, 2024
7aba129
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 18, 2024
e52e08f
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 19, 2024
c140c70
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 19, 2024
92cf451
Update llm_service.py
guoyuhao2330 Jul 19, 2024
ad5a5b8
Update index.tsx
guoyuhao2330 Jul 19, 2024
4b49bb9
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 22, 2024
12baffc
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 22, 2024
585a0f6
Update init_data.py
guoyuhao2330 Jul 22, 2024
f3b3451
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 25, 2024
31af72c
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 25, 2024
e23f5b5
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 26, 2024
941c29d
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 26, 2024
bace5b5
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 26, 2024
8936941
Merge branch 'infiniflow:main' into main
guoyuhao2330 Jul 29, 2024
4fe66d8
Merge branch 'infiniflow:main' into main
guoyuhao2330 Aug 2, 2024
3cad1dd
Merge branch 'infiniflow:main' into main
guoyuhao2330 Aug 5, 2024
4ce659e
Merge branch 'infiniflow:main' into main
guoyuhao2330 Aug 7, 2024
764de5b
Merge branch 'infiniflow:main' into main
guoyuhao2330 Aug 8, 2024
addfea5
Update __init__.py
guoyuhao2330 Aug 8, 2024
ab10427
Merge branch 'infiniflow:main' into main
guoyuhao2330 Aug 9, 2024
7ce918d
Update api_app.py
guoyuhao2330 Aug 9, 2024
b51f9fd
Update db_models.py
guoyuhao2330 Aug 9, 2024
30c2276
Update api_app.py
guoyuhao2330 Aug 9, 2024
ecbf507
Update api_app.py
guoyuhao2330 Aug 9, 2024
eb2c31a
Update api_app.py
guoyuhao2330 Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 152 additions & 56 deletions api/apps/api_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@
from api.utils.file_utils import filename_type, thumbnail
from rag.utils.minio_conn import MINIO

from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService
from agent.canvas import Canvas
from functools import partial


def generate_confirmation_token(tenent_id):
serializer = URLSafeTimedSerializer(tenent_id)
return "ragflow-" + serializer.dumps(get_uuid(), salt=tenent_id)[2:34]


@manager.route('/new_token', methods=['POST'])
@validate_request("dialog_id")
@login_required
def new_token():
req = request.json
Expand All @@ -57,12 +60,17 @@ def new_token():

tenant_id = tenants[0].tenant_id
obj = {"tenant_id": tenant_id, "token": generate_confirmation_token(tenant_id),
"dialog_id": req["dialog_id"],
"create_time": current_timestamp(),
"create_date": datetime_format(datetime.now()),
"update_time": None,
"update_date": None
}
if req.get("canvas_id"):
obj["dialog_id"] = req["canvas_id"]
obj["source"] = "agent"
else:
obj["dialog_id"] = req["dialog_id"]

if not APITokenService.save(**obj):
return get_data_error_result(retmsg="Fail to new a dialog!")

Expand Down Expand Up @@ -112,15 +120,15 @@ def stats():
"from_date",
(datetime.now() -
timedelta(
days=7)).strftime("%Y-%m-%d 24:00:00")),
days=7)).strftime("%Y-%m-%d 24:00:00")),
request.args.get(
"to_date",
datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
res = {
"pv": [(o["dt"], o["pv"]) for o in objs],
"uv": [(o["dt"], o["uv"]) for o in objs],
"speed": [(o["dt"], float(o["tokens"])/(float(o["duration"]+0.1))) for o in objs],
"tokens": [(o["dt"], float(o["tokens"])/1000.) for o in objs],
"speed": [(o["dt"], float(o["tokens"]) / (float(o["duration"] + 0.1))) for o in objs],
"tokens": [(o["dt"], float(o["tokens"]) / 1000.) for o in objs],
"round": [(o["dt"], o["round"]) for o in objs],
"thumb_up": [(o["dt"], o["thumb_up"]) for o in objs]
}
Expand All @@ -138,21 +146,31 @@ def set_conversation():
data=False, retmsg='Token is not valid!"', retcode=RetCode.AUTHENTICATION_ERROR)
req = request.json
try:
e, dia = DialogService.get_by_id(objs[0].dialog_id)
if not e:
return get_data_error_result(retmsg="Dialog not found")
conv = {
"id": get_uuid(),
"dialog_id": dia.id,
"user_id": request.args.get("user_id", ""),
"message": [{"role": "assistant", "content": dia.prompt_config["prologue"]}]
}
API4ConversationService.save(**conv)
e, conv = API4ConversationService.get_by_id(conv["id"])
if not e:
return get_data_error_result(retmsg="Fail to new a conversation!")
conv = conv.to_dict()
return get_json_result(data=conv)
if objs[0].source == "agent":
e, c = UserCanvasService.get_by_id(objs[0].dialog_id)
if not e:
return server_error_response("canvas not found.")
conv = {
"id": get_uuid(),
"dialog_id": c.id,
"user_id": request.args.get("user_id", ""),
"message": [{"role": "assistant", "content": "Hi there!"}],
"source": "agent"
}
API4ConversationService.save(**conv)
return get_json_result(data=conv)
else:
e, dia = DialogService.get_by_id(objs[0].dialog_id)
if not e:
return get_data_error_result(retmsg="Dialog not found")
conv = {
"id": get_uuid(),
"dialog_id": dia.id,
"user_id": request.args.get("user_id", ""),
"message": [{"role": "assistant", "content": dia.prompt_config["prologue"]}]
}
API4ConversationService.save(**conv)
return get_json_result(data=conv)
except Exception as e:
return server_error_response(e)

Expand All @@ -161,7 +179,8 @@ def set_conversation():
@validate_request("conversation_id", "messages")
def completion():
token = request.headers.get('Authorization').split()[1]
if not APIToken.query(token=token):
objs = APIToken.query(token=token)
if not objs:
return get_json_result(
data=False, retmsg='Token is not valid!"', retcode=RetCode.AUTHENTICATION_ERROR)
req = request.json
Expand All @@ -178,7 +197,100 @@ def completion():
continue
msg.append({"role": m["role"], "content": m["content"]})

def fillin_conv(ans):
nonlocal conv
if not conv.reference:
conv.reference.append(ans["reference"])
else:
conv.reference[-1] = ans["reference"]
conv.message[-1] = {"role": "assistant", "content": ans["answer"]}

def rename_field(ans):
reference = ans['reference']
if not isinstance(reference, dict):
return
for chunk_i in reference.get('chunks', []):
if 'docnm_kwd' in chunk_i:
chunk_i['doc_name'] = chunk_i['docnm_kwd']
chunk_i.pop('docnm_kwd')

try:
if conv.source == "agent":
stream = req.get("stream", True)
conv.message.append(msg[-1])
e, cvs = UserCanvasService.get_by_id(conv.dialog_id)
if not e:
return server_error_response("canvas not found.")
del req["conversation_id"]
del req["messages"]

if not isinstance(cvs.dsl, str):
cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)

if not conv.reference:
conv.reference = []
conv.message.append({"role": "assistant", "content": ""})
conv.reference.append({"chunks": [], "doc_aggs": []})

final_ans = {"reference": [], "content": ""}
canvas = Canvas(cvs.dsl, objs[0].tenant_id)

canvas.messages.append(msg[-1])
canvas.add_user_input(msg[-1]["content"])
answer = canvas.run(stream=stream)

assert answer is not None, "Nothing. Is it over?"

if stream:
assert isinstance(answer, partial), "Nothing. Is it over?"

def sse():
nonlocal answer, cvs, conv
try:
for ans in answer():
for k in ans.keys():
final_ans[k] = ans[k]
ans = {"answer": ans["content"], "reference": ans.get("reference", [])}
fillin_conv(ans)
rename_field(ans)
yield "data:" + json.dumps({"retcode": 0, "retmsg": "", "data": ans},
ensure_ascii=False) + "\n\n"

canvas.messages.append({"role": "assistant", "content": final_ans["content"]})
if final_ans.get("reference"):
canvas.reference.append(final_ans["reference"])
cvs.dsl = json.loads(str(canvas))
API4ConversationService.append_message(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({"retcode": 500, "retmsg": str(e),
"data": {"answer": "**ERROR**: " + str(e), "reference": []}},
ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"retcode": 0, "retmsg": "", "data": True}, ensure_ascii=False) + "\n\n"

resp = Response(sse(), mimetype="text/event-stream")
resp.headers.add_header("Cache-control", "no-cache")
resp.headers.add_header("Connection", "keep-alive")
resp.headers.add_header("X-Accel-Buffering", "no")
resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
return resp

final_ans["content"] = "\n".join(answer["content"]) if "content" in answer else ""
canvas.messages.append({"role": "assistant", "content": final_ans["content"]})
if final_ans.get("reference"):
canvas.reference.append(final_ans["reference"])
cvs.dsl = json.loads(str(canvas))

result = None
for ans in answer():
ans = {"answer": ans["content"], "reference": ans.get("reference", [])}
result = ans
fillin_conv(ans)
API4ConversationService.append_message(conv.id, conv.to_dict())
break
rename_field(result)
return get_json_result(data=result)

#******************For dialog******************
conv.message.append(msg[-1])
e, dia = DialogService.get_by_id(conv.dialog_id)
if not e:
Expand All @@ -191,35 +303,20 @@ def completion():
conv.message.append({"role": "assistant", "content": ""})
conv.reference.append({"chunks": [], "doc_aggs": []})

def fillin_conv(ans):
nonlocal conv
if not conv.reference:
conv.reference.append(ans["reference"])
else: conv.reference[-1] = ans["reference"]
conv.message[-1] = {"role": "assistant", "content": ans["answer"]}

def rename_field(ans):
reference = ans['reference']
if not isinstance(reference, dict):
return
for chunk_i in reference.get('chunks', []):
if 'docnm_kwd' in chunk_i:
chunk_i['doc_name'] = chunk_i['docnm_kwd']
chunk_i.pop('docnm_kwd')

def stream():
nonlocal dia, msg, req, conv
try:
for ans in chat(dia, msg, True, **req):
fillin_conv(ans)
rename_field(ans)
yield "data:" + json.dumps({"retcode": 0, "retmsg": "", "data": ans}, ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"retcode": 0, "retmsg": "", "data": ans},
ensure_ascii=False) + "\n\n"
API4ConversationService.append_message(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({"retcode": 500, "retmsg": str(e),
"data": {"answer": "**ERROR**: "+str(e), "reference": []}},
"data": {"answer": "**ERROR**: " + str(e), "reference": []}},
ensure_ascii=False) + "\n\n"
yield "data:"+json.dumps({"retcode": 0, "retmsg": "", "data": True}, ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"retcode": 0, "retmsg": "", "data": True}, ensure_ascii=False) + "\n\n"

if req.get("stream", True):
resp = Response(stream(), mimetype="text/event-stream")
Expand All @@ -228,16 +325,15 @@ def stream():
resp.headers.add_header("X-Accel-Buffering", "no")
resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
return resp
else:
answer = None
for ans in chat(dia, msg, **req):
answer = ans
fillin_conv(ans)
API4ConversationService.append_message(conv.id, conv.to_dict())
break

rename_field(answer)
return get_json_result(data=answer)

answer = None
for ans in chat(dia, msg, **req):
answer = ans
fillin_conv(ans)
API4ConversationService.append_message(conv.id, conv.to_dict())
break
rename_field(answer)
return get_json_result(data=answer)

except Exception as e:
return server_error_response(e)
Expand Down Expand Up @@ -332,7 +428,7 @@ def upload():
"thumbnail": thumbnail(filename, blob)
}

form_data=request.form
form_data = request.form
if "parser_id" in form_data.keys():
if request.form.get("parser_id").strip() in list(vars(ParserType).values())[1:-3]:
doc["parser_id"] = request.form.get("parser_id").strip()
Expand Down Expand Up @@ -361,15 +457,15 @@ def upload():
if not tenant_id:
return get_data_error_result(retmsg="Tenant not found!")

#e, doc = DocumentService.get_by_id(doc["id"])
# e, doc = DocumentService.get_by_id(doc["id"])
TaskService.filter_delete([Task.doc_id == doc["id"]])
e, doc = DocumentService.get_by_id(doc["id"])
doc = doc.to_dict()
doc["tenant_id"] = tenant_id
bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"])
queue_tasks(doc, bucket, name)
except Exception as e:
return server_error_response(e)
return server_error_response(e)

return get_json_result(data=doc_result.to_json())

Expand Down Expand Up @@ -448,7 +544,7 @@ def list_kb_docs():
docs = [{"doc_id": doc['id'], "doc_name": doc['name']} for doc in docs]

return get_json_result(data={"total": tol, "docs": docs})

except Exception as e:
return server_error_response(e)

Expand Down Expand Up @@ -549,7 +645,8 @@ def fillin_conv(ans):
nonlocal conv
if not conv.reference:
conv.reference.append(ans["reference"])
else: conv.reference[-1] = ans["reference"]
else:
conv.reference[-1] = ans["reference"]
conv.message[-1] = {"role": "assistant", "content": ans["answer"]}

data_type_picture = {
Expand Down Expand Up @@ -638,4 +735,3 @@ def retrieval():
return get_json_result(data=False, retmsg=f'No chunk found! Check the chunk status please!',
retcode=RetCode.DATA_ERROR)
return server_error_response(e)

16 changes: 16 additions & 0 deletions api/db/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ class APIToken(DataBaseModel):
tenant_id = CharField(max_length=32, null=False, index=True)
token = CharField(max_length=255, null=False, index=True)
dialog_id = CharField(max_length=32, null=False, index=True)
source = CharField(max_length=16, null=True, help_text="none|agent|dialog", index=True)

class Meta:
db_table = "api_token"
Expand All @@ -871,6 +872,7 @@ class API4Conversation(DataBaseModel):
message = JSONField(null=True)
reference = JSONField(null=True, default=[])
tokens = IntegerField(default=0)
source = CharField(max_length=16, null=True, help_text="none|agent|dialog", index=True)

duration = FloatField(default=0, index=True)
round = IntegerField(default=0, index=True)
Expand Down Expand Up @@ -949,3 +951,17 @@ def migrate_db():
)
except Exception as e:
pass
try:
migrate(
migrator.add_column('api_token', 'source',
CharField(max_length=16, null=True, help_text="none|agent|dialog", index=True))
)
except Exception as e:
pass
try:
migrate(
migrator.add_column('api_4_conversation', 'source',
CharField(max_length=16, null=True, help_text="none|agent|dialog", index=True))
)
except Exception as e:
pass