Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update resource director #2243

Merged
merged 4 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions client/dfget/dfget.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,14 @@ func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemonv1.

_url, err := url.Parse(cfg.URL)
if err == nil {
inj, ok := source.ShouldInjectAuthInfo(_url.Scheme)
director, ok := source.HasDirector(_url.Scheme)
if ok {
err = inj.Inject(_url, request.UrlMeta)
if err != nil {
logger.Errorf("inject auth info error: %s", err)
err = director.Direct(_url, request.UrlMeta)
if err == nil {
// write back new url
request.Url = _url.String()
} else {
logger.Errorf("direct resource error: %s", err)
}
}
}
Expand Down
161 changes: 116 additions & 45 deletions pkg/source/clients/orasprotocol/oras_source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package orasprotocol

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -39,10 +40,12 @@ const (
scheme = "oras"
configFilePath = "/.singularity/docker-config.json"

authHeader = "X-Dragonfly-Oras-Authorization"
authHeader = "X-Dragonfly-Oras-Authorization"
tokenHeader = "X-Dragonfly-Oras-Token"
blobDigest = "digest"
)

var _ source.ResourceClient = (*orasSourceClient)(nil)
var client *orasSourceClient

type Blob struct {
Digest string `json:"digest"`
Expand All @@ -53,9 +56,13 @@ type Manifest struct {
}

func init() {
client = &orasSourceClient{
httpClient: http.DefaultClient,
}

source.RegisterBuilder(scheme,
source.NewPlainResourceClientBuilder(Builder),
source.WithAuthInfoInjector(source.NewPlainAuthInfoInjector(AuthInfoInjector)))
source.WithDirector(source.NewPlainDirector(Director)))
}

func Builder(optionYaml []byte) (source.ResourceClient, source.RequestAdapter, []source.Hook, error) {
Expand All @@ -64,19 +71,53 @@ func Builder(optionYaml []byte) (source.ResourceClient, source.RequestAdapter, [
if err != nil {
return nil, nil, nil, err
}
client := &orasSourceClient{
httpClient: httpClient,
}
client.httpClient = httpClient
return client, client.adaptor, nil, nil
}

func AuthInfoInjector(_url *url.URL, urlMeta *commonv1.UrlMeta) error {
auth, err := fetchAuthInfo(_url.Host, false)
func Director(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error {
// 1. fetch auth info from local user
auth, err := fetchAuthInfo(rawURL.Host, false)
if err != nil {
return err
}
var authHdr string
if auth != "" {
authHdr = "Basic " + auth
}

path, tag, err := parseURL(rawURL.Path)
if err != nil {
return err
}

ctx := context.TODO()
host := rawURL.Host

// 2. fetch token with auth info
tokenFetchURL := formatTokenURL(host, path)
token, err := client.fetchTokenWithHeader(ctx, authHdr, tokenFetchURL)
if err != nil {
return err
}

urlMeta.Header[authHeader] = "Basic " + auth
// 3. fetch manifest digest, normal is sha256
digest, err := client.fetchManifest(ctx, host, token, path, tag)
if err != nil {
return err
}

// 4. update unique blob digest in url
values := rawURL.Query()
values.Set(blobDigest, digest)
rawURL.RawQuery = values.Encode()

// 5. update digest for peer data check
urlMeta.Digest = digest

// 6. update token in header
urlMeta.Header[tokenHeader] = token

return nil
}

Expand All @@ -102,22 +143,40 @@ func (client *orasSourceClient) IsExpired(request *source.Request, info *source.
}

func (client *orasSourceClient) Download(request *source.Request) (*source.Response, error) {
path, tag, err := parseURL(request, request.URL.Path)
ctx := request.Context()
host := request.URL.Host

path, tag, err := parseURL(request.URL.Path)
if err != nil {
return nil, err
}

token, err := client.fetchToken(request, path)
var (
digest string
token string
)

// if there is blob sha256 and token, just goto fetch image
if digestQuery, ok1 := request.URL.Query()[blobDigest]; ok1 && len(digestQuery) > 0 {
if tokenHdr, ok2 := request.Header[tokenHeader]; ok2 && len(tokenHdr) > 0 {
digest = digestQuery[0]
token = tokenHdr[0]
goto fetch
}
}

token, err = client.fetchToken(request, path)
if err != nil {
return nil, err
}

sha256, err := client.fetchManifest(request, token, path, tag)
digest, err = client.fetchManifest(ctx, host, token, path, tag)
if err != nil {
return nil, err
}

imageFetchResponse, err := client.fetchImage(request, token, sha256, path, tag)
fetch:
imageFetchResponse, err := client.fetchImage(ctx, host, token, path, digest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -154,53 +213,61 @@ func fetchAuthInfo(host string, skipCheckExist bool) (string, error) {
}

func (client *orasSourceClient) fetchToken(request *source.Request, path string) (string, error) {
var response *http.Response
var err error
tokenFetchURL := fmt.Sprintf("https://%s/service/token/?scope=repository:%s:pull&service=harbor-registry", request.URL.Host, path)
if authHeaderVal := request.Header.Get(authHeader); authHeaderVal != "" {
tokenFetchURL := formatTokenURL(request.URL.Host, path)

var authHeaderVal string

if authHeaderVal = request.Header.Get(authHeader); authHeaderVal != "" {
// remove the internal auth header
request.Header.Del(authHeader)
response, err = client.doRequest(request, jsonAcceptHeader, authHeaderVal, tokenFetchURL)
if err != nil {
return "", err
}
} else if fileExists(os.Getenv("HOME") + configFilePath) {
var auth string
auth, err = fetchAuthInfo(request.URL.Host, true)
auth, err := fetchAuthInfo(request.URL.Host, true)
if err != nil {
return "", err
}
authHeaderVal = "Basic " + auth
response, err = client.doRequest(request, jsonAcceptHeader, authHeaderVal, tokenFetchURL)
if err != nil {
return "", err
}
} else {
response, err = client.doRequest(request, "", "", tokenFetchURL)
if err != nil {
return "", err
}
}

token, err := client.fetchTokenWithHeader(request.Context(), authHeaderVal, tokenFetchURL)
if err != nil {
return "", err
}
logger.Info(fmt.Sprintf("fetching token for %s successfully", request.URL))
return token, nil
}

func (client *orasSourceClient) fetchTokenWithHeader(ctx context.Context, authHeaderVal, tokenFetchURL string) (string, error) {
// FIXME always jsonAcceptHeader ?
var acceptHeader string
if authHeaderVal != "" {
acceptHeader = jsonAcceptHeader
}

response, err := client.doRequest(ctx, acceptHeader, authHeaderVal, tokenFetchURL)
if err != nil {
return "", err
}
token, err := io.ReadAll(response.Body)
if err != nil {
return "", err
}
defer response.Body.Close()

var tokenData map[string]interface{}
if err = json.Unmarshal(token, &tokenData); err != nil {
return "", err
}
logger.Info(fmt.Sprintf("fetching token for %s successful", request.URL))

tokenVal := fmt.Sprintf("%v", tokenData["token"])
return tokenVal, nil
}

func (client *orasSourceClient) fetchManifest(request *source.Request, accessToken, path, tag string) (string, error) {
func (client *orasSourceClient) fetchManifest(ctx context.Context, host, accessToken, path, tag string) (string, error) {
var sha string
var blobLayers Manifest
manifestFetchURL := fmt.Sprintf("https://%s/v2/%s/manifests/%s", request.URL.Host, path, tag)
manifestFetchURL := fmt.Sprintf("https://%s/v2/%s/manifests/%s", host, path, tag)
authHeaderVal := "Bearer " + accessToken
resp, err := client.doRequest(request, ociAcceptHeader, authHeaderVal, manifestFetchURL)
resp, err := client.doRequest(ctx, ociAcceptHeader, authHeaderVal, manifestFetchURL)
if err != nil {
return "", err
}
Expand All @@ -209,32 +276,32 @@ func (client *orasSourceClient) fetchManifest(request *source.Request, accessTok
if err != nil {
return "", err
}
if err := json.Unmarshal(manifest, &blobLayers); err != nil {
if err = json.Unmarshal(manifest, &blobLayers); err != nil {
return "", err
}
for _, value := range blobLayers.Layers {
sha = value.Digest
}
if sha != "" {
logger.Info(fmt.Sprintf("fetching manifests for %s successful", request.URL))
logger.Info(fmt.Sprintf("fetching manifests for %s/%s:%s successfully", host, path, tag))
return sha, nil
}
return "", errors.New("manifest is empty")
}

func (client *orasSourceClient) fetchImage(request *source.Request, token, sha256, path, tag string) (*source.Response, error) {
imageFetchURL := fmt.Sprintf("https://%s/v2/%s/blobs/%s", request.URL.Host, path, sha256)
func (client *orasSourceClient) fetchImage(ctx context.Context, host, token, path, sha256 string) (*source.Response, error) {
imageFetchURL := fmt.Sprintf("https://%s/v2/%s/blobs/%s", host, path, sha256)
authHeaderVal := "Bearer " + token
resp, err := client.doRequest(request, ociAcceptHeader, authHeaderVal, imageFetchURL)
resp, err := client.doRequest(ctx, ociAcceptHeader, authHeaderVal, imageFetchURL)
if err != nil {
return nil, errors.New("failed to fetch image")
}
logger.Info(fmt.Sprintf("Fetched %s image successfully", request.URL))
logger.Info(fmt.Sprintf("Fetched image %s/%s with digest %s successfully", host, path, sha256))
return source.NewResponse(resp.Body), nil
}

func (client *orasSourceClient) doRequest(request *source.Request, acceptHeaderVal, authHeaderVal, url string) (*http.Response, error) {
req, err := http.NewRequestWithContext(request.Context(), http.MethodGet, url, nil)
func (client *orasSourceClient) doRequest(ctx context.Context, acceptHeaderVal, authHeaderVal, url string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
Expand All @@ -257,7 +324,7 @@ func fileExists(filepath string) bool {
return true
}

func parseURL(request *source.Request, urlPath string) (string, string, error) {
func parseURL(urlPath string) (string, string, error) {
parseURLPattern, err := regexp.Compile("(.*):(.*)")
if err != nil {
return "", "", err
Expand All @@ -274,3 +341,7 @@ func parseURL(request *source.Request, urlPath string) (string, string, error) {
func (client *orasSourceClient) GetLastModified(request *source.Request) (int64, error) {
panic("implement me")
}

func formatTokenURL(host, path string) string {
return fmt.Sprintf("https://%s/service/token/?scope=repository:%s:pull&service=harbor-registry", host, path)
}
34 changes: 18 additions & 16 deletions pkg/source/source_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
var (
resourceClientBuilder = map[string]ResourceClientBuilder{}
resourceClientOptions = map[string]interface{}{}
resourceAuthInjector = map[string]AuthInfoInjector{}
resourceDirector = map[string]Director{}
)

// ResourceClientBuilder is used to build resource client with custom option
Expand All @@ -37,9 +37,11 @@ type ResourceClientBuilder interface {
Build(optionYaml []byte) (resourceClient ResourceClient, adaptor RequestAdapter, hooks []Hook, err error)
}

// AuthInfoInjector will inject auth information for target url and metadata, eg: fetch docker config for different users
type AuthInfoInjector interface {
Inject(_url *url.URL, urlMeta *commonv1.UrlMeta) error
// Director will handle request with some actions, like:
// 1. inject auth information for target url and metadata, eg: fetch docker config for different users
// 2. rewrite a common request into an unique request, eg: oras://harbor/user:latest to oras://harbor/user:lastest?digest=sha256:12345
type Director interface {
Direct(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error
}

// RegisterOption is used for extra options when registering, like mark target scheme protocol should inject auth information
Expand All @@ -57,15 +59,15 @@ func RegisterBuilder(scheme string, builder ResourceClientBuilder, opts ...Regis
}
}

func WithAuthInfoInjector(inj AuthInfoInjector) RegisterOption {
func WithDirector(director Director) RegisterOption {
return func(scheme string) {
resourceAuthInjector[scheme] = inj
resourceDirector[scheme] = director
}
}

func ShouldInjectAuthInfo(scheme string) (AuthInfoInjector, bool) {
inj, ok := resourceAuthInjector[scheme]
return inj, ok
func HasDirector(scheme string) (Director, bool) {
director, ok := resourceDirector[scheme]
return director, ok
}

func UnRegisterBuilder(scheme string) {
Expand Down Expand Up @@ -116,15 +118,15 @@ func NewPlainResourceClientBuilder(
return &plainResourceClientBuilder{build: build}
}

type plainAuthInfoInjector struct {
inject func(url *url.URL, urlMeta *commonv1.UrlMeta) error
type plainDirector struct {
direct func(url *url.URL, urlMeta *commonv1.UrlMeta) error
}

func (a *plainAuthInfoInjector) Inject(_url *url.URL, urlMeta *commonv1.UrlMeta) error {
return a.inject(_url, urlMeta)
func (a *plainDirector) Direct(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error {
return a.direct(rawURL, urlMeta)
}

func NewPlainAuthInfoInjector(
inject func(url *url.URL, urlMeta *commonv1.UrlMeta) error) AuthInfoInjector {
return &plainAuthInfoInjector{inject: inject}
func NewPlainDirector(
direct func(url *url.URL, urlMeta *commonv1.UrlMeta) error) Director {
return &plainDirector{direct: direct}
}