Skip to content

Commit

Permalink
cherry-pick: nim support and persist data per user (#125)
Browse files Browse the repository at this point in the history
* persist volume per user and allow runtime image with full path URL

* support nim engine

---------

Co-authored-by: James <xzgan@opencsg.com>
  • Loading branch information
ganisback and James authored Sep 20, 2024
1 parent 1336516 commit 4324526
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 28 deletions.
12 changes: 7 additions & 5 deletions builder/deploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,13 @@ func (d *deployer) Purge(ctx context.Context, dr types.DeployRepo) error {
targetID = dr.DeployID // support model deploy with multi-instance
}
resp, err := d.ir.Purge(ctx, &types.PurgeRequest{
ID: targetID,
OrgName: dr.Namespace,
RepoName: dr.Name,
SvcName: dr.SvcName,
ClusterID: dr.ClusterID,
ID: targetID,
OrgName: dr.Namespace,
RepoName: dr.Name,
SvcName: dr.SvcName,
ClusterID: dr.ClusterID,
DeployType: dr.Type,
UserID: dr.UserUUID,
})
if err != nil {
slog.Error("deployer stop deploy", slog.Any("runner_resp", resp), slog.Int64("space_id", dr.SpaceID), slog.Any("deploy_id", dr.DeployID), slog.Any("error", err))
Expand Down
8 changes: 5 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ type Config struct {
}

Model struct {
DeployTimeoutInMin int `envconfig:"STARHUB_SERVER_MODEL_DEPLOY_TIMEOUT_IN_MINUTES" default:"60"`
DownloadEndpoint string `envconfig:"STARHUB_SERVER_MODEL_DOWNLOAD_ENDPOINT" default:"https://hub.opencsg.com"`
DockerRegBase string `envconfig:"STARHUB_SERVER_MODEL_DOCKER_REG_BASE" default:"opencsg-registry.cn-beijing.cr.aliyuncs.com/public/"`
DeployTimeoutInMin int `envconfig:"STARHUB_SERVER_MODEL_DEPLOY_TIMEOUT_IN_MINUTES" default:"60"`
DownloadEndpoint string `envconfig:"STARHUB_SERVER_MODEL_DOWNLOAD_ENDPOINT" default:"https://hub.opencsg.com"`
DockerRegBase string `envconfig:"STARHUB_SERVER_MODEL_DOCKER_REG_BASE" default:"opencsg-registry.cn-beijing.cr.aliyuncs.com/public/"`
NimDockerSecretName string `envconfig:"STARHUB_SERVER_MODEL_NIM_DOCKER_SECRET_NAME" default:"ngc-secret"`
NimNGCSecretName string `envconfig:"STARHUB_SERVER_MODEL_NIM_NGC_SECRET_NAME" default:"nvidia-nim-secrets"`
}
// send events
Event struct {
Expand Down
12 changes: 7 additions & 5 deletions common/types/service_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ type (
}

PurgeRequest struct {
ID int64 `json:"id"`
OrgName string `json:"org_name"`
RepoName string `json:"repo_name"`
ClusterID string `json:"cluster_id"`
SvcName string `json:"svc_name"`
ID int64 `json:"id"`
OrgName string `json:"org_name"`
RepoName string `json:"repo_name"`
ClusterID string `json:"cluster_id"`
SvcName string `json:"svc_name"`
DeployType int `json:"deploy_type"`
UserID string `json:"user_id"`
}

PurgeResponse struct {
Expand Down
52 changes: 42 additions & 10 deletions servicerunner/component/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"path"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewServiceComponent(config *config.Config, k8sNameSpace string) *ServiceCom
return sc
}

func (s *ServiceComponent) GenerateService(request types.SVCRequest, srvName string) (*v1.Service, error) {
func (s *ServiceComponent) GenerateService(ctx context.Context, cluster cluster.Cluster, request types.SVCRequest, srvName string) (*v1.Service, error) {
annotations := request.Annotation

environments := []corev1.EnvVar{}
Expand Down Expand Up @@ -93,10 +94,16 @@ func (s *ServiceComponent) GenerateService(request types.SVCRequest, srvName str
annotations[KeyUserID] = request.UserID
annotations[KeyDeploySKU] = request.Sku

containerImg := path.Join(s.spaceDockerRegBase, request.ImageID)
if request.RepoType == string(types.ModelRepo) {
// choose registry
containerImg = path.Join(s.modelDockerRegBase, request.ImageID)
containerImg := request.ImageID
// add prefix if image is not full path
if !strings.Contains(containerImg, "/") {
if request.RepoType == string(types.ModelRepo) {
// choose registry
containerImg = path.Join(s.modelDockerRegBase, request.ImageID)
} else if request.RepoType == string(types.SpaceRepo) {
// choose registry
containerImg = path.Join(s.spaceDockerRegBase, request.ImageID)
}
}

templateAnnotations := make(map[string]string)
Expand All @@ -120,6 +127,25 @@ func (s *ServiceComponent) GenerateService(request types.SVCRequest, srvName str
failureThreshold = s.env.Space.ReadnessFailureThreshold
}

imagePullSecrets := []corev1.LocalObjectReference{
{
Name: s.imagePullSecret,
},
}

// handle nim engine
if strings.Contains(containerImg, "nvcr.io/nim/") {
imagePullSecrets = append(imagePullSecrets, corev1.LocalObjectReference{
Name: s.env.Model.NimDockerSecretName,
})
ngc_api_key, err := s.GetNimSecret(ctx, cluster)
if err != nil {
return nil, fmt.Errorf("can not find secret %s in %s namespace , error: %w", s.env.Model.NimNGCSecretName, s.k8sNameSpace, err)
}
environments = append(environments, corev1.EnvVar{Name: "NGC_API_KEY", Value: ngc_api_key})
environments = append(environments, corev1.EnvVar{Name: "NIM_CACHE_PATH", Value: "/workspace"})
}

service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: srvName,
Expand Down Expand Up @@ -148,11 +174,7 @@ func (s *ServiceComponent) GenerateService(request types.SVCRequest, srvName str
FailureThreshold: int32(failureThreshold),
},
}},
ImagePullSecrets: []corev1.LocalObjectReference{
{
Name: s.imagePullSecret,
},
},
ImagePullSecrets: imagePullSecrets,
},
},
},
Expand All @@ -162,6 +184,16 @@ func (s *ServiceComponent) GenerateService(request types.SVCRequest, srvName str
return service, nil
}

// get secret from k8s
// notes: admin should create nim secret "ngc-secret" and "nvidia-nim-secrets" in related namespace before deploy
func (s *ServiceComponent) GetNimSecret(ctx context.Context, cluster cluster.Cluster) (string, error) {
secret, err := cluster.Client.CoreV1().Secrets(s.k8sNameSpace).Get(ctx, s.env.Model.NimNGCSecretName, metav1.GetOptions{})
if err != nil {
return "", err
}
return string(secret.Data["NGC_API_KEY"]), nil
}

func (s *ServiceComponent) GetServicePodsWithStatus(ctx context.Context, cluster cluster.Cluster, srvName string, namespace string) ([]types.Instance, error) {
labelSelector := fmt.Sprintf("serving.knative.dev/service=%s", srvName)
// Get the list of Pods based on the label selector
Expand Down
15 changes: 10 additions & 5 deletions servicerunner/handler/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *K8sHander) RunService(c *gin.Context) {
s.removeServiceForcely(c, cluster, srvName)
slog.Info("service already exists,delete it first", slog.String("srv_name", srvName), slog.Any("image_id", request.ImageID))
}
service, err := s.s.GenerateService(*request, srvName)
service, err := s.s.GenerateService(c, *cluster, *request, srvName)
if err != nil {
slog.Error("fail to generate service ", slog.Any("error", err), slog.Any("req", request))
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
Expand All @@ -97,11 +97,15 @@ func (s *K8sHander) RunService(c *gin.Context) {
MountPath: "/dev/shm",
})
}
pvcName := srvName
if request.DeployType == types.InferenceType {
pvcName = request.UserID
}
// add pvc if possible
// space image was built from user's code, model cache dir is hard to control
// so no PV cache for space case so far
if cluster.StorageClass != "" && request.DeployType != types.SpaceType {
err = s.s.NewPersistentVolumeClaim(srvName, c, *cluster, request.Hardware)
err = s.s.NewPersistentVolumeClaim(pvcName, c, *cluster, request.Hardware)
if err != nil {
slog.Error("Failed to create persist volume", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create persist volume"})
Expand All @@ -111,7 +115,7 @@ func (s *K8sHander) RunService(c *gin.Context) {
Name: "nas-pvc",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: srvName,
ClaimName: pvcName,
},
},
})
Expand Down Expand Up @@ -860,15 +864,16 @@ func (s *K8sHander) PurgeService(c *gin.Context) {
}

// 2 clean up pvc
if cluster.StorageClass != "" {
if cluster.StorageClass != "" && request.DeployType == types.FinetuneType {
err = cluster.Client.CoreV1().PersistentVolumeClaims(s.k8sNameSpace).Delete(c, srvName, metav1.DeleteOptions{})
if err != nil {
slog.Error("fail to delete pvc", slog.Any("error", err))
c.JSON(http.StatusInternalServerError, gin.H{"error": "fail to delete pvc"})
return
}
slog.Info("persistent volume claims deleted.", slog.String("srv_name", srvName))
}
slog.Info("service deleted, PVC deleted.", slog.String("srv_name", srvName))
slog.Info("service deleted.", slog.String("srv_name", srvName))
resp.Code = 0
resp.Message = "succeed to clean up service"
c.JSON(http.StatusOK, resp)
Expand Down

0 comments on commit 4324526

Please sign in to comment.