Skip to content

Commit

Permalink
gmc: include ui as part of pipeline
Browse files Browse the repository at this point in the history
* add ui handler in GMC router logic
* add sample config file for chatqna with conversation ui

Signed-off-by: Ruoyu Ying <ruoyu.ying@intel.com>
  • Loading branch information
Ruoyu-y committed Oct 18, 2024
1 parent 7e7b8ab commit 0612d5c
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 6 deletions.
1 change: 1 addition & 0 deletions microservices-connector/api/v1alpha3/validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
"Whisper",
"WhisperGaudi",
"DataPrep",
"UI",
}
)

Expand Down
195 changes: 189 additions & 6 deletions microservices-connector/cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"io"
"mime/multipart"
"net/http"
"net/http/httputil"
"net/url"
"os"

// "regexp"
Expand All @@ -39,12 +41,15 @@ import (
)

const (
	BufferSize    = 1024 // size of the buffer used when streaming responses
	MaxGoroutines = 1024 // cap on concurrently running worker goroutines
	ServiceURL    = "serviceUrl"
	ServiceNode   = "node"
	DataPrep      = "DataPrep" // step name of the data-prep service
	Parameters    = "parameters"
	UI            = "UI" // step name of the UI service (see mcUiHandler)
	// JSON keys expected by the first pipeline hop: the LLM step reads
	// "query", the embedding step reads "text".
	LLMKeyword       = "query"
	EmbeddingKeyword = "text"
)

var (
Expand All @@ -64,6 +69,15 @@ var (
Transport: transport,
Timeout: 30 * time.Second,
}
UnknownErr = errors.New("Unknown format")
defaultLlmParams = map[string]interface{}{
"max_tokens": 1024,
"top_k": 10,
"top_p": 0.95,
"temperature": 0.01,
"repetition_penalty": 1.03,
"streaming": true,
}
)

