Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not re-tag non-distributable blob descriptors #2561

Merged
merged 5 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression
sr.queueBlob(desc.Digest)
sr.queueMediaType(desc.MediaType)
sr.queueBlobSize(desc.Size)
sr.appendURLs(desc.URLs)
if err := sr.commitMetadata(); err != nil {
return err
}
Expand Down Expand Up @@ -420,7 +421,7 @@ func isTypeWindows(sr *immutableRef) bool {
// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers)
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions cache/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package config

import "github.com/moby/buildkit/util/compression"

type RefConfig struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case that required a separate packages for this definition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import cycle from cache->solver->cache

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's figure this out later. It's more about what the dependency between solver and cache pkg should be.

Compression compression.Config
PreferNonDistributable bool
}
1 change: 1 addition & 0 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
rec.queueBlobOnly(blobOnly)
rec.queueMediaType(desc.MediaType)
rec.queueBlobSize(desc.Size)
rec.appendURLs(desc.URLs)
rec.queueCommitted(true)

if err := rec.commitMetadata(); err != nil {
Expand Down
80 changes: 74 additions & 6 deletions cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/containerd/continuity/fs/fstest"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session"
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestMergeBlobchainID(t *testing.T) {
mergeRef, err := cm.Merge(ctx, mergeInputs, nil)
require.NoError(t, err)

_, err = mergeRef.GetRemotes(ctx, true, compression.New(compression.Default), false, nil)
_, err = mergeRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compression.Default)}, false, nil)
require.NoError(t, err)

// verify the merge blobchain ID isn't just set to one of the inputs (regression test)
Expand Down Expand Up @@ -1340,9 +1341,9 @@ func TestGetRemotes(t *testing.T) {
ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := compression.New(compressionType).SetForce(true)
refCfg := config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
remotes, err := ir.GetRemotes(egctx, true, refCfg, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
Expand Down Expand Up @@ -1429,15 +1430,15 @@ func TestGetRemotes(t *testing.T) {
require.True(t, ok, ir.ID())
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := compression.New(compressionType)
refCfg := config.RefConfig{Compression: compression.New(compressionType)}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, false, compressionopt, true, nil)
remotes, err := ir.GetRemotes(egctx, false, refCfg, true, nil)
require.NoError(t, err)
require.True(t, len(remotes) > 0, "for %s : %d", compressionType, len(remotes))
gotMain, gotVariants := remotes[0], remotes[1:]

// Check the main blob is compatible with all == false
mainOnly, err := ir.GetRemotes(egctx, false, compressionopt, false, nil)
mainOnly, err := ir.GetRemotes(egctx, false, refCfg, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(mainOnly))
mainRemote := mainOnly[0]
Expand Down Expand Up @@ -1508,6 +1509,73 @@ func checkVariantsCoverage(ctx context.Context, t *testing.T, variants idxToVari
require.Equal(t, 0, len(got))
}

// Make sure that media type and urls are persisted for non-distributable blobs.
func TestNondistributableBlobs(t *testing.T) {
t.Parallel()

ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")

tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)

co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)
defer cleanup()

cm := co.manager

ctx, done, err := leaseutil.WithLease(ctx, co.lm, leaseutil.MakeTemporary)
require.NoError(t, err)
defer done(context.TODO())

contentBuffer := contentutil.NewBuffer()
descHandlers := DescHandlers(map[digest.Digest]*DescHandler{})

data, desc, err := mapToBlob(map[string]string{"foo": "bar"}, false)
require.NoError(t, err)

// Pretend like this is non-distributable
desc.MediaType = ocispecs.MediaTypeImageLayerNonDistributable
desc.URLs = []string{"https://buildkit.moby.dev/foo"}

cw, err := contentBuffer.Writer(ctx)
require.NoError(t, err)
_, err = cw.Write(data)
require.NoError(t, err)
err = cw.Commit(ctx, 0, cw.Digest())
require.NoError(t, err)

descHandlers[desc.Digest] = &DescHandler{
Provider: func(_ session.Group) content.Provider { return contentBuffer },
}

ref, err := cm.GetByBlob(ctx, desc, nil, descHandlers)
require.NoError(t, err)

remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: true}, false, nil)
require.NoError(t, err)

desc2 := remotes[0].Descriptors[0]

require.Equal(t, desc.MediaType, desc2.MediaType)
require.Equal(t, desc.URLs, desc2.URLs)

remotes, err = ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: false}, false, nil)
require.NoError(t, err)

desc2 = remotes[0].Descriptors[0]

require.Equal(t, ocispecs.MediaTypeImageLayer, desc2.MediaType)
require.Len(t, desc2.URLs, 0)
}

