Skip to content

Commit

Permalink
Allow for "automatic content discovery" on mount
Browse files Browse the repository at this point in the history
As described in opencontainers/distribution-spec#275,
this adds, and allows for "automatic content discovery" on mount if the from
field is ommitted. This implementation does not provide and authz functionality,
so it should be only be used / implemented, if the registry has no cross-repo
authz.

Signed-off-by: Sargun Dhillon <sargun@sargun.me>
  • Loading branch information
sargun committed Sep 16, 2021
1 parent bc27e4e commit 43f2781
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 73 deletions.
57 changes: 49 additions & 8 deletions blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/distribution/distribution/v3/reference"

"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand Down Expand Up @@ -45,15 +46,29 @@ func (err ErrBlobInvalidDigest) Error() string {
// ErrBlobMounted returned when a blob is mounted from another repository
// instead of initiating an upload session.
type ErrBlobMounted struct {
From reference.Canonical
Descriptor Descriptor
}

func (err ErrBlobMounted) Error() string {
return fmt.Sprintf("blob mounted to: %v", err.Descriptor)
}

// ErrBlobMountedFrom returned when a blob is mounted from another repository
// instead of initiating an upload session.type ErrBlobMounted struct {
type ErrBlobMountedFrom struct {
ErrBlobMounted
From reference.Canonical
}

func (err ErrBlobMountedFrom) Error() string {
return fmt.Sprintf("blob mounted from: %v to: %v",
err.From, err.Descriptor)
}

func (err ErrBlobMountedFrom) Unwrap() error {
return err.ErrBlobMounted
}

// Descriptor describes targeted content. Used in conjunction with a blob
// store, a descriptor can be used to fetch, store and target any kind of
// blob. The struct also describes the wire protocol format. Fields should
Expand Down Expand Up @@ -203,13 +218,39 @@ type BlobCreateOption interface {
// CreateOptions is a collection of blob creation modifiers relevant to general
// blob storage intended to be configured by the BlobCreateOption.Apply method.
type CreateOptions struct {
Mount struct {
ShouldMount bool
From reference.Canonical
// Stat allows to pass precalculated descriptor to link and return.
// Blob access check will be skipped if set.
Stat *Descriptor
}
Mount Mount
}

type StatMount struct {
// Stat allows to pass precalculated descriptor to link and return.
// Blob access check will be skipped if set.
Stat *Descriptor
}

func (s StatMount) Digest() digest.Digest {
return s.Stat.Digest
}

type DigestMount struct {
// BlobDigest is only checked if automatic content discovery is enabled
BlobDigest digest.Digest
}

func (d DigestMount) Digest() digest.Digest {
return d.BlobDigest
}

type FromMount struct {
// From is optional if automatic content discovery is enabled
From reference.Canonical
}

func (f FromMount) Digest() digest.Digest {
return f.From.Digest()
}

type Mount interface {
Digest() digest.Digest
}

// BlobWriter provides a handle for inserting data into a blob store.
Expand Down
4 changes: 4 additions & 0 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ type Configuration struct {
Classes []string `yaml:"classes"`
} `yaml:"repository,omitempty"`
} `yaml:"policy,omitempty"`

// AutomaticContentDiscovery enables automatic content discovery options for the registry.
// It must only be used with auth disabled.
AutomaticContentDiscovery bool `yaml:"automaticcontentdiscovery,omitempty"`
}

// LogHook is composed of hook Level and Type.
Expand Down
12 changes: 12 additions & 0 deletions notifications/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor,
return b.sink.Write(*event)
}

func (b *bridge) BlobMountedAutomaticContentDiscovery(repo reference.Named, desc distribution.Descriptor) error {
event, err := b.createBlobEvent(EventActionMount, repo, desc)
if err != nil {
return err
}

t := true
event.Target.AutomaticContentDiscovery = &t

return b.sink.Write(*event)
}

func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
}
Expand Down
4 changes: 4 additions & 0 deletions notifications/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type Event struct {
// from if appropriate.
FromRepository string `json:"fromRepository,omitempty"`

// AutomaticContentDiscovery identifies that the blob was mounted via
// automatic content discovery, and thus the "source" repo is unspecified
AutomaticContentDiscovery *bool `json:"automaticContentDiscovery,omitempty"`

// URL provides a direct link to the content.
URL string `json:"url,omitempty"`

Expand Down
8 changes: 7 additions & 1 deletion notifications/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type BlobListener interface {
BlobPushed(repo reference.Named, desc distribution.Descriptor) error
BlobPulled(repo reference.Named, desc distribution.Descriptor) error
BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
BlobMountedAutomaticContentDiscovery(repo reference.Named, desc distribution.Descriptor) error
BlobDeleted(repo reference.Named, desc digest.Digest) error
}

Expand Down Expand Up @@ -191,11 +192,16 @@ func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []b
func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
wr, err := bsl.BlobStore.Create(ctx, options...)
switch err := err.(type) {
case distribution.ErrBlobMounted:
case distribution.ErrBlobMountedFrom:
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
}
return nil, err
case distribution.ErrBlobMounted:
if err := bsl.parent.listener.BlobMountedAutomaticContentDiscovery(bsl.parent.Repository.Named(), err.Descriptor); err != nil {
dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
}
return nil, err
}
return bsl.decorateWriter(wr), err
}
Expand Down
5 changes: 5 additions & 0 deletions notifications/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Desc
return nil
}