type EnsembleStepOutput struct {
Expand Down Expand Up @@ -727,10 +741,179 @@ func handleMultipartError(writer *multipart.Writer, err error) {
log.Error(err, "Error during multipart creation")
}

// create a handler to handle traffic to /ui
// if the payload is empty, redirect to ui service
// if there's payload, format the payload and redirect to backend service
func mcUiHandler(w http.ResponseWriter, req *http.Request) {
// redirect traffic to ui pod if payload is empty
// redirect traffic to mcGraphHandler if payload is not empty
var finishProcessing bool
defaultNode := mcGraph.Spec.Nodes[defaultNodeName]
for i := range defaultNode.Steps {
step := &defaultNode.Steps[i]
if UI == step.StepName {
body, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// if no payload included in the request, redirect request to UI
if len(body) == 0 {
serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace)
targetURL, err := url.Parse(serviceURL)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
proxy := httputil.NewSingleHostReverseProxy(targetURL)
proxy.ServeHTTP(w, req)
finishProcessing = true
} else {
// if payload exists, format payload and redirect to backend services
var data map[string]interface{}
parsedData := map[string]interface{}{}

// find the first hop for the pipeline
var nextHop *mcv1alpha3.Step
for i := range defaultNode.Steps {
nextHop = &defaultNode.Steps[i]
if nextHop.InternalService.IsDownstreamService {
// skip downstream service
continue
}
break
}

// set the corresponding keyword for input data
var key string
switch nextHop.StepName {
case "Embedding":
key = EmbeddingKeyword
case "Llm":
key = LLMKeyword
default:
log.Info("Unsupported step. Failed to find the corresponding keyword. Using default one...")
key = LLMKeyword
}

// resolve the data input
err = json.Unmarshal(body, &data)
if err != nil {
log.Error(err, "Failed to parse data.")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// check if the pattern "messages" exists
if val, ok := data["messages"]; ok {
switch ms := val.(type) {
// value is in the format of string
case string:
parsedData[key] = ms
// value is in the format of array
case []interface{}:
for _, item := range ms {
// check if item is in the format of map
if m, ok := item.(map[string]interface{}); ok {
// find the key "role"
if v, ok := m["role"]; ok {
strRole := fmt.Sprintf("%v", v)
content := ""
// find the key "content"
// currently consume everything as string
if c, ok := m["content"]; ok {
content = fmt.Sprintf("%v", c)
}
switch strRole {
// concatenate system prompt
case "system":
parsedData[key] = content + "/n"
// concatenate others
default:
parsedData[key] = fmt.Sprintf("%v", parsedData[key]) + strRole + ":" + content + "/n"
}
}
}
}
// others
default:
log.Error(UnknownErr, "Unknown format in payload messages.")
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

// attach the default llm parameters if the llm is the first hop
if key == LLMKeyword {
for k, v := range defaultLlmParams {
parsedData[k] = v
}
}

// append rest of the data
// treat everything as string
for k, v := range data {
if k == "messages" {
continue
}
parsedData[k] = v
}

marshalData, err := json.Marshal(parsedData)
if err != nil {
log.Error(err, "Failed to marshal prompt to json")
http.Error(w, err.Error(), http.StatusInternalServerError)
}

// create a new http request with the formatted data
proxyReq, err := http.NewRequest(req.Method, req.URL.String(), bytes.NewReader(marshalData))
if err != nil {
log.Error(err, "Failed to generate new http request with formatted payload.")
http.Error(w, err.Error(), http.StatusInternalServerError)
}
mcGraphHandler(w, proxyReq)
finishProcessing = true
}

}
}
// check if the request has been processed
if !finishProcessing {
log.Info("UI step not found in the graph or other errors happened. Please check the logs.")
}
}

// mcAssetHandler reverse-proxies requests for static assets under /assets
// to the UI service, since the UI page references them by absolute path.
// If the graph has no UI step the request is left unanswered and a log
// line is emitted.
func mcAssetHandler(w http.ResponseWriter, req *http.Request) {
	defaultNode := mcGraph.Spec.Nodes[defaultNodeName]
	for i := range defaultNode.Steps {
		step := &defaultNode.Steps[i]
		if step.StepName != UI {
			continue
		}
		serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace)
		targetURL, err := url.Parse(serviceURL)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		httputil.NewSingleHostReverseProxy(targetURL).ServeHTTP(w, req)
		return
	}
	// Reaching this point means no UI step matched; the old `found` flag
	// was dead code (never set) and has been removed.
	log.Info("UI step not found in the graph.")
}

// initializeRoutes wires each endpoint to its handler and returns the mux:
// "/" to the graph pipeline, "/dataprep" to data ingestion, and
// "/assets/" plus "/ui" to the UI handlers.
func initializeRoutes() *http.ServeMux {
	routes := map[string]http.HandlerFunc{
		"/":         mcGraphHandler,
		"/dataprep": mcDataHandler,
		"/assets/":  mcAssetHandler,
		"/ui":       mcUiHandler,
	}
	mux := http.NewServeMux()
	for pattern, handler := range routes {
		mux.HandleFunc(pattern, handler)
	}
	return mux
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

# Sample ChatQnA pipeline (Xeon) with a conversation UI step.
# The router serves the UI at /ui and proxies its static assets,
# while the Sequence node runs embedding -> retrieval -> reranking -> LLM.
apiVersion: gmc.opea.io/v1alpha3
kind: GMConnector
metadata:
  labels:
    app.kubernetes.io/name: gmconnector
    app.kubernetes.io/managed-by: kustomize
    gmc/platform: xeon
  name: chatqa
  namespace: chatqa
spec:
  routerConfig:
    name: router
    serviceName: router-service
  nodes:
    root:
      routerType: Sequence
      steps:
        - name: Embedding
          internalService:
            serviceName: embedding-svc
            config:
              endpoint: /v1/embeddings
              TEI_EMBEDDING_ENDPOINT: tei-embedding-svc
        - name: TeiEmbedding
          internalService:
            serviceName: tei-embedding-svc
            isDownstreamService: true
        - name: Retriever
          data: $response
          internalService:
            serviceName: retriever-svc
            config:
              endpoint: /v1/retrieval
              REDIS_URL: redis-vector-db
              TEI_EMBEDDING_ENDPOINT: tei-embedding-svc
        - name: VectorDB
          internalService:
            serviceName: redis-vector-db
            isDownstreamService: true
        - name: Reranking
          data: $response
          internalService:
            serviceName: reranking-svc
            config:
              endpoint: /v1/reranking
              TEI_RERANKING_ENDPOINT: tei-reranking-svc
        - name: TeiReranking
          internalService:
            serviceName: tei-reranking-svc
            config:
              endpoint: /rerank
            isDownstreamService: true
        - name: Llm
          data: $response
          internalService:
            serviceName: llm-svc
            config:
              endpoint: /v1/chat/completions
              TGI_LLM_ENDPOINT: tgi-service-m
        - name: Tgi
          internalService:
            serviceName: tgi-service-m
            config:
              endpoint: /generate
            isDownstreamService: true
        - name: DataPrep
          internalService:
            serviceName: data-prep-svc
            config:
              endpoint: /v1/dataprep
              REDIS_URL: redis-vector-db
              TEI_ENDPOINT: tei-embedding-svc
            isDownstreamService: true
        # Conversation UI; marked downstream so the router proxies to it
        # (via /ui and /assets/) instead of invoking it as a pipeline hop.
        - name: UI
          internalService:
            serviceName: ui-svc
            config:
              endpoint: /
            isDownstreamService: true
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
SpeechT5Gaudi = "SpeechT5Gaudi"
Whisper = "Whisper"
WhisperGaudi = "WhisperGaudi"
UI = "UI"
)

var yamlDict = map[string]string{
Expand All @@ -95,6 +96,7 @@ var yamlDict = map[string]string{
Whisper: yaml_dir + "whisper.yaml",
WhisperGaudi: yaml_dir + "whisper_gaudi.yaml",
DataPrep: yaml_dir + "data-prep.yaml",
UI: yaml_dir + "ui.yaml",
}

var (
Expand Down

0 comments on commit 0612d5c

Please sign in to comment.