Commit

docs: update the website_rag example
jjleng committed May 24, 2024
1 parent 65dcdd5 commit b7fdc89
Showing 8 changed files with 280 additions and 38 deletions.
File renamed without changes.
59 changes: 27 additions & 32 deletions examples/website_rag/cluster.yaml
@@ -1,58 +1,53 @@
version: "1.0"
version: "1.2"
aws:
cluster:
name: website-rag
region: us-west-2
namespace: default
nodeType: t2.medium
nodeType: t3a.medium
minNodes: 2
maxNodes: 4
vectorStore:
nodeType: t2.small
nodeType: t3a.small
replicas: 1
prometheus:
enabled: true
tracing:
enabled: true
modelGroups:
- nodeType: c7a.xlarge
minInstances: 1
maxInstances: 3
name: gte-base
mixedModelGroups:
- name: gte-base
nodeType: c7a.xlarge
baseInstances: 0
maxOnDemandInstances: 1
spot:
minInstances: 1
maxInstances: 3
runtime:
image: ghcr.io/ggerganov/llama.cpp:server
model:
hfRepoId: jjleng/gte-base-gguf
files: ["*.q4_0.gguf"]
resourceRequest:
cpu: 3600m
memory: 6Gi
autoScaleTriggers:
- type: cpu
metadata:
type: Utilization
value: "50"
- nodeType: c7a.xlarge
minInstances: 1
maxInstances: 3
name: llama2-7b
- name: llama2-7b-chat
nodeType: g4dn.xlarge
gpu:
enabled: true # This model group runs on GPU-enabled instances
baseInstances: 0
maxOnDemandInstances: 1
spot:
minInstances: 1
maxInstances: 2
runtime:
image: ghcr.io/ggerganov/llama.cpp:server
image: vllm/vllm-openai:v0.4.2
model:
hfRepoId: TheBloke/Llama-2-7B-GGUF
files: ["*.Q4_0.gguf"]
resourceRequest:
cpu: 3600m
memory: 6Gi
hfRepoId: TheBloke/Llama-2-7B-Chat-GPTQ
autoScaleTriggers:
- type: cpu
metadata:
type: Utilization
value: "50"
- type: prometheus
metadata:
serverAddress: http://kube-prometheus-stack-prometheus.prometheus.svc.cluster.local:9090
metricName: request_duration_90percentile
threshold: '220000'
query: |
histogram_quantile(0.90, sum(rate(istio_request_duration_milliseconds_bucket{destination_service_name="llama2-7b", response_code="200"}[1m])) by (le))
serverAddress: http://kube-prometheus-stack-prometheus.prometheus.svc.cluster.local:9090 # Prometheus endpoint
metricName: latency_p95
threshold: '20000' # Set to 20s, tune as needed
query: | # Trigger scaling if p95 latency exceeds 20s
histogram_quantile(0.95, sum(rate(istio_request_duration_milliseconds_bucket{destination_service="llama2-7b-chat.default.svc.cluster.local"}[5m])) by (le))
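Before relying on the new Prometheus trigger, it can help to run its query by hand and confirm it returns data. Below is a minimal sketch, assuming it runs from a pod that can reach the kube-prometheus-stack service at the serverAddress above and that Istio is already recording request durations for llama2-7b-chat; the PromQL string is copied verbatim from the trigger.

```python
# Minimal sketch: evaluate the trigger's PromQL query against the in-cluster
# Prometheus HTTP API and print the current p95 latency in milliseconds.
# Assumes this runs from a pod that can resolve the service address below.
import requests

PROMETHEUS = "http://kube-prometheus-stack-prometheus.prometheus.svc.cluster.local:9090"
QUERY = (
    'histogram_quantile(0.95, sum(rate(istio_request_duration_milliseconds_bucket'
    '{destination_service="llama2-7b-chat.default.svc.cluster.local"}[5m])) by (le))'
)

resp = requests.get(f"{PROMETHEUS}/api/v1/query", params={"query": QUERY}, timeout=10)
resp.raise_for_status()
result = resp.json()["data"]["result"]
if result:
    # Each sample is [timestamp, value]; the trigger scales out above 20000 ms.
    print(f"p95 latency: {float(result[0]['value'][1]):.0f} ms")
else:
    print("No samples yet -- send some traffic to llama2-7b-chat first.")
```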
2 changes: 1 addition & 1 deletion examples/website_rag/constants.py
@@ -1,3 +1,3 @@
QDRANT_URL = "http://qdrant.qdrant.svc.cluster.local:6333"
LLM_URL = "http://llama2-7b"
LLM_URL = "http://llama2-7b-chat"
EMBEDDING_URL = "http://gte-base"
File renamed without changes.
6 changes: 4 additions & 2 deletions examples/website_rag/serve.py
@@ -10,8 +10,8 @@
from langchain_community.vectorstores import Qdrant
from langchain_core.runnables import RunnableLambda
from langserve import APIHandler, add_routes # type: ignore
from llama_cpp_llm import LlamaCpp
from qdrant_client import QdrantClient
from vllm import Vllm

logging.basicConfig(
level=logging.INFO,
@@ -47,7 +47,8 @@
def run_llm(query: str) -> Any:
start_time = time.time()
logging.info(f"Running LLM with query: {query}")
llm = LlamaCpp(
llm = Vllm(
model="llama2-7b-chat",
model_url=LLM_URL,
temperature=0,
max_tokens=2500,
@@ -58,6 +59,7 @@ def run_llm(query: str) -> Any:
llm=llm, retriever=retriever, chain_type="stuff", return_source_documents=True
)

query = f"[INST] <<SYS>><</SYS>>\n\n{query} [/INST]\n"
result = qa.invoke({"query": query})
logging.info(f"LLM result: {result}")

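The query wrapping added in serve.py follows the Llama-2 chat prompt format: an [INST] ... [/INST] block with an optional <<SYS>> system section. A small helper like the hypothetical one below keeps that template in one place; with an empty system prompt it produces exactly the string serve.py now sends.

```python
# Hypothetical helper (not part of the example code) reproducing the Llama-2 chat
# template that serve.py now wraps queries in before calling the model.
def format_llama2_chat_prompt(query: str, system_prompt: str = "") -> str:
    return f"[INST] <<SYS>>{system_prompt}<</SYS>>\n\n{query} [/INST]\n"


# With an empty system prompt this matches serve.py exactly:
# "[INST] <<SYS>><</SYS>>\n\nWhat is paka? [/INST]\n"
prompt = format_llama2_chat_prompt("What is paka?")
```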
245 changes: 245 additions & 0 deletions examples/website_rag/vllm.py
@@ -0,0 +1,245 @@
# LangChain LLM adapter for a vLLM OpenAI-compatible completions server.
from __future__ import annotations

import json
import logging
from typing import Any, Dict, Iterator, List, Optional

import requests
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.utils import get_pydantic_field_names
from langchain_core.utils.utils import build_extra_kwargs
from sseclient import SSEClient

logger = logging.getLogger(__name__)


# Thin HTTP client for a vLLM server's OpenAI-compatible HTTP API.
class Client:
def __init__(self, model_url: str) -> None:
self.url = model_url
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}

def invoke(self, **kwargs: Any) -> Any:
response = requests.post(
f"{self.url}/v1/completions",
headers=self.headers,
json=kwargs,
verify=False,
)
logger.debug(response.text)
return response.json()

def stream(self, **kwargs: Any) -> Any:
with requests.post(
f"{self.url}/v1/completions",
headers={**self.headers, "Accept": "text/event-stream"},
json=kwargs,
stream=True,
verify=False,
) as response:
client = SSEClient((chunk for chunk in response.iter_content()))
for event in client.events():
if event.event != "message":
continue
if event.data.strip() == "[DONE]":
break
yield json.loads(event.data)

# Count prompt tokens by round-tripping the text through the embeddings endpoint.
def tokenize(self, text: str) -> int:
response = requests.post(
f"{self.url}/v1/embeddings",
headers=self.headers,
json={"input": text},
verify=False,
)
return response.json()["usage"]["prompt_tokens"]


class Vllm(LLM):
"""LangChain LLM wrapper around a vLLM OpenAI-compatible server."""

client: Any = None #: :meta private:

model: str
"""The model name."""

model_url: str
"""The url of the model server."""

suffix: Optional[str] = None
"""A suffix to append to the generated text. If None, no suffix is appended."""

max_tokens: Optional[int] = 256
"""The maximum number of tokens to generate."""

temperature: Optional[float] = 0.8
"""The temperature to use for sampling."""

top_p: Optional[float] = 0.95
"""The top-p value to use for sampling."""

logprobs: Optional[int] = None
"""The number of logprobs to return. If None, no logprobs are returned."""

echo: Optional[bool] = False
"""Whether to echo the prompt."""

stop: Optional[List[str]] = []
"""A list of strings to stop generation when encountered."""

top_k: Optional[int] = 40
"""The top-k value to use for sampling."""

last_n_tokens_size: Optional[int] = 64
"""The number of tokens to look back when applying the repeat_penalty."""

model_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Any additional parameters."""

streaming: bool = True
"""Whether to stream the results, token by token."""

@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
model_url = values["model_url"]

values["client"] = Client(model_url)

return values

@root_validator(pre=True)
def build_model_kwargs(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Build extra kwargs from additional params that were passed in."""
all_required_field_names = get_pydantic_field_names(cls)
extra = values.get("model_kwargs", {})
values["model_kwargs"] = build_extra_kwargs(
extra, values, all_required_field_names
)
return values

@property
def _default_params(self) -> Dict[str, Any]:
"""Get the default parameters."""
params = {
"model": self.model,
"suffix": self.suffix,
"max_tokens": self.max_tokens,
"temperature": self.temperature,
"top_p": self.top_p,
"logprobs": self.logprobs,
"echo": self.echo,
"stop": self.stop, # key here is convention among LLM classes
"top_k": self.top_k,
}
return params

@property
def _identifying_params(self) -> Dict[str, Any]:
"""Get the identifying parameters."""
return self._default_params

@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "openai"

def _get_parameters(self, stop: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Performs sanity check.
Args:
stop (Optional[List[str]]): List of stop sequences.
Returns:
Dictionary containing the combined parameters.
"""

# Raise error if stop sequences are in both input and default params
if self.stop and stop is not None:
raise ValueError("`stop` found in both the input and default params.")

params = self._default_params

# Use whichever stop list is configured, defaulting to an empty list:
params["stop"] = self.stop or stop or []

return params

def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call the vLLM server and return the output.
Args:
prompt: The prompt to use for generation.
stop: A list of strings to stop generation when encountered.
Returns:
The generated text.
"""
if self.streaming:
# If streaming is enabled, use the stream method, which yields chunks
# as they are generated, and return the concatenated text from each
# chunk's first choice:
combined_text_output = ""
for chunk in self._stream(
prompt=prompt,
stop=stop,
run_manager=run_manager,
**kwargs,
):
combined_text_output += chunk.text
return combined_text_output
else:
params = self._get_parameters(stop)
params = {**params, **kwargs}
result = self.client.invoke(prompt=prompt, stream=False, **params)
return result["choices"][0]["text"]

def _stream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
"""Yields result objects as they are generated in real time.
It also calls the callback manager's on_llm_new_token event with
similar parameters to the OpenAI LLM class method of the same name.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
A generator representing the stream of tokens being generated.
"""
params = {**self._get_parameters(stop), **kwargs}
result = self.client.stream(prompt=prompt, stream=True, **params)
for part in result:
logprobs = part["choices"][0].get("logprobs", None)
chunk = GenerationChunk(
text=part["choices"][0]["text"],
generation_info={"logprobs": logprobs},
)
yield chunk
if run_manager:
run_manager.on_llm_new_token(
token=chunk.text, verbose=True, log_probs=logprobs
)

def get_num_tokens(self, text: str) -> int:
return self.client.tokenize(text)
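For completeness, here is a minimal sketch of using the Vllm adapter above on its own, outside the RetrievalQA chain in serve.py. It assumes the llama2-7b-chat model group resolves at the in-cluster URL from constants.py and that the vLLM server exposes the model under the name "llama2-7b-chat"; adjust both if your deployment differs.

```python
# Minimal usage sketch for the Vllm adapter above. The service URL and served model
# name are assumptions about the deployment (see constants.py and cluster.yaml).
from vllm import Vllm  # the local adapter module above, not the vllm PyPI package

llm = Vllm(
    model="llama2-7b-chat",
    model_url="http://llama2-7b-chat",
    temperature=0,
    max_tokens=256,
)

# invoke() returns the completed text in one piece (internally it streams and concatenates).
print(llm.invoke("[INST] <<SYS>><</SYS>>\n\nWhat is a RAG pipeline? [/INST]\n"))

# stream() yields text chunks as the server produces them.
for chunk in llm.stream("[INST] <<SYS>><</SYS>>\n\nSummarize vLLM in one line. [/INST]\n"):
    print(chunk, end="", flush=True)
```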
2 changes: 1 addition & 1 deletion paka/cluster/aws/utils.py
@@ -61,7 +61,7 @@ def odic_role_for_sa(

def get_ami_for_instance(ctx: Context, instance_type: str) -> str:
instance_info = get_instance_info(ctx.provider, ctx.region, instance_type)
gpu_count = instance_info.get("gpu_count", 0)
gpu_count = instance_info.get("gpu_count", 0) or 0
arch = instance_info.get("arch", "x86_64")

if gpu_count > 0:
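The extra `or 0` matters because dict.get only falls back to its default when the key is missing, not when the stored value is None; a hypothetical instance_info illustrates the difference:

```python
# dict.get's default only applies when the key is absent, not when the stored value
# is None, so a bare .get("gpu_count", 0) can still hand None to `gpu_count > 0`.
instance_info = {"gpu_count": None, "arch": "x86_64"}  # hypothetical lookup result

print(instance_info.get("gpu_count", 0))        # None -- the default is ignored
print(instance_info.get("gpu_count", 0) or 0)   # 0    -- safe to compare with > 0
```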
4 changes: 2 additions & 2 deletions paka/utils.py
@@ -390,7 +390,7 @@ def get_instance_info(provider: str, region: str, instance_type: str) -> Dict[st
instance_type_info = instance_types[0]
gpu_info = instance_type_info.get("GpuInfo", {})
gpu, vram = gpu_info.get("Gpus", [{}])[0], gpu_info.get(
"TotalGpuMemoryInMiB"
"TotalGpuMemoryInMiB", 0
)
architectures = instance_type_info.get("ProcessorInfo", {}).get(
"SupportedArchitectures", []
@@ -399,7 +399,7 @@ def get_instance_info(provider: str, region: str, instance_type: str) -> Dict[st
return {
"cpu": instance_type_info.get("VCpuInfo", {}).get("DefaultVCpus"),
"memory": instance_type_info.get("MemoryInfo", {}).get("SizeInMiB"),
"gpu_count": gpu.get("Count"),
"gpu_count": gpu.get("Count", 0),
"vram": vram,
"gpu_manufacturer": gpu.get("Manufacturer"),
"gpu_name": gpu.get("Name"),
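With these defaults in place, a CPU-only instance type (whose DescribeInstanceTypes entry has no GpuInfo key) now reports 0 for the GPU fields instead of None. A hypothetical payload traces the same parsing path as get_instance_info above:

```python
# Hypothetical DescribeInstanceTypes entry for a CPU-only instance: there is no
# "GpuInfo" key, so every lookup now falls back to 0 instead of returning None.
instance_type_info = {
    "VCpuInfo": {"DefaultVCpus": 2},
    "MemoryInfo": {"SizeInMiB": 4096},
    "ProcessorInfo": {"SupportedArchitectures": ["x86_64"]},
}

gpu_info = instance_type_info.get("GpuInfo", {})
gpu, vram = gpu_info.get("Gpus", [{}])[0], gpu_info.get("TotalGpuMemoryInMiB", 0)

print(gpu.get("Count", 0), vram)  # 0 0  (previously: None None)
```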