func (tl *testListener) BlobMountedAutomaticContentDiscovery(repo reference.Named, desc distribution.Descriptor) error {
tl.ops["layer:automount"]++
return nil
}

func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error {
tl.ops["layer:delete"]++
return nil
Expand Down
28 changes: 22 additions & 6 deletions registry/client/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,9 @@ func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
return fmt.Errorf("unexpected options type: %T", v)
}

opts.Mount.ShouldMount = true
opts.Mount.From = ref
opts.Mount = distribution.FromMount{
From: ref,
}

return nil
})
Expand All @@ -783,8 +784,14 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO

var values []url.Values

if opts.Mount.ShouldMount {
values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}})
switch v := opts.Mount.(type) {
case distribution.FromMount:
values = append(values, url.Values{"from": {v.From.Name()}, "mount": {v.Digest().String()}})
case distribution.DigestMount:
values = append(values, url.Values{"mount": {v.Digest().String()}})
case nil:
default:
return nil, fmt.Errorf("Unknown mount type: %T", v)
}

u, err := bs.ub.BuildBlobUploadURL(bs.name, values...)
Expand All @@ -805,11 +812,20 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO

switch resp.StatusCode {
case http.StatusCreated:
desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest())
desc, err := bs.statter.Stat(ctx, opts.Mount.Digest())
if err != nil {
return nil, err
}
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}

switch v := opts.Mount.(type) {
case distribution.FromMount:
return nil, distribution.ErrBlobMountedFrom{ErrBlobMounted: distribution.ErrBlobMounted{Descriptor: desc}, From: v.From}
case distribution.DigestMount:
// We shouldn't expose the from.
return nil, distribution.ErrBlobMounted{Descriptor: desc}
default:
panic("Unexpected state")
}
case http.StatusAccepted:
// TODO(dmcgowan): Check for invalid UUID
uuid := resp.Header.Get("Docker-Upload-UUID")
Expand Down
11 changes: 4 additions & 7 deletions registry/client/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,12 @@ func TestBlobMount(t *testing.T) {
t.Fatalf("Expected blob writer to be nil, was %v", bw)
}

if ebm, ok := err.(distribution.ErrBlobMounted); ok {
if ebm.From.Digest() != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", ebm.From.Digest(), dgst)
}
if ebm.From.Name() != sourceRepo.Name() {
t.Fatalf("Unexpected from: %s, expected %s", ebm.From.Name(), sourceRepo)
if ebm, ok := err.(distribution.ErrBlobMountedFrom); ok {
if ebm.Descriptor.Digest != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", ebm.Descriptor.Digest, dgst)
}
} else {
t.Fatalf("Unexpected error: %v, expected an ErrBlobMounted", err)
t.Fatalf("Unexpected error: %v, expected an ErrBlobMountedFrom", err)
}
}

Expand Down
29 changes: 18 additions & 11 deletions registry/handlers/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,24 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
}
}

authType := config.Auth.Type()

if authType != "" && !strings.EqualFold(authType, "none") {
accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}

if config.AutomaticContentDiscovery {
dcontext.GetLogger(app).Warn("Not enabling automatic content discovery because auth is enabled")
}

app.accessController = accessController
dcontext.GetLogger(app).Debugf("configured %q access controller", authType)
} else if config.AutomaticContentDiscovery {
options = append(options, storage.EnableAutomaticContentDiscovery)
}

// configure storage caches
if cc, ok := config.Storage["cache"]; ok {
v, ok := cc["blobdescriptor"]
Expand Down Expand Up @@ -306,17 +324,6 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
panic(err)
}

authType := config.Auth.Type()

if authType != "" && !strings.EqualFold(authType, "none") {
accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}
app.accessController = accessController
dcontext.GetLogger(app).Debugf("configured %q access controller", authType)
}

// configure as a pull through cache
if config.Proxy.RemoteURL != "" {
app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
Expand Down
11 changes: 9 additions & 2 deletions registry/handlers/blobupload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers

import (
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -67,8 +68,9 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
fromRepo := r.FormValue("from")
mountDigest := r.FormValue("mount")

if mountDigest != "" && fromRepo != "" {
if mountDigest != "" {
opt, err := buh.createBlobMountOption(fromRepo, mountDigest)
// We ignore errors here because this is supposed to (always) fail open according to the spec.
if opt != nil && err == nil {
options = append(options, opt)
}
Expand All @@ -78,7 +80,8 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
upload, err := blobs.Create(buh, options...)

if err != nil {
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
var ebm distribution.ErrBlobMounted
if errors.As(err, &ebm) {
if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
Expand Down Expand Up @@ -364,6 +367,10 @@ func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string
return nil, err
}

if fromRepo == "" {
return storage.WithMount(dgst), nil
}

ref, err := reference.WithName(fromRepo)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 43f2781

Please sign in to comment.