Skip to content

Commit

Permalink
Allow forcibly specifying the compression type
Browse files Browse the repository at this point in the history
Signed-off-by: ktock <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed May 26, 2021
1 parent 8d5c5f1 commit ec8a782
Show file tree
Hide file tree
Showing 17 changed files with 1,066 additions and 33 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip]`: choose compression type for layer, gzip is default value

* `compression=[uncompressed,gzip]`: choose the compression type for newly created and cached layers; gzip is the default
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
`$DOCKER_CONFIG` defaults to `~/.docker`.
Expand Down
115 changes: 115 additions & 0 deletions cache/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package cache

import (
"compress/gzip"
"context"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/labels"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// gzipLayerConvertFunc is a converter.ConvertFunc that re-compresses a layer
// blob in cs with gzip and returns a descriptor pointing to the new blob.
// It returns (nil, nil) — no conversion, no error — when desc is not a layer
// or is already gzip-compressed.
func gzipLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispec.Descriptor) (*ocispec.Descriptor, error) {
	if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
		// No conversion. No need to return an error here.
		return nil, nil
	}

	// prepare the source and destination
	info, err := cs.Info(ctx, desc.Digest)
	if err != nil {
		return nil, err
	}
	labelz := info.Labels
	if labelz == nil {
		labelz = make(map[string]string)
	}
	ra, err := cs.ReaderAt(ctx, desc)
	if err != nil {
		return nil, err
	}
	defer ra.Close()
	ref := fmt.Sprintf("convert-gzip-from-%s", desc.Digest)
	w, err := cs.Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer w.Close()
	if err := w.Truncate(0); err != nil { // Old written data possibly remains
		return nil, err
	}
	zw := gzip.NewWriter(w)
	defer zw.Close()

	// convert this layer
	// The source bytes are tee'd into the digester while being compressed;
	// the resulting digest is over the *uncompressed* content, i.e. the
	// layer's diffID. Assumes the source blob is uncompressed (only the
	// already-gzip case is filtered out above) — TODO confirm no other
	// compression types can reach here.
	diffID := digest.Canonical.Digester()
	if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Flush the writer
		return nil, err
	}
	labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
	// Commit with zero expected size and empty expected digest so the store
	// derives them itself; an already-exists error means an identical blob is
	// present, which is success for our purposes.
	if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	// Re-read the committed blob's info to learn the compressed digest/size.
	info, err = cs.Info(ctx, w.Digest())
	if err != nil {
		return nil, err
	}

	newDesc := desc
	newDesc.MediaType = convertMediaTypeToGzip(newDesc.MediaType)
	newDesc.Digest = info.Digest
	newDesc.Size = info.Size
	return &newDesc, nil
}

// isGzipCompressedType reports whether mt is one of the known gzip-compressed
// layer media types (Docker schema2 or OCI, including their foreign /
// non-distributable variants).
func isGzipCompressedType(mt string) bool {
	return mt == images.MediaTypeDockerSchema2LayerGzip ||
		mt == images.MediaTypeDockerSchema2LayerForeignGzip ||
		mt == ocispec.MediaTypeImageLayerGzip ||
		mt == ocispec.MediaTypeImageLayerNonDistributableGzip
}

// convertMediaTypeToUncompress maps a gzip layer media type to its
// uncompressed counterpart. Media types it does not recognize are returned
// unchanged.
func convertMediaTypeToUncompress(mt string) string {
	if mt == images.MediaTypeDockerSchema2LayerGzip {
		return images.MediaTypeDockerSchema2Layer
	}
	if mt == images.MediaTypeDockerSchema2LayerForeignGzip {
		return images.MediaTypeDockerSchema2LayerForeign
	}
	if mt == ocispec.MediaTypeImageLayerGzip {
		return ocispec.MediaTypeImageLayer
	}
	if mt == ocispec.MediaTypeImageLayerNonDistributableGzip {
		return ocispec.MediaTypeImageLayerNonDistributable
	}
	return mt
}

// convertMediaTypeToGzip maps an uncompressed layer media type to its gzip
// counterpart, using the Docker (".gzip") or OCI ("+gzip") suffix convention
// as appropriate. Any other media type is returned unchanged.
func convertMediaTypeToGzip(mt string) string {
	if !uncompress.IsUncompressedType(mt) {
		return mt
	}
	if images.IsDockerType(mt) {
		return mt + ".gzip"
	}
	return mt + "+gzip"
}
5 changes: 3 additions & 2 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ type cacheManager struct {
ManagerOpt
md *metadata.Store

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
compressG flightcontrol.Group
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand Down
47 changes: 46 additions & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -46,7 +47,7 @@ type ImmutableRef interface {

Info() RefInfo
Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) (*solver.Remote, error)
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error)
}

type RefInfo struct {
Expand Down Expand Up @@ -361,6 +362,50 @@ func (sr *immutableRef) ociDesc() (ocispec.Descriptor, error) {
return desc, nil
}

// compressionVariantDigestLabel returns the content-store label key under
// which the digest of this blob's variant, compressed with compressionType,
// is recorded.
func compressionVariantDigestLabel(compressionType compression.Type) string {
	const labelPrefix = "buildkit.io/compression/digest."
	return labelPrefix + compressionType.String()
}

// getCompressionBlob looks up the previously converted variant of this ref's
// blob for the given compression type, using the per-compression-type digest
// label on the original blob. It returns errdefs.ErrNotFound when no such
// variant has been recorded.
func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (content.Info, error) {
	cs := sr.cm.ContentStore
	blobInfo, err := cs.Info(ctx, digest.Digest(getBlob(sr.md)))
	if err != nil {
		return content.Info{}, err
	}
	recorded, ok := blobInfo.Labels[compressionVariantDigestLabel(compressionType)]
	if !ok {
		// No converted variant is tracked for this compression type.
		return content.Info{}, errdefs.ErrNotFound
	}
	variantDigest, err := digest.Parse(recorded)
	if err != nil {
		return content.Info{}, err
	}
	return cs.Info(ctx, variantDigest)
}

// addCompressionBlob records dgst as the compressionType-compressed variant of
// this ref's blob: it attaches the new blob to the ref's lease and stores its
// digest under a per-compression-type label on the original blob, where
// getCompressionBlob can find it later.
func (sr *immutableRef) addCompressionBlob(ctx context.Context, dgst digest.Digest, compressionType compression.Type) error {
	cs := sr.cm.ContentStore
	// Attach the converted blob to this ref's lease before labelling it —
	// presumably to pin it against garbage collection while the ref lives;
	// NOTE(review): confirm lease semantics against containerd's leases docs.
	if err := sr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
		ID:   dgst.String(),
		Type: "content",
	}); err != nil {
		return err
	}
	info, err := cs.Info(ctx, digest.Digest(getBlob(sr.md)))
	if err != nil {
		return err
	}
	cachedVariantLabel := compressionVariantDigestLabel(compressionType)
	if info.Labels == nil {
		info.Labels = make(map[string]string)
	}
	info.Labels[cachedVariantLabel] = dgst.String()
	// Update only this one label field on the original blob's info.
	if _, err := cs.Update(ctx, info, "labels."+cachedVariantLabel); err != nil {
		return err
	}
	return nil
}

// order is from parent->child, sr will be at end of slice
func (sr *immutableRef) parentRefChain() []*immutableRef {
var count int
Expand Down
110 changes: 101 additions & 9 deletions cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import (
"fmt"
"net/url"
"strings"
"sync"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/reference"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
Expand All @@ -27,7 +31,7 @@ type Unlazier interface {

// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation.
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) (*solver.Remote, error) {
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
Expand All @@ -39,12 +43,17 @@ func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, comp
return nil, err
}

mprovider := &lazyMultiProvider{mprovider: contentutil.NewMultiProvider(nil)}
chain := sr.parentRefChain()
mproviderBase := contentutil.NewMultiProvider(nil)
mprovider := &lazyMultiProvider{mprovider: mproviderBase}
remote := &solver.Remote{
Provider: mprovider,
Provider: mprovider,
Descriptors: make([]ocispec.Descriptor, len(chain)),
}

for _, ref := range sr.parentRefChain() {
var mu sync.Mutex
eg, egctx := errgroup.WithContext(ctx)
for i, ref := range chain {
i, ref := i, ref
desc, err := ref.ociDesc()
if err != nil {
return nil, err
Expand Down Expand Up @@ -104,15 +113,98 @@ func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, comp
}
}

remote.Descriptors = append(remote.Descriptors, desc)
mprovider.Add(lazyRefProvider{
layerP := lazyRefProvider{
ref: ref,
desc: desc,
dh: sr.descHandlers[desc.Digest],
session: s,
})
}
mu.Lock()
mprovider.Add(layerP)
mu.Unlock()
if forceCompression {
// ensure that this blob is compressed as expected
name := desc.Digest.String() + compressionType.String()
eg.Go(func() error {
newDescI, err := sr.cm.compressG.Do(egctx, name, func(ctx context.Context) (_ interface{}, rerr error) {
return ensureCompression(egctx, ref, desc, compressionType, layerP)
})
if err != nil {
return err
}
newDesc, ok := newDescI.(*ocispec.Descriptor)
if !ok {
return fmt.Errorf("unexpected compression result for %v", name)
}
mu.Lock()
remote.Descriptors[i] = *newDesc
if desc.Digest != newDesc.Digest {
mproviderBase.Add(newDesc.Digest, sr.cm.ContentStore)
}
mu.Unlock()
return nil
})
} else {
mu.Lock()
remote.Descriptors[i] = desc
mu.Unlock()
}
}

return remote, eg.Wait()
}

// ensureCompression returns a descriptor for ref's layer blob compressed with
// compressionType, converting the blob (and recording the result on the ref)
// when needed.
//
// Fast paths: if desc is not a layer or already has the requested compression
// it is returned as-is; if a previously converted variant is tracked in the
// local content store, its descriptor is returned without re-converting.
// Otherwise the blob is made locally available via p, converted, and the new
// blob is recorded so future calls hit the fast path.
func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispec.Descriptor, compressionType compression.Type, p Unlazier) (*ocispec.Descriptor, error) {
	// Resolve converters
	var (
		layerConvertFunc     converter.ConvertFunc
		mediaTypeConvertFunc func(string) string
	)
	switch compressionType {
	case compression.Uncompressed:
		if !images.IsLayerType(desc.MediaType) || uncompress.IsUncompressedType(desc.MediaType) {
			// No conversion. No need to return an error here.
			return &desc, nil
		}
		layerConvertFunc = uncompress.LayerConvertFunc
		mediaTypeConvertFunc = convertMediaTypeToUncompress
	case compression.Gzip:
		if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
			// No conversion. No need to return an error here.
			return &desc, nil
		}
		layerConvertFunc = gzipLayerConvertFunc
		mediaTypeConvertFunc = convertMediaTypeToGzip
	default:
		return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
	}

	// First, lookup local content store
	if info, err := ref.getCompressionBlob(ctx, compressionType); err == nil {
		// Fast path: converted blob is tracked in the local store
		newDesc := desc
		newDesc.MediaType = mediaTypeConvertFunc(newDesc.MediaType)
		newDesc.Digest = info.Digest
		newDesc.Size = info.Size
		return &newDesc, nil
	} else if !errdefs.IsNotFound(err) {
		return nil, err
	}

	// Convert layer compression type.
	// The blob must be locally present before it can be read and converted.
	if err := p.Unlazy(ctx); err != nil {
		return nil, err
	}
	newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
	if err != nil {
		return nil, err
	}

	// Start to track converted layer so future lookups find it.
	// (Removed a stray `return remote, nil` left over from diff rendering:
	// `remote` is not in scope in this function.)
	if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
		return nil, err
	}
	return newDesc, nil
}

type lazyMultiProvider struct {
Expand Down
Loading

0 comments on commit ec8a782

Please sign in to comment.