Skip to content

Commit

Permalink
return internal s3 signed url if client request has header X-OPENCSG-… (
Browse files Browse the repository at this point in the history
#145)

* return internal s3 signed url if client request has header X-OPENCSG-S3-Internal

* STARHUB_SERVER_S3_INTERNAL_ENDPOINT defaults to empty

* set env S3_INTERNAL when deploying a model or space if the S3 internal endpoint is set

---------

Co-authored-by: Lei Da <da.lei@opencsg.com>
  • Loading branch information
Rader and Lei Da committed Oct 14, 2024
1 parent 5bb3ec6 commit e53bb4e
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 77 deletions.
10 changes: 10 additions & 0 deletions api/handler/git_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ func (h *GitHTTPHandler) LfsBatch(ctx *gin.Context) {
return
}

s3Internal := ctx.GetHeader("X-OPENCSG-S3-Internal")
if s3Internal == "true" {
ctx.Set("X-OPENCSG-S3-Internal", true)
}

objectResponse, err := h.c.BuildObjectResponse(ctx, batchRequest, isUpload)
if err != nil {
if errors.Is(err, component.ErrUnauthorized) {
Expand Down Expand Up @@ -235,6 +240,11 @@ func (h *GitHTTPHandler) LfsDownload(ctx *gin.Context) {
downloadRequest.CurrentUser = httpbase.GetCurrentUser(ctx)
downloadRequest.SaveAs = ctx.Query("save_as")

s3Internal := ctx.GetHeader("X-OPENCSG-S3-Internal")
if s3Internal == "true" {
ctx.Set("X-OPENCSG-S3-Internal", true)
}

url, err := h.c.LfsDownload(ctx, downloadRequest)
if err != nil {
httpbase.ServerError(ctx, err)
Expand Down
11 changes: 11 additions & 0 deletions api/handler/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ func (h *RepoHandler) DownloadFile(ctx *gin.Context) {
return
}
}

s3Internal := ctx.GetHeader("X-OPENCSG-S3-Internal")
if s3Internal == "true" {
ctx.Set("X-OPENCSG-S3-Internal", true)
}
reader, size, url, err := h.c.DownloadFile(ctx, req, currentUser)
if err != nil {
slog.Error("Failed to download repo file", slog.String("repo_type", string(req.RepoType)), slog.Any("error", err))
Expand Down Expand Up @@ -835,6 +840,12 @@ func (h *RepoHandler) handleDownload(ctx *gin.Context, isResolve bool) {
} else {
branch = ctx.Param("branch")
}

s3Internal := ctx.GetHeader("X-OPENCSG-S3-Internal")
if s3Internal == "true" {
ctx.Set("X-OPENCSG-S3-Internal", true)
}

req := &types.GetFileReq{
Namespace: namespace,
Name: name,
Expand Down
17 changes: 17 additions & 0 deletions builder/deploy/common/deploy_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import "time"

// DeployConfig groups the settings consumed by the deploy subsystem
// (image builder, image runner, scheduler and deploy runners).
type DeployConfig struct {
// ImageBuilderURL is the address of the remote image builder service.
ImageBuilderURL string
// ImageRunnerURL is presumably the address of the image runner service — TODO confirm against the caller.
ImageRunnerURL string
// MonitorInterval is presumably the polling interval for deployment status checks — TODO confirm.
MonitorInterval time.Duration
// InternalRootDomain is presumably the cluster-internal root domain for deployed services — TODO confirm.
InternalRootDomain string
// SpaceDeployTimeoutInMin is the limit, in minutes, that the elapsed deploy
// time of a space is compared against while it is still deploying.
SpaceDeployTimeoutInMin int
// ModelDeployTimeoutInMin is the same limit, in minutes, applied to model
// deployments instead of space deployments.
ModelDeployTimeoutInMin int
// ModelDownloadEndpoint is the hub endpoint exported to model containers
// through the HF_ENDPOINT environment variable.
ModelDownloadEndpoint string
// PublicRootDomain is the public root domain; deploy runners branch on
// whether it is empty when building the container environment.
PublicRootDomain string
// SSHDomain is presumably the domain used for SSH access — TODO confirm.
SSHDomain string
// S3Internal indicates that LFS objects should be downloaded from the
// internal S3 address; it is exported to deployments as the S3_INTERNAL
// environment variable.
S3Internal bool
}
17 changes: 3 additions & 14 deletions builder/deploy/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package deploy

import (
"fmt"
"time"

"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
"opencsg.com/csghub-server/builder/deploy/imagerunner"
"opencsg.com/csghub-server/builder/deploy/scheduler"
Expand All @@ -14,7 +14,7 @@ var (
defaultDeployer Deployer
)

func Init(c DeployConfig) error {
func Init(c common.DeployConfig) error {
// ib := imagebuilder.NewLocalBuilder()
ib, err := imagebuilder.NewRemoteBuilder(c.ImageBuilderURL)
if err != nil {
Expand All @@ -25,7 +25,7 @@ func Init(c DeployConfig) error {
panic(fmt.Errorf("failed to create image runner:%w", err))
}

fifoScheduler = scheduler.NewFIFOScheduler(ib, ir, c.SpaceDeployTimeoutInMin, c.ModelDeployTimeoutInMin, c.ModelDownloadEndpoint, c.PublicRootDomain)
fifoScheduler = scheduler.NewFIFOScheduler(ib, ir, c)
deployer, err := newDeployer(fifoScheduler, ib, ir)
if err != nil {
return fmt.Errorf("failed to create deployer:%w", err)
Expand All @@ -39,14 +39,3 @@ func Init(c DeployConfig) error {
func NewDeployer() Deployer {
return defaultDeployer
}

type DeployConfig struct {
ImageBuilderURL string
ImageRunnerURL string
MonitorInterval time.Duration
InternalRootDomain string
SpaceDeployTimeoutInMin int
ModelDeployTimeoutInMin int
ModelDownloadEndpoint string
PublicRootDomain string
}
51 changes: 22 additions & 29 deletions builder/deploy/scheduler/deploy_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,28 @@ import (
"opencsg.com/csghub-server/common/types"
)

type DeployTimeout struct {
deploySpaceTimeoutInMin int
deployModelTimeoutInMin int
}

// DeployRunner defines a k8s image running task
type DeployRunner struct {
repo *RepoInfo
task *database.DeployTask
ir imagerunner.Runner
store *database.DeployTaskStore
tokenStore *database.AccessTokenStore
deployStartTime time.Time
deployTimeout *DeployTimeout
modelDownloadEndpoint string
publicDomain string
repo *RepoInfo
task *database.DeployTask
ir imagerunner.Runner
store *database.DeployTaskStore
tokenStore *database.AccessTokenStore
deployStartTime time.Time
deployCfg common.DeployConfig
}

func NewDeployRunner(ir imagerunner.Runner, r *RepoInfo, t *database.DeployTask, dto *DeployTimeout, mdep, publicDomain string) Runner {
func NewDeployRunner(ir imagerunner.Runner, r *RepoInfo, t *database.DeployTask, deployCfg common.DeployConfig) Runner {
return &DeployRunner{
repo: r,
task: t,
ir: ir,
store: database.NewDeployTaskStore(),
deployStartTime: time.Now(),
deployTimeout: dto,
tokenStore: database.NewAccessTokenStore(),
modelDownloadEndpoint: mdep,
publicDomain: publicDomain,
repo: r,
task: t,
ir: ir,
store: database.NewDeployTaskStore(),
deployStartTime: time.Now(),
tokenStore: database.NewAccessTokenStore(),
deployCfg: deployCfg,
}

}

// Run call k8s image runner service to run a docker image
Expand Down Expand Up @@ -107,9 +99,9 @@ func (t *DeployRunner) Run(ctx context.Context) error {
switch resp.Code {
case common.Deploying:
duration := time.Since(t.deployStartTime).Minutes()
limitTime := t.deployTimeout.deploySpaceTimeoutInMin
limitTime := t.deployCfg.SpaceDeployTimeoutInMin
if t.task.Deploy.SpaceID == 0 && t.task.Deploy.ModelID > 0 {
limitTime = t.deployTimeout.deployModelTimeoutInMin
limitTime = t.deployCfg.ModelDeployTimeoutInMin
}
if duration >= float64(limitTime) {
// space or model deploy duration is greater than timeout defined in env (default is 30 mins)
Expand Down Expand Up @@ -246,6 +238,7 @@ func (t *DeployRunner) makeDeployRequest() (*types.RunRequest, error) {
}

// for space and models
envMap["S3_INTERNAL"] = fmt.Sprintf("%v", t.deployCfg.S3Internal)
envMap["HTTPCloneURL"] = t.getHttpCloneURLWithToken(t.repo.HTTPCloneURL, token.Token)
envMap["ACCESS_TOKEN"] = token.Token
envMap["REPO_ID"] = t.repo.Path // "namespace/name"
Expand All @@ -270,17 +263,17 @@ func (t *DeployRunner) makeDeployRequest() (*types.RunRequest, error) {
if deploy.Type == types.InferenceType || deploy.Type == types.ServerlessType {
// runtime framework port for model
envMap["port"] = strconv.Itoa(deploy.ContainerPort)
envMap["HF_ENDPOINT"] = t.modelDownloadEndpoint // "https://hub-stg.opencsg.com/"
envMap["HF_ENDPOINT"] = t.deployCfg.ModelDownloadEndpoint // "https://hub-stg.opencsg.com/"
envMap["HF_HUB_OFFLINE"] = "1"
}

if deploy.Type == types.FinetuneType {
envMap["port"] = strconv.Itoa(deploy.ContainerPort)
envMap["HF_ENDPOINT"], _ = url.JoinPath(t.modelDownloadEndpoint, "hf")
envMap["HF_ENDPOINT"], _ = url.JoinPath(t.deployCfg.ModelDownloadEndpoint, "hf")
envMap["HF_TOKEN"] = token.Token
}

if t.publicDomain == "" {
if t.deployCfg.PublicRootDomain == "" {
if deploy.Type == types.FinetuneType {
envMap["CONTEXT_PATH"] = "/endpoint/" + deploy.SvcName
}
Expand Down
31 changes: 10 additions & 21 deletions builder/deploy/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"sync"
"time"

"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
"opencsg.com/csghub-server/builder/deploy/imagerunner"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/common/types"
"opencsg.com/csghub-server/common/utils/common"
utilcommon "opencsg.com/csghub-server/common/utils/common"
)

type Scheduler interface {
Expand All @@ -35,15 +36,12 @@ type FIFOScheduler struct {
ib imagebuilder.Builder
ir imagerunner.Runner

nextLock *sync.Mutex
spaceDeployTimeoutInMin int
modelDeployTimeoutInMin int
modelDownloadEndpoint string
PublicRootDomain string
config *config.Config
nextLock *sync.Mutex
deployCfg common.DeployConfig
config *config.Config
}

func NewFIFOScheduler(ib imagebuilder.Builder, ir imagerunner.Runner, sdt, mdt int, mdep, prd string) Scheduler {
func NewFIFOScheduler(ib imagebuilder.Builder, ir imagerunner.Runner, confg common.DeployConfig) Scheduler {
s := &FIFOScheduler{}
// TODO:allow config
s.timeout = 30 * time.Minute
Expand All @@ -58,10 +56,8 @@ func NewFIFOScheduler(ib imagebuilder.Builder, ir imagerunner.Runner, sdt, mdt i
s.ib = ib
s.ir = ir
s.nextLock = &sync.Mutex{}
s.spaceDeployTimeoutInMin = sdt
s.modelDeployTimeoutInMin = mdt
s.modelDownloadEndpoint = mdep
s.PublicRootDomain = prd
s.deployCfg = confg
//TODO: avoid load config, use config from params
s.config, _ = config.LoadConfig()
return s
}
Expand Down Expand Up @@ -148,7 +144,7 @@ func (rs *FIFOScheduler) next() (Runner, error) {
var s *database.Space
s, err = rs.spaceStore.ByID(ctx, deployTask.Deploy.SpaceID)
if err == nil {
repoCloneInfo := common.BuildCloneInfo(rs.config, s.Repository)
repoCloneInfo := utilcommon.BuildCloneInfo(rs.config, s.Repository)
repo.Path = s.Repository.Path
repo.Name = s.Repository.Name
repo.Sdk = s.Sdk
Expand Down Expand Up @@ -196,14 +192,7 @@ func (rs *FIFOScheduler) next() (Runner, error) {
if deployTask.TaskType == 0 {
t = NewBuidRunner(rs.ib, &repo, deployTask)
} else {
t = NewDeployRunner(rs.ir, &repo, deployTask,
&DeployTimeout{
deploySpaceTimeoutInMin: rs.spaceDeployTimeoutInMin,
deployModelTimeoutInMin: rs.modelDeployTimeoutInMin,
},
rs.modelDownloadEndpoint,
rs.PublicRootDomain,
)
t = NewDeployRunner(rs.ir, &repo, deployTask, rs.deployCfg)
}

rs.last = deployTask
Expand Down
52 changes: 50 additions & 2 deletions builder/store/s3/minio.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,64 @@
package s3

import (
"context"
"fmt"
"log/slog"
"net/url"
"time"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"opencsg.com/csghub-server/common/config"
)

func NewMinio(cfg *config.Config) (*minio.Client, error) {
return minio.New(cfg.S3.Endpoint, &minio.Options{
// NewMinio builds the S3 client wrapper from cfg.
//
// A client for cfg.S3.Endpoint is always created. When cfg.S3.InternalEndpoint
// is non-empty, a second client with the same credentials, SSL, bucket-lookup
// and region settings is created for that endpoint and stored on the wrapper.
// An error from either construction is wrapped and returned with a nil client.
func NewMinio(cfg *config.Config) (*Client, error) {
	// newOptions returns a fresh option set on every call so that each
	// client gets its own credentials object, as before.
	newOptions := func() *minio.Options {
		return &minio.Options{
			Creds:        credentials.NewStaticV4(cfg.S3.AccessKeyID, cfg.S3.AccessKeySecret, ""),
			Secure:       cfg.S3.EnableSSL,
			BucketLookup: minio.BucketLookupAuto,
			Region:       cfg.S3.Region,
		}
	}

	primary, err := minio.New(cfg.S3.Endpoint, newOptions())
	if err != nil {
		return nil, fmt.Errorf("failed to init s3 client, error:%w", err)
	}

	wrapped := &Client{Client: primary}
	if cfg.S3.InternalEndpoint == "" {
		// No internal endpoint configured: only the default client is available.
		return wrapped, nil
	}

	internal, err := minio.New(cfg.S3.InternalEndpoint, newOptions())
	if err != nil {
		return nil, fmt.Errorf("failed to init s3 internal client, error:%w", err)
	}
	wrapped.internalClient = internal
	return wrapped, nil
}

// Client wraps a minio client and optionally carries a second client that is
// bound to the internal S3 endpoint.
type Client struct {
// Client is the default minio client; its methods are promoted through embedding.
*minio.Client
// internalClient targets the internal S3 endpoint. NewMinio only sets it
// when an internal endpoint is configured, so it may be nil.
internalClient *minio.Client
}

// PresignedGetObject shadows the embedded minio.Client.PresignedGetObject.
//
// The URL is signed with the internal client when one is configured and the
// context carries the internal-S3 flag (see useInternalClient); in every other
// case the embedded default client signs it. Arguments and results are passed
// through unchanged.
func (c *Client) PresignedGetObject(ctx context.Context, bucketName, objectName string, expires time.Duration, reqParams url.Values) (*url.URL, error) {
	signer := c.Client
	if c.internalClient != nil && c.useInternalClient(ctx) {
		slog.Info("use internal s3 client for presigned get object", slog.String("bucket_name", bucketName), slog.String("object_name", objectName))
		signer = c.internalClient
	}
	return signer.PresignedGetObject(ctx, bucketName, objectName, expires, reqParams)
}

// useInternalClient reports whether ctx carries the "X-OPENCSG-S3-Internal"
// key with the boolean value true. A missing key, or a value of any other
// type, yields false.
func (c *Client) useInternalClient(ctx context.Context) bool {
	flag, isBool := ctx.Value("X-OPENCSG-S3-Internal").(bool)
	return isBool && flag
}
24 changes: 24 additions & 0 deletions builder/store/s3/minio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package s3

import (
"context"
"testing"
)

// Test_useInternalClient verifies that useInternalClient reports true only
// when the context carries the "X-OPENCSG-S3-Internal" key with the boolean
// value true.
func Test_useInternalClient(t *testing.T) {
	c := &Client{}
	base := context.Background()

	// A plain string key is used on purpose: it must match the key that
	// useInternalClient looks up in the context.
	const key = "X-OPENCSG-S3-Internal"

	cases := []struct {
		name string
		ctx  context.Context
		want bool
		msg  string
	}{
		{
			name: "key not set",
			ctx:  base,
			want: false,
			msg:  "should not use internal s3 client if context not defined",
		},
		{
			name: "non-bool value",
			ctx:  context.WithValue(base, key, "test"),
			want: false,
			msg:  "should not use internal s3 client if context value is not 'true'",
		},
		{
			name: "bool true",
			ctx:  context.WithValue(base, key, true),
			want: true,
			// The original message wrongly said "should not use" for this case.
			msg: "should use internal s3 client if context value is 'true'",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := c.useInternalClient(tc.ctx); got != tc.want {
				t.Errorf("%s (got %v, want %v)", tc.msg, got, tc.want)
			}
		})
	}
}
4 changes: 2 additions & 2 deletions cmd/csghub-server/cmd/git/generate_lfs_meta_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var generateLfsMetaObjectsCmd = &cobra.Command{
},
}

func fetchAllPointersForRepo(config *config.Config, gitServer gitserver.GitServer, s3Client *minio.Client, lfsMetaObjectStore *database.LfsMetaObjectStore, repo database.Repository) error {
func fetchAllPointersForRepo(config *config.Config, gitServer gitserver.GitServer, s3Client *s3.Client, lfsMetaObjectStore *database.LfsMetaObjectStore, repo database.Repository) error {
namespace := strings.Split(repo.Path, "/")[0]
name := strings.Split(repo.Path, "/")[1]
ref := repo.DefaultBranch
Expand Down Expand Up @@ -124,7 +124,7 @@ func fetchAllPointersForRepo(config *config.Config, gitServer gitserver.GitServe
return nil
}

func checkAndUpdateLfsMetaObjects(config *config.Config, s3Client *minio.Client, lfsMetaObjectStore *database.LfsMetaObjectStore, repo database.Repository, pointer *types.Pointer) {
func checkAndUpdateLfsMetaObjects(config *config.Config, s3Client *s3.Client, lfsMetaObjectStore *database.LfsMetaObjectStore, repo database.Repository, pointer *types.Pointer) {
var exists bool
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down
Loading

0 comments on commit e53bb4e

Please sign in to comment.