From cd794cdf4bde20a7ccd9630a6c95424751114d35 Mon Sep 17 00:00:00 2001 From: Adphi Date: Wed, 25 Oct 2023 12:36:28 +0200 Subject: [PATCH] add registry proxy support Signed-off-by: Adphi --- cmd/lkar/repo.go | 6 +- cmd/lkard/main.go | 71 ++++++-- pkg/{repository/repository.go => api/api.go} | 32 ++-- pkg/{storage => }/auth/auth.go | 0 pkg/cache/cache.go | 9 - pkg/cache/default.go | 31 ++++ pkg/registry/blobs.go | 51 ++++++ pkg/registry/manifests.go | 51 ++++++ pkg/registry/options.go | 171 +++++++++++++++++++ pkg/registry/registry.go | 67 ++++++++ pkg/registry/repository.go | 92 ++++++++++ pkg/storage/{client.go => copy.go} | 55 ------ pkg/storage/middleware.go | 2 +- pkg/storage/oci_storage.go | 62 +++---- pkg/storage/oci_storage_test.go | 2 +- pkg/storage/options.go | 31 ++-- 16 files changed, 583 insertions(+), 150 deletions(-) rename pkg/{repository/repository.go => api/api.go} (91%) rename pkg/{storage => }/auth/auth.go (100%) create mode 100644 pkg/cache/default.go create mode 100644 pkg/registry/blobs.go create mode 100644 pkg/registry/manifests.go create mode 100644 pkg/registry/options.go create mode 100644 pkg/registry/registry.go create mode 100644 pkg/registry/repository.go rename pkg/storage/{client.go => copy.go} (58%) diff --git a/cmd/lkar/repo.go b/cmd/lkar/repo.go index 12d79c9..a7b9d57 100644 --- a/cmd/lkar/repo.go +++ b/cmd/lkar/repo.go @@ -25,7 +25,7 @@ import ( "github.com/spf13/cobra" "go.linka.cloud/printer" - repository2 "go.linka.cloud/artifact-registry/pkg/repository" + "go.linka.cloud/artifact-registry/pkg/api" "go.linka.cloud/artifact-registry/pkg/slices" ) @@ -57,7 +57,7 @@ var ( } return errors.New(string(b)) } - var repos []repository2.Repository + var repos []api.Repository if err := json.NewDecoder(res.Body).Decode(&repos); err != nil { return err } @@ -71,7 +71,7 @@ var ( MetadataFiles int64 `json:"metadataFiles" print:"METADATA FILES"` MetadataSize int64 `json:"metadataSize" print:"METADATA SIZE"` } - out := slices.Map(repos, func(v repository2.Repository) Repo { + out := slices.Map(repos, func(v api.Repository) Repo { return Repo{ Image: v.Name + ":" + v.Type, Type: v.Type, diff --git a/cmd/lkard/main.go b/cmd/lkard/main.go index 76c33ad..01ae13d 100644 --- a/cmd/lkard/main.go +++ b/cmd/lkard/main.go @@ -34,8 +34,9 @@ import ( "go.linka.cloud/grpc-toolkit/react" artifact_registry "go.linka.cloud/artifact-registry" + "go.linka.cloud/artifact-registry/pkg/api" "go.linka.cloud/artifact-registry/pkg/packages" - "go.linka.cloud/artifact-registry/pkg/repository" + "go.linka.cloud/artifact-registry/pkg/registry" "go.linka.cloud/artifact-registry/pkg/storage" "go.linka.cloud/artifact-registry/ui" ) @@ -52,11 +53,18 @@ const ( EnvTLSCert = "ARTIFACT_REGISTRY_TLS_CERT" EnvTLSKey = "ARTIFACT_REGISTRY_TLS_KEY" EnvDisableUI = "ARTIFACT_REGISTRY_DISABLE_UI" + + EnvProxy = "ARTIFACT_REGISTRY_PROXY" + EnvProxyNoHTTPS = "ARTIFACT_REGISTRY_PROXY_NO_HTTPS" + EnvProxyInsecure = "ARTIFACT_REGISTRY_PROXY_INSECURE" + EnvProxyClientCA = "ARTIFACT_REGISTRY_PROXY_CLIENT_CA" + EnvProxyUser = "ARTIFACT_REGISTRY_PROXY_USER" + EnvProxyPassword = "ARTIFACT_REGISTRY_PROXY_PASSWORD" ) var ( addr = ":9887" - // backend = "192.168.10.11:5000" + backend = "docker.io" domain = "" @@ -75,6 +83,13 @@ var ( disableUI = false + proxyAddr string + proxyNoHTTPS = false + proxyInsecure = false + proxyClientCA string + proxyUser string + proxyPassword string + cmd = &cobra.Command{ Use: "artifact-registry (repository)", Args: cobra.MaximumNArgs(1), @@ -85,18 +100,16 @@ var ( repo = args[0] } // TODO(adphi): validate host - opts := []storage.Option{storage.WithHost(backend)} + ropts := []registry.Option{ + registry.WithProxy(proxyAddr), + registry.WithProxyUser(proxyUser), + registry.WithProxyPassword(proxyPassword), + } if noHTTPS { - opts = append(opts, storage.WithPlainHTTP()) + ropts = append(ropts, registry.WithPlainHTTP()) } if insecure { - opts = append(opts, storage.WithInsecure()) - } - if tagPerArtifact { - opts = append(opts, storage.WithArtifactTags()) - } - if repo != "" { - opts = append(opts, storage.WithRepo(repo)) + ropts = append(ropts, registry.WithInsecure()) } if clientCA != "" { p := x509.NewCertPool() @@ -107,7 +120,32 @@ var ( if !p.AppendCertsFromPEM(b) { logger.C(cmd.Context()).Fatal(err) } - opts = append(opts, storage.WithClientCA(p)) + ropts = append(ropts, registry.WithClientCA(p)) + } + if proxyNoHTTPS { + ropts = append(ropts, registry.WithProxyPlainHTTP()) + } + if proxyInsecure { + ropts = append(ropts, registry.WithProxyInsecure()) + } + if proxyClientCA != "" { + p := x509.NewCertPool() + b, err := os.ReadFile(proxyClientCA) + if err != nil { + logger.C(cmd.Context()).Fatal(err) + } + if !p.AppendCertsFromPEM(b) { + logger.C(cmd.Context()).Fatal(err) + } + ropts = append(ropts, registry.WithProxyClientCA(p)) + } + opts := []storage.Option{ + storage.WithHost(backend), + storage.WithRepo(repo), + storage.WithRegistryOptions(ropts...), + } + if tagPerArtifact { + opts = append(opts, storage.WithArtifactTags()) } if err := run(cmd.Context(), repo, opts...); err != nil { logger.C(cmd.Context()).Fatal(err) @@ -136,6 +174,13 @@ func main() { cmd.Flags().StringVar(&key, "tls-key", env.Get[string](EnvTLSKey), "tls key [$"+EnvTLSKey+"]") cmd.Flags().BoolVar(&disableUI, "disable-ui", env.GetDefault(EnvDisableUI, disableUI), "disable the Web UI [$"+EnvDisableUI+"]") + cmd.Flags().StringVar(&proxyAddr, "proxy", env.GetDefault(EnvProxy, proxyAddr), "proxy backend registry hostname (and port if not 443 or 80) [$"+EnvProxy+"]") + cmd.Flags().BoolVar(&proxyNoHTTPS, "proxy-no-https", env.GetDefault(EnvProxyNoHTTPS, noHTTPS), "disable proxy registry client https [$"+EnvProxyNoHTTPS+"]") + cmd.Flags().BoolVar(&proxyInsecure, "proxy-insecure", env.GetDefault(EnvProxyInsecure, insecure), "disable proxy registry client tls verification [$"+EnvProxyInsecure+"]") + cmd.Flags().StringVar(&proxyClientCA, "proxy-client-ca", env.Get[string](EnvProxyClientCA), "proxy tls client certificate authority [$"+EnvProxyClientCA+"]") + cmd.Flags().StringVar(&proxyUser, "proxy-user", env.GetDefault(EnvProxyUser, proxyUser), "proxy registry user [$"+EnvProxyUser+"]") + cmd.Flags().StringVar(&proxyPassword, "proxy-password", env.GetDefault(EnvProxyPassword, proxyPassword), "proxy registry password [$"+EnvProxyPassword+"]") + if err := cmd.Execute(); err != nil { os.Exit(1) } @@ -191,7 +236,7 @@ func run(ctx context.Context, repo string, opts ...storage.Option) error { router.Path("/_/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) - if err := repository.Init(ctx, router, domain, repo); err != nil { + if err := api.Init(ctx, router, domain, repo); err != nil { return err } if err := packages.Init(ctx, router, domain, repo); err != nil { diff --git a/pkg/repository/repository.go b/pkg/api/api.go similarity index 91% rename from pkg/repository/repository.go rename to pkg/api/api.go index c92785a..d919f90 100644 --- a/pkg/repository/repository.go +++ b/pkg/api/api.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package repository +package api import ( "context" @@ -29,19 +29,17 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "go.linka.cloud/grpc-toolkit/logger" "golang.org/x/sync/errgroup" - "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry" - cache2 "go.linka.cloud/artifact-registry/pkg/cache" + "go.linka.cloud/artifact-registry/pkg/auth" + "go.linka.cloud/artifact-registry/pkg/cache" "go.linka.cloud/artifact-registry/pkg/packages" "go.linka.cloud/artifact-registry/pkg/slices" "go.linka.cloud/artifact-registry/pkg/storage" - "go.linka.cloud/artifact-registry/pkg/storage/auth" ) const sessionName = "auth" -var cache = cache2.New() - type Stats struct { Size int64 `json:"size"` Count int64 `json:"count"` @@ -62,17 +60,16 @@ type handler struct { func (h *handler) Login(w http.ResponseWriter, r *http.Request) { ctx := auth.Context(r.Context(), r) - o := storage.Options(ctx) name, typ := mux.Vars(r)["repo"], mux.Vars(r)["type"] - if n := storage.Options(ctx).Repo(); name == "" && n != "" { + o := storage.Options(ctx) + if n := o.Repo(); name == "" && n != "" { name = n } - reg, err := remote.NewRegistry(o.Host()) + reg, err := o.NewRegistry(ctx) if err != nil { storage.Error(w, err) return } - o.SetClient(ctx, (*remote.Repository)(®.RepositoryOptions)) if name == "" { skip := errors.New("skip") if err := reg.Repositories(ctx, "", func(r []string) error { @@ -150,17 +147,13 @@ type credentials struct { func (h *handler) Credentials(w http.ResponseWriter, r *http.Request) { u, p, _ := r.BasicAuth() - // if !ok { - // http.Error(w, "No credentials", http.StatusUnauthorized) - // return - // } if err := json.NewEncoder(w).Encode(credentials{User: u, Password: p}); err != nil { storage.Error(w, err) return } } -func listImageRepositories(ctx context.Context, reg *remote.Registry, name string, typ ...string) ([]*Repository, error) { +func listImageRepositories(ctx context.Context, reg registry.Registry, name string, typ ...string) ([]*Repository, error) { repo, err := reg.Repository(ctx, name) if err != nil { if storage.IsNotFound(err) { @@ -204,7 +197,7 @@ func listImageRepositories(ctx context.Context, reg *remote.Registry, name strin } else { m = v.(ocispec.Manifest) } - cache.Set(desc.Digest.String(), m, cache2.WithTTL(5*time.Minute)) + cache.Set(desc.Digest.String(), m, cache.WithTTL(cache.DefaultTTL)) t, err := time.Parse(time.RFC3339, m.Annotations[ocispec.AnnotationCreated]) if err != nil { return err @@ -253,12 +246,11 @@ func (h *handler) ListRepositories(w http.ResponseWriter, r *http.Request) { ctx := auth.Context(r.Context(), r) o := storage.Options(ctx) typ := mux.Vars(r)["type"] - reg, err := remote.NewRegistry(o.Host()) + reg, err := o.NewRegistry(ctx) if err != nil { storage.Error(w, err) return } - o.SetClient(ctx, (*remote.Repository)(®.RepositoryOptions)) var repos []string if err := reg.Repositories(ctx, "", func(r []string) error { repos = append(repos, r...) @@ -298,17 +290,15 @@ func (h *handler) ListRepositories(w http.ResponseWriter, r *http.Request) { func (h *handler) ListImageRepositories(w http.ResponseWriter, r *http.Request) { ctx := auth.Context(r.Context(), r) - o := storage.Options(ctx) name, typ := mux.Vars(r)["repo"], mux.Vars(r)["type"] if n := storage.Options(ctx).Repo(); name == "" && n != "" { name = n } - reg, err := remote.NewRegistry(o.Host()) + reg, err := storage.Options(ctx).NewRegistry(ctx) if err != nil { storage.Error(w, err) return } - o.SetClient(ctx, (*remote.Repository)(®.RepositoryOptions)) out, err := listImageRepositories(ctx, reg, name, typ) if err != nil { storage.Error(w, err) diff --git a/pkg/storage/auth/auth.go b/pkg/auth/auth.go similarity index 100% rename from pkg/storage/auth/auth.go rename to pkg/auth/auth.go diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 12cbaee..de4be40 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -25,15 +25,6 @@ type item struct { exp time.Time } -func Get[T any](c Cache, key string) (z T, ok bool) { - v, ok := c.Get(key) - if !ok { - return z, false - } - z, ok = v.(T) - return -} - type Option func(*item) func WithTTL(d time.Duration) Option { diff --git a/pkg/cache/default.go b/pkg/cache/default.go new file mode 100644 index 0000000..5da4088 --- /dev/null +++ b/pkg/cache/default.go @@ -0,0 +1,31 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "time" +) + +var ( + DefaultTTL = 5 * time.Minute + d = New() +) + +func Set(key string, value any, opts ...Option) { + d.Set(key, value, opts...) +} +func Get(key string) (any, bool) { + return d.Get(key) +} diff --git a/pkg/registry/blobs.go b/pkg/registry/blobs.go new file mode 100644 index 0000000..6d7621b --- /dev/null +++ b/pkg/registry/blobs.go @@ -0,0 +1,51 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/content" + "oras.land/oras-go/v2/registry" +) + +type BlobStore = registry.BlobStore + +type BlobProxy interface { + registry.ReferenceFetcher + content.Fetcher +} + +type blobs struct { + BlobStore + p BlobProxy +} + +func (b *blobs) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) { + return b.maybeProxy().Fetch(ctx, target) +} + +func (b *blobs) FetchReference(ctx context.Context, reference string) (ocispec.Descriptor, io.ReadCloser, error) { + return b.maybeProxy().FetchReference(ctx, reference) +} + +func (b *blobs) maybeProxy() BlobProxy { + if b.p != nil { + return b.p + } + return b.BlobStore +} diff --git a/pkg/registry/manifests.go b/pkg/registry/manifests.go new file mode 100644 index 0000000..d3a001f --- /dev/null +++ b/pkg/registry/manifests.go @@ -0,0 +1,51 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/content" + "oras.land/oras-go/v2/registry" +) + +type ManifestStore = registry.ManifestStore + +type ManifestProxy interface { + registry.ReferenceFetcher + content.Fetcher +} + +type manifests struct { + ManifestStore + p ManifestProxy +} + +func (m *manifests) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) { + return m.maybeProxy().Fetch(ctx, target) +} + +func (m *manifests) FetchReference(ctx context.Context, reference string) (ocispec.Descriptor, io.ReadCloser, error) { + return m.maybeProxy().FetchReference(ctx, reference) +} + +func (m *manifests) maybeProxy() ManifestProxy { + if m.p != nil { + return m.p + } + return m.ManifestStore +} diff --git a/pkg/registry/options.go b/pkg/registry/options.go new file mode 100644 index 0000000..d52838f --- /dev/null +++ b/pkg/registry/options.go @@ -0,0 +1,171 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" + + auth2 "go.linka.cloud/artifact-registry/pkg/auth" + "go.linka.cloud/artifact-registry/pkg/cache" +) + +const clientID = "lk-artifact-registry" + +var clientCache = cache.New() + +type Option func(*options) + +func makeOptions(host string, opts ...Option) options { + o := options{ + clientID: clientID, + host: host, + proxy: &options{ + clientID: clientID, + creds: &creds{}, + }, + } + for _, opt := range opts { + opt(&o) + } + return o +} + +type options struct { + host string + plainHTTP bool + insecure bool + clientID string + clientCA *x509.CertPool + + // creds are valid only for proxy + creds *creds + + proxy *options +} + +type creds struct { + user, password string +} + +func (o options) apply(ctx context.Context, r *remote.Repository) { + var u, p string + if o.creds == nil { + if a := auth2.FromContext(ctx); a != nil { + u, p, _ = a.BasicAuth() + } + } else { + u, p = o.creds.user, o.creds.password + } + h := sha256.New() + h.Write([]byte(u)) + h.Write([]byte(p)) + h.Write([]byte(o.host)) + key := fmt.Sprintf("%x", h.Sum(nil)) + if v, ok := clientCache.Get(key); ok { + clientCache.Set(key, v) + r.Client = v.(remote.Client) + r.PlainHTTP = o.plainHTTP + return + } + c := &auth.Client{ + ClientID: o.clientID, + Client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: o.insecure, + ClientCAs: o.clientCA, + }, + }, + }, + // expectedHostAddress is of form ipaddr:port + Credential: auth.StaticCredential(o.host, auth.Credential{ + Username: u, + Password: p, + }), + // Cache caches credentials for accessing the remote registry. + Cache: auth.NewCache(), + } + clientCache.Set(key, c) + r.Client = c + r.PlainHTTP = o.plainHTTP +} + +func WithClientID(id string) Option { + return func(o *options) { + o.clientID = id + } +} + +func WithPlainHTTP() Option { + return func(o *options) { + o.plainHTTP = true + } +} + +func WithInsecure() Option { + return func(o *options) { + o.insecure = true + } +} + +func WithClientCA(clientCA *x509.CertPool) Option { + return func(o *options) { + o.clientCA = clientCA + } +} + +func WithProxy(host string) Option { + return func(o *options) { + o.proxy.host = host + } +} + +func WithProxyPlainHTTP() Option { + return func(o *options) { + o.proxy.plainHTTP = true + } +} + +func WithProxyInsecure() Option { + return func(o *options) { + o.proxy.insecure = true + } +} + +func WithProxyClientCA(clientCA *x509.CertPool) Option { + return func(o *options) { + o.proxy.clientCA = clientCA + } +} + +func WithProxyUser(user string) Option { + return func(o *options) { + o.proxy.creds.user = user + } +} + +func WithProxyPassword(password string) Option { + return func(o *options) { + o.proxy.creds.password = password + } +} diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go new file mode 100644 index 0000000..613eb20 --- /dev/null +++ b/pkg/registry/registry.go @@ -0,0 +1,67 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + + "oras.land/oras-go/v2/registry" + "oras.land/oras-go/v2/registry/remote" +) + +type Registry = registry.Registry + +func NewRegistry(ctx context.Context, name string, opts ...Option) (Registry, error) { + r, err := remote.NewRegistry(name) + if err != nil { + return nil, err + } + o := makeOptions(r.RepositoryOptions.Reference.Host(), opts...) + o.apply(ctx, (*remote.Repository)(&r.RepositoryOptions)) + var proxy Registry + if o.proxy.host != "" { + p, err := remote.NewRegistry(o.proxy.host) + if err != nil { + return nil, err + } + o.proxy.apply(ctx, (*remote.Repository)(&p.RepositoryOptions)) + proxy = p + } + return ®{r: r, p: proxy}, nil +} + +type reg struct { + r Registry + p Registry +} + +func (r *reg) Repositories(ctx context.Context, last string, fn func(repos []string) error) error { + return r.r.Repositories(ctx, last, fn) +} + +func (r *reg) Repository(ctx context.Context, name string) (Repository, error) { + rep, err := r.r.Repository(ctx, name) + if err != nil { + return nil, err + } + var proxy Repository + if r.p != nil { + proxy, err = r.p.Repository(ctx, name) + if err != nil { + return nil, err + } + } + return &repo{Repository: rep, p: proxy}, nil +} diff --git a/pkg/registry/repository.go b/pkg/registry/repository.go new file mode 100644 index 0000000..ed96878 --- /dev/null +++ b/pkg/registry/repository.go @@ -0,0 +1,92 @@ +// Copyright 2023 Linka Cloud All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/content" + "oras.land/oras-go/v2/registry" + "oras.land/oras-go/v2/registry/remote" +) + +type Repository = registry.Repository + +type RepositoryProxy interface { + registry.ReferenceFetcher + content.Fetcher +} + +func NewRepository(ctx context.Context, reference string, opts ...Option) (Repository, error) { + r, err := remote.NewRepository(reference) + if err != nil { + return nil, err + } + o := makeOptions(r.Reference.Host(), opts...) + o.apply(ctx, r) + var proxy registry.Repository + if o.proxy.host != "" { + ref, err := registry.ParseReference(reference) + if err != nil { + return nil, err + } + ref.Registry = o.proxy.host + p, err := remote.NewRepository(ref.String()) + if err != nil { + return nil, err + } + o.proxy.apply(ctx, p) + proxy = p + } + return &repo{Repository: r, p: proxy}, nil +} + +type repo struct { + Repository + p Repository +} + +func (r *repo) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) { + return r.maybeProxy().Fetch(ctx, target) +} + +func (r *repo) FetchReference(ctx context.Context, reference string) (ocispec.Descriptor, io.ReadCloser, error) { + return r.maybeProxy().FetchReference(ctx, reference) +} + +func (r *repo) Blobs() BlobStore { + b := blobs{BlobStore: r.Repository.Blobs()} + if r.p != nil { + b.p = r.p.Blobs() + } + return &b +} + +func (r *repo) Manifests() ManifestStore { + m := &manifests{ManifestStore: r.Repository.Manifests()} + if r.p != nil { + m.p = r.p.Manifests() + } + return m +} + +func (r *repo) maybeProxy() RepositoryProxy { + if r.p != nil { + return r.p + } + return r.Repository +} diff --git a/pkg/storage/client.go b/pkg/storage/copy.go similarity index 58% rename from pkg/storage/client.go rename to pkg/storage/copy.go index 4008977..bc2ba3b 100644 --- a/pkg/storage/client.go +++ b/pkg/storage/copy.go @@ -16,10 +16,6 @@ package storage import ( "context" - "crypto/sha256" - "crypto/tls" - "fmt" - "net/http" "runtime" "sync" "time" @@ -28,20 +24,8 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "go.linka.cloud/grpc-toolkit/logger" "oras.land/oras-go/v2" - "oras.land/oras-go/v2/registry/remote" - "oras.land/oras-go/v2/registry/remote/auth" - - cache2 "go.linka.cloud/artifact-registry/pkg/cache" - auth2 "go.linka.cloud/artifact-registry/pkg/storage/auth" -) - -const ( -// plainHTTP = false -// plainHTTP = true ) -var clientCache = cache2.New() - func copts(name string) oras.CopyOptions { var times sync.Map return oras.CopyOptions{ @@ -80,42 +64,3 @@ func copts(name string) oras.CopyOptions { }, } } - -func (o options) SetClient(ctx context.Context, reg *remote.Repository) { - // we might need an auth client even if we do not have credentials to authenticate anonymous access - var u, p string - if a := auth2.FromContext(ctx); a != nil { - u, p, _ = a.BasicAuth() - } - h := sha256.New() - h.Write([]byte(u)) - h.Write([]byte(p)) - h.Write([]byte(o.host)) - key := fmt.Sprintf("%x", h.Sum(nil)) - if v, ok := clientCache.Get(key); ok { - clientCache.Set(key, v) - reg.Client = v.(remote.Client) - reg.PlainHTTP = o.plainHTTP - return - } - c := &auth.Client{ - Client: &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: o.insecure, - ClientCAs: o.clientCA, - }, - }, - }, - // expectedHostAddress is of form ipaddr:port - Credential: auth.StaticCredential(o.host, auth.Credential{ - Username: u, - Password: p, - }), - // Cache caches credentials for accessing the remote registry. - Cache: auth.NewCache(), - } - clientCache.Set(key, c) - reg.Client = c - reg.PlainHTTP = o.plainHTTP -} diff --git a/pkg/storage/middleware.go b/pkg/storage/middleware.go index 5a02e09..f11e492 100644 --- a/pkg/storage/middleware.go +++ b/pkg/storage/middleware.go @@ -20,7 +20,7 @@ import ( "github.com/gorilla/mux" "go.linka.cloud/grpc-toolkit/logger" - "go.linka.cloud/artifact-registry/pkg/storage/auth" + "go.linka.cloud/artifact-registry/pkg/auth" ) type MiddlewareFunc = func(repoVar string) mux.MiddlewareFunc diff --git a/pkg/storage/oci_storage.go b/pkg/storage/oci_storage.go index 2bd9916..a183cb0 100644 --- a/pkg/storage/oci_storage.go +++ b/pkg/storage/oci_storage.go @@ -26,7 +26,6 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -34,22 +33,17 @@ import ( "oras.land/oras-go/v2" "oras.land/oras-go/v2/content/file" "oras.land/oras-go/v2/errdef" - "oras.land/oras-go/v2/registry/remote" - cache2 "go.linka.cloud/artifact-registry/pkg/cache" + "go.linka.cloud/artifact-registry/pkg/cache" "go.linka.cloud/artifact-registry/pkg/crypt/aes" + "go.linka.cloud/artifact-registry/pkg/registry" "go.linka.cloud/artifact-registry/pkg/slices" ) -var ( - cache = cache2.New() - ttl = 5 * time.Minute -) - type storage struct { opts options name string - rrepo *remote.Repository + rrepo registry.Repository ref string repo Repository key string @@ -64,14 +58,14 @@ func NewStorage(ctx context.Context, name string, repo Repository) (Storage, err } name = opts.repo } - name = opts.host + "/" + strings.TrimSuffix(name, "/") - ref := name + ":" + repo.Name() - tmp, err := os.MkdirTemp(os.TempDir(), "lk-artifact-registry-"+repo.Name()) + rname := opts.host + "/" + strings.TrimSuffix(name, "/") + ref := rname + ":" + repo.Name() + tmp, err := os.MkdirTemp(os.TempDir(), fmt.Sprintf("lk-artifact-registry-%s-", repo.Name())) if err != nil { return nil, err } r := &storage{ - name: name, + name: rname, repo: repo, ref: ref, tmp: tmp, @@ -82,7 +76,7 @@ func NewStorage(ctx context.Context, name string, repo Repository) (Storage, err r.Close() } }() - r.rrepo, err = r.newRepository(ctx, name) + r.rrepo, err = opts.NewRepository(ctx, rname) if err != nil { return nil, err } @@ -178,7 +172,7 @@ func (s *storage) Write(ctx context.Context, pkg Artifact) error { if err := store.Tag(ctx, img, img.Digest.String()); err != nil { return err } - rrepo, err := s.newRepository(ctx, repo) + rrepo, err := s.opts.NewRepository(ctx, repo) if err != nil { return err } @@ -219,7 +213,7 @@ func (s *storage) Delete(ctx context.Context, name string) error { if s.opts.artifactTags { repo := s.artifactName(pkg) ref := strings.NewReplacer("~", "-", "+", "-").Replace(repo + ":" + defaults(pkg.Version(), "latest")) - rrepo, err := s.newRepository(ctx, repo) + rrepo, err := s.opts.NewRepository(ctx, repo) if err != nil { return err } @@ -325,15 +319,6 @@ func (s *storage) Close() error { return os.RemoveAll(s.tmp) } -func (s *storage) newRepository(ctx context.Context, name string) (*remote.Repository, error) { - rrepo, err := remote.NewRepository(name) - if err != nil { - return nil, err - } - s.opts.SetClient(ctx, rrepo) - return rrepo, nil -} - func (s *storage) updateIndex(ctx context.Context, store *file.Store, m ocispec.Manifest, pkgs []Artifact, layers []ocispec.Descriptor) error { pvn, pbn := s.repo.KeyNames() for i := range m.Layers { @@ -356,8 +341,25 @@ func (s *storage) updateIndex(ctx context.Context, store *file.Store, m ocispec. if err != nil { return err } + i := make(map[string]string) + for _, v := range append(pkgs, files...) { + i[v.Path()] = v.Digest().String() + } + ib, err := json.Marshal(i) + if err != nil { + return fmt.Errorf("failed to marshal packages: %w", err) + } + cfg := ocispec.Descriptor{ + MediaType: s.MediaTypeIndexConfig(), + Digest: digest.FromBytes(ib), + Size: int64(len(ib)), + } + if err := store.Push(ctx, cfg, bytes.NewReader(ib)); err != nil { + return err + } opts := oras.PackManifestOptions{ - Layers: layers, + ConfigDescriptor: &cfg, + Layers: layers, } for _, v := range files { l := ocispec.Descriptor{ @@ -380,7 +382,6 @@ func (s *storage) updateIndex(ctx context.Context, store *file.Store, m ocispec. if err := store.Tag(ctx, img, img.Digest.String()); err != nil { return err } - // TODO(adphi): update only manifest img, err = oras.Copy(ctx, store, img.Digest.String(), s.rrepo, s.ref, copts(s.ref)) if err != nil { return err @@ -445,7 +446,7 @@ func (s *storage) manifest(ctx context.Context) (m ocispec.Manifest, err error) } if v, ok := cache.Get(desc.Digest.String()); ok { // reset ttl - cache.Set(desc.Digest.String(), v, cache2.WithTTL(ttl)) + cache.Set(desc.Digest.String(), v, cache.WithTTL(cache.DefaultTTL)) return v.(ocispec.Manifest), nil } logger.C(ctx).Infof("retrieve manifest %s", desc.Digest.String()) @@ -461,7 +462,7 @@ func (s *storage) manifest(ctx context.Context) (m ocispec.Manifest, err error) if m.ArtifactType != s.ArtefactTypeRegistry() { return m, fmt.Errorf("%w: %s", ErrInvalidArtifactType, m.MediaType) } - cache.Set(desc.Digest.String(), m, cache2.WithTTL(ttl)) + cache.Set(desc.Digest.String(), m, cache.WithTTL(cache.DefaultTTL)) return m, nil } @@ -512,6 +513,9 @@ func (s *storage) artifactName(a Artifact) string { func (s *storage) ArtefactTypeRegistry() string { return "application/vnd.lk.registry+" + s.repo.Name() } +func (s *storage) MediaTypeIndexConfig() string { + return "application/vnd.lk.registry.index.config.v1." + s.repo.Name() + "+json" +} func (s *storage) MediaTypeArtifactConfig() string { return "application/vnd.lk.registry.config.v1." + s.repo.Name() + "+" + s.repo.Codec().Name() } diff --git a/pkg/storage/oci_storage_test.go b/pkg/storage/oci_storage_test.go index 07cd65a..435498d 100644 --- a/pkg/storage/oci_storage_test.go +++ b/pkg/storage/oci_storage_test.go @@ -37,10 +37,10 @@ import ( "go.linka.cloud/grpc-toolkit/logger" "oras.land/oras-go/v2/registry/remote" + "go.linka.cloud/artifact-registry/pkg/auth" "go.linka.cloud/artifact-registry/pkg/codec" "go.linka.cloud/artifact-registry/pkg/crypt/aes" "go.linka.cloud/artifact-registry/pkg/slices" - "go.linka.cloud/artifact-registry/pkg/storage/auth" ) const ( diff --git a/pkg/storage/options.go b/pkg/storage/options.go index 187b5a1..99598b0 100644 --- a/pkg/storage/options.go +++ b/pkg/storage/options.go @@ -16,7 +16,8 @@ package storage import ( "context" - "crypto/x509" + + "go.linka.cloud/artifact-registry/pkg/registry" ) type optionsKey struct{} @@ -38,10 +39,8 @@ type options struct { host string key []byte repo string - plainHTTP bool - insecure bool artifactTags bool - clientCA *x509.CertPool + ropts []registry.Option } func (o options) Host() string { @@ -56,6 +55,14 @@ func (o options) Key() []byte { return o.key } +func (o options) NewRegistry(ctx context.Context) (registry.Registry, error) { + return registry.NewRegistry(ctx, o.host, o.ropts...) +} + +func (o options) NewRepository(ctx context.Context, name string) (registry.Repository, error) { + return registry.NewRepository(ctx, name, o.ropts...) +} + type Option func(o *options) func WithHost(host string) Option { @@ -76,26 +83,14 @@ func WithRepo(repo string) Option { } } -func WithPlainHTTP() Option { - return func(o *options) { - o.plainHTTP = true - } -} - -func WithInsecure() Option { - return func(o *options) { - o.insecure = true - } -} - func WithArtifactTags() Option { return func(o *options) { o.artifactTags = true } } -func WithClientCA(clientCA *x509.CertPool) Option { +func WithRegistryOptions(opts ...registry.Option) Option { return func(o *options) { - o.clientCA = clientCA + o.ropts = opts } }