add microservice level perf statistics (#135)
* add statistics

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Spycsh and pre-commit-ci[bot] authored Jun 11, 2024
1 parent 70c23d1 commit 597b3ca
Showing 7 changed files with 156 additions and 4 deletions.
3 changes: 3 additions & 0 deletions comps/__init__.py
@@ -29,3 +29,6 @@

# Telemetry
from comps.cores.telemetry.opea_telemetry import opea_telemetry

# Statistics
from comps.cores.mega.base_statistics import statistics_dict, register_statistics
85 changes: 85 additions & 0 deletions comps/cores/mega/base_statistics.py
@@ -0,0 +1,85 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import numpy as np

# name => statistic dict
statistics_dict = {}


class BaseStatistics:
    """Base class to store in-memory statistics of an entity for measurement in one service."""

    def __init__(
        self,
    ):
        self.response_times = []  # store response times for all requests
        self.first_token_latencies = []  # store first token latencies for all requests

    def append_latency(self, latency, first_token_latency=None):
        self.response_times.append(latency)
        if first_token_latency:
            self.first_token_latencies.append(first_token_latency)

    def calculate_statistics(self):
        if not self.response_times:
            return {
                "p50_latency": None,
                "p99_latency": None,
                "average_latency": None,
            }
        # Calculate the P50 (median)
        p50 = np.percentile(self.response_times, 50)

        # Calculate the P99
        p99 = np.percentile(self.response_times, 99)

        avg = np.average(self.response_times)

        return {
            "p50_latency": p50,
            "p99_latency": p99,
            "average_latency": avg,
        }

    def calculate_first_token_statistics(self):
        if not self.first_token_latencies:
            return {
                "p50_latency_first_token": None,
                "p99_latency_first_token": None,
                "average_latency_first_token": None,
            }
        # Calculate the P50 (median)
        p50 = np.percentile(self.first_token_latencies, 50)

        # Calculate the P99
        p99 = np.percentile(self.first_token_latencies, 99)

        avg = np.average(self.first_token_latencies)

        return {
            "p50_latency_first_token": p50,
            "p99_latency_first_token": p99,
            "average_latency_first_token": avg,
        }


def register_statistics(
    names,
):
    def decorator(func):
        for name in names:
            statistics_dict[name] = BaseStatistics()
        return func

    return decorator


def collect_all_statistics():
    results = {}
    if statistics_dict:
        for name, statistic in statistics_dict.items():
            tmp_dict = statistic.calculate_statistics()
            tmp_dict.update(statistic.calculate_first_token_statistics())
            results.update({name: tmp_dict})
    return results
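
For illustration only (not part of this commit), a minimal sketch of how the module above is used: a handler registers itself under a service name, appends a per-request latency, and collect_all_statistics() returns the aggregated numbers. The service name and handler below are hypothetical.

import time

from comps.cores.mega.base_statistics import collect_all_statistics, register_statistics, statistics_dict


@register_statistics(names=["opea_service@example"])  # hypothetical service name
def handle(text: str) -> str:
    start = time.time()
    result = text.upper()  # stand-in for real work
    # Record total latency; pass a second argument to also record first-token latency.
    statistics_dict["opea_service@example"].append_latency(time.time() - start, None)
    return result


handle("hello")
print(collect_all_statistics())
# -> {"opea_service@example": {"p50_latency": ..., "p99_latency": ..., "average_latency": ...,
#     "p50_latency_first_token": None, "p99_latency_first_token": None,
#     "average_latency_first_token": None}}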
11 changes: 11 additions & 0 deletions comps/cores/mega/http_service.py
@@ -7,6 +7,7 @@
from uvicorn import Config, Server

from .base_service import BaseService
from .base_statistics import collect_all_statistics


class HTTPService(BaseService):
@@ -66,6 +67,16 @@ async def _health_check():
"""Get the health status of this GenAI microservice."""
return {"Service Title": self.title, "Service Description": self.description}

@app.get(
path="/v1/statistics",
summary="Get the statistics of GenAI services",
tags=["Debug"],
)
async def _get_statistics():
"""Get the statistics of GenAI services."""
result = collect_all_statistics()
return result

return app

async def initialize_server(self):
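As a quick check (again a sketch, not part of the commit), the new endpoint can be queried over HTTP once a microservice is running; the host and port below are assumptions and should match the target service (for example, 9000 for the TGI LLM service further down).

import requests

resp = requests.get("http://localhost:9000/v1/statistics")  # assumed host/port
print(resp.json())
# Expected shape, one entry per registered service name, e.g.:
# {"opea_service@llm_tgi": {"p50_latency": 1.2, "p99_latency": 2.3, "average_latency": 1.4,
#   "p50_latency_first_token": 0.1, "p99_latency_first_token": 0.3,
#   "average_latency_first_token": 0.15}}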
14 changes: 13 additions & 1 deletion comps/embeddings/langchain/embedding_tei_gaudi.py
@@ -2,11 +2,20 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from langchain_community.embeddings import HuggingFaceHubEmbeddings
from langsmith import traceable

from comps import EmbedDoc768, ServiceType, TextDoc, opea_microservices, register_microservice
from comps import (
    EmbedDoc768,
    ServiceType,
    TextDoc,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)


@register_microservice(
@@ -19,10 +28,13 @@
    output_datatype=EmbedDoc768,
)
@traceable(run_type="embedding")
@register_statistics(names=["opea_service@embedding_tgi_gaudi"])
def embedding(input: TextDoc) -> EmbedDoc768:
    start = time.time()
    embed_vector = embeddings.embed_query(input.text)
    embed_vector = embed_vector[:768]  # Keep only the first 768 elements
    res = EmbedDoc768(text=input.text, embedding=embed_vector)
    statistics_dict["opea_service@embedding_tgi_gaudi"].append_latency(time.time() - start, None)
    return res


18 changes: 17 additions & 1 deletion comps/llms/text-generation/tgi/llm.py
@@ -2,12 +2,21 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from fastapi.responses import StreamingResponse
from langchain_community.llms import HuggingFaceEndpoint
from langsmith import traceable

from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
from comps import (
    GeneratedDoc,
    LLMParamsDoc,
    ServiceType,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)


@register_microservice(
@@ -18,7 +27,9 @@
    port=9000,
)
@traceable(run_type="llm")
@register_statistics(names=["opea_service@llm_tgi"])
def llm_generate(input: LLMParamsDoc):
    start = time.time()
    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
    llm = HuggingFaceEndpoint(
        endpoint_url=llm_endpoint,
@@ -34,19 +45,24 @@ def llm_generate(input: LLMParamsDoc):

    if input.streaming:

        stream_gen_time = []

        async def stream_generator():
            chat_response = ""
            async for text in llm.astream(input.query):
                stream_gen_time.append(time.time() - start)
                chat_response += text
                chunk_repr = repr(text.encode("utf-8"))
                print(f"[llm - chat_stream] chunk:{chunk_repr}")
                yield f"data: {chunk_repr}\n\n"
            print(f"[llm - chat_stream] stream response: {chat_response}")
            statistics_dict["opea_service@llm_tgi"].append_latency(stream_gen_time[-1], stream_gen_time[0])
            yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")
    else:
        response = llm.invoke(input.query)
        statistics_dict["opea_service@llm_tgi"].append_latency(time.time() - start, None)
        return GeneratedDoc(text=response, prompt=input.query)


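Note on the streaming path above: each chunk's arrival time relative to request start is appended to stream_gen_time, so stream_gen_time[0] is the first-token latency and stream_gen_time[-1] is the end-to-end latency; those two values are what append_latency records for opea_service@llm_tgi.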
14 changes: 13 additions & 1 deletion comps/reranks/langchain/reranking_tei_xeon.py
@@ -3,12 +3,21 @@

import json
import os
import time

import requests
from langchain_core.prompts import ChatPromptTemplate
from langsmith import traceable

from comps import LLMParamsDoc, SearchedDoc, ServiceType, opea_microservices, register_microservice
from comps import (
    LLMParamsDoc,
    SearchedDoc,
    ServiceType,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)


@register_microservice(
@@ -21,7 +30,9 @@
    output_datatype=LLMParamsDoc,
)
@traceable(run_type="llm")
@register_statistics(names=["opea_service@reranking_tgi_gaudi"])
def reranking(input: SearchedDoc) -> LLMParamsDoc:
    start = time.time()
    docs = [doc.text for doc in input.retrieved_docs]
    url = tei_reranking_endpoint + "/rerank"
    data = {"query": input.initial_query, "texts": docs}
@@ -36,6 +47,7 @@ def reranking(input: SearchedDoc) -> LLMParamsDoc:
    prompt = ChatPromptTemplate.from_template(template)
    doc = input.retrieved_docs[best_response["index"]]
    final_prompt = prompt.format(context=doc.text, question=input.initial_query)
    statistics_dict["opea_service@reranking_tgi_gaudi"].append_latency(time.time() - start, None)
    return LLMParamsDoc(query=final_prompt.strip())


15 changes: 14 additions & 1 deletion comps/retrievers/langchain/retriever_redis.py
@@ -2,13 +2,23 @@
# SPDX-License-Identifier: Apache-2.0

import os
import time

from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from langchain_community.vectorstores import Redis
from langsmith import traceable
from redis_config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, REDIS_URL

from comps import EmbedDoc768, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
from comps import (
    EmbedDoc768,
    SearchedDoc,
    ServiceType,
    TextDoc,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)

tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")

@@ -21,12 +31,15 @@
    port=7000,
)
@traceable(run_type="retriever")
@register_statistics(names=["opea_service@retriever_redis"])
def retrieve(input: EmbedDoc768) -> SearchedDoc:
    start = time.time()
    search_res = vector_db.similarity_search_by_vector(embedding=input.embedding)
    searched_docs = []
    for r in search_res:
        searched_docs.append(TextDoc(text=r.page_content))
    result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text)
    statistics_dict["opea_service@retriever_redis"].append_latency(time.time() - start, None)
    return result