func checkInfo(ctx context.Context, t *testing.T, cs content.Store, info content.Info) {
if info.Labels == nil {
return
Expand Down
45 changes: 41 additions & 4 deletions cache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const keyMediaType = "cache.mediatype"
const keyImageRefs = "cache.imageRefs"
const keyDeleted = "cache.deleted"
const keyBlobSize = "cache.blobsize" // the packed blob size as specified in the oci descriptor
const keyURLs = "cache.layer.urls"

// Indexes
const blobchainIndex = "blobchainid:"
Expand Down Expand Up @@ -281,6 +282,17 @@ func (md *cacheMetadata) queueBlob(str digest.Digest) error {
return md.queueValue(keyBlob, str, "")
}

func (md *cacheMetadata) appendURLs(urls []string) error {
if len(urls) == 0 {
return nil
}
return md.appendStringSlice(keyURLs, urls...)
}

func (md *cacheMetadata) getURLs() []string {
return md.GetStringSlice(keyURLs)
}

func (md *cacheMetadata) getBlob() digest.Digest {
return digest.Digest(md.GetString(keyBlob))
}
Expand Down Expand Up @@ -468,6 +480,18 @@ func (md *cacheMetadata) GetString(key string) string {
return str
}

func (md *cacheMetadata) GetStringSlice(key string) []string {
v := md.si.Get(key)
if v == nil {
return nil
}
var val []string
if err := v.Unmarshal(&val); err != nil {
return nil
}
return val
}

func (md *cacheMetadata) setTime(key string, value time.Time, index string) error {
return md.setValue(key, value.UnixNano(), index)
}
Expand Down Expand Up @@ -512,20 +536,33 @@ func (md *cacheMetadata) getInt64(key string) (int64, bool) {
return i, true
}

func (md *cacheMetadata) appendStringSlice(key string, value string) error {
func (md *cacheMetadata) appendStringSlice(key string, values ...string) error {
return md.si.GetAndSetValue(key, func(v *metadata.Value) (*metadata.Value, error) {
var slice []string
if v != nil {
if err := v.Unmarshal(&slice); err != nil {
return nil, err
}
}

idx := make(map[string]struct{}, len(values))
for _, v := range values {
idx[v] = struct{}{}
}

for _, existing := range slice {
if existing == value {
return nil, metadata.ErrSkipSetValue
if _, ok := idx[existing]; ok {
delete(idx, existing)
}
}
slice = append(slice, value)

if len(idx) == 0 {
return nil, metadata.ErrSkipSetValue
}

for value := range idx {
slice = append(slice, value)
}
v, err := metadata.NewValue(slice)
if err != nil {
return nil, err
Expand Down
65 changes: 61 additions & 4 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
"github.com/hashicorp/go-multierror"
"github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
Expand Down Expand Up @@ -47,7 +49,7 @@ type ImmutableRef interface {
Finalize(context.Context) error

Extract(ctx context.Context, s session.Group) error // +progress
GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt compression.Config, all bool, s session.Group) ([]*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, cfg config.RefConfig, all bool, s session.Group) ([]*solver.Remote, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ktock @sipsma Any ideas how to improve this? Still not very clean. And a bit of an issue with RefConfig is that keys can be uninitialized (eg. Compression doesn't have a clear zero value atm). We can do this refactoring later in a follow-up PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe functional options?

LayerChain() RefList
}

Expand Down Expand Up @@ -589,7 +591,48 @@ func (sr *immutableRef) Clone() ImmutableRef {
return sr.clone()
}

func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs.Descriptor, error) {
// layertoDistributable changes the passed in media type to the "distributable" version of the media type.
func layerToDistributable(mt string) string {
if !images.IsNonDistributable(mt) {
// Layer is already a distributable media type (or this is not even a layer).
// No conversion needed
return mt
}

switch mt {
case ocispecs.MediaTypeImageLayerNonDistributable:
return ocispecs.MediaTypeImageLayer
case ocispecs.MediaTypeImageLayerNonDistributableGzip:
return ocispecs.MediaTypeImageLayerGzip
case ocispecs.MediaTypeImageLayerNonDistributableZstd:
return ocispecs.MediaTypeImageLayerZstd
case images.MediaTypeDockerSchema2LayerForeign:
return images.MediaTypeDockerSchema2Layer
case images.MediaTypeDockerSchema2LayerForeignGzip:
return images.MediaTypeDockerSchema2LayerGzip
default:
return mt
}
}

func layerToNonDistributable(mt string) string {
switch mt {
case ocispecs.MediaTypeImageLayer:
return ocispecs.MediaTypeImageLayerNonDistributable
case ocispecs.MediaTypeImageLayerGzip:
return ocispecs.MediaTypeImageLayerNonDistributableGzip
case ocispecs.MediaTypeImageLayerZstd:
return ocispecs.MediaTypeImageLayerNonDistributableZstd
case images.MediaTypeDockerSchema2Layer:
return images.MediaTypeDockerSchema2LayerForeign
case images.MediaTypeDockerSchema2LayerForeignGzip:
return images.MediaTypeDockerSchema2LayerForeignGzip
default:
return mt
}
}

func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers, preferNonDist bool) (ocispecs.Descriptor, error) {
dgst := sr.getBlob()
if dgst == "" {
return ocispecs.Descriptor{}, errors.Errorf("no blob set for cache record %s", sr.ID())
Expand All @@ -598,8 +641,21 @@ func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs
desc := ocispecs.Descriptor{
Digest: sr.getBlob(),
Size: sr.getBlobSize(),
MediaType: sr.getMediaType(),
Annotations: make(map[string]string),
MediaType: sr.getMediaType(),
}

if preferNonDist {
if urls := sr.getURLs(); len(urls) > 0 {
// Make sure the media type is the non-distributable version
// We don't want to rely on the stored media type here because it could have been stored as distributable originally.
desc.MediaType = layerToNonDistributable(desc.MediaType)
desc.URLs = urls
}
}
if len(desc.URLs) == 0 {
// If there are no URL's, there is no reason to have this be non-dsitributable
desc.MediaType = layerToDistributable(desc.MediaType)
}

if blobDesc, err := getBlobDesc(ctx, sr.cm.ContentStore, desc.Digest); err == nil {
Expand Down Expand Up @@ -748,6 +804,7 @@ func getBlobDesc(ctx context.Context, cs content.Store, dgst digest.Digest) (oci
if !ok {
return ocispecs.Descriptor{}, fmt.Errorf("no media type is stored for %q", info.Digest)
}

desc := ocispecs.Descriptor{
Digest: info.Digest,
Size: info.Size,
Expand Down Expand Up @@ -1062,7 +1119,7 @@ func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, pg pr
})
}

desc, err := sr.ociDesc(ctx, dhs)
desc, err := sr.ociDesc(ctx, dhs, true)
cpuguy83 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand Down
Loading