Enable to forcefully specify compression type
Signed-off-by: ktock <ktokunaga.mail@gmail.com>
ktock committed Jul 2, 2021
1 parent 8d5c5f1 commit 7e98e60
Showing 18 changed files with 1,086 additions and 33 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -231,8 +231,8 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip]`: choose compression type for layer, gzip is default value

* `compression=[uncompressed,gzip]`: choose compression type for layers newly created and cached, gzip is default value
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).
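
For example, a hypothetical invocation (the image name is a placeholder) that exports an image with uncompressed layers and forces re-compression of any layers that already exist in another format:

buildctl build --frontend dockerfile.v0 --local context=. --local dockerfile=. \
  --output type=image,name=docker.io/username/image,push=true,compression=uncompressed,force-compression=true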

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
`$DOCKER_CONFIG` defaults to `~/.docker`.
60 changes: 56 additions & 4 deletions cache/blobs.go
@@ -26,7 +26,8 @@ var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
// computeBlobChain ensures every ref in a parent chain has an associated blob in the content store. If
// a blob is missing and createIfNeeded is true, then the blob will be created, otherwise ErrNoBlobs will
// be returned. Caller must hold a lease when calling this function.
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) error {
// If forceCompression is specified but the blob of compressionType doesn't exist, this function creates it.
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for computeBlobChain")
}
@@ -39,22 +40,31 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
ctx = winlayers.UseWindowsLayerMode(ctx)
}

return computeBlobChain(ctx, sr, createIfNeeded, compressionType, s)
return computeBlobChain(ctx, sr, createIfNeeded, compressionType, forceCompression, s)
}

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, s session.Group) error {
func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)
var currentDescr ocispec.Descriptor
if sr.parent != nil {
eg.Go(func() error {
return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, s)
return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, forceCompression, s)
})
}
eg.Go(func() error {
dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (interface{}, error) {
refInfo := sr.Info()
if refInfo.Blob != "" {
if forceCompression {
desc, err := sr.ociDesc()
if err != nil {
return nil, err
}
if err := ensureCompression(ctx, sr, desc, compressionType, s); err != nil {
return nil, err
}
}
return nil, nil
} else if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
@@ -127,6 +137,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return nil, err
}
}

return descr, nil

})
@@ -224,3 +240,39 @@ func isTypeWindows(sr *immutableRef) bool {
}
return false
}

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispec.Descriptor, compressionType compression.Type, s session.Group) error {
// Resolve converters
layerConvertFunc, _, err := getConverters(desc, compressionType)
if err != nil {
return err
} else if layerConvertFunc == nil {
return nil // no need to convert
}

// First, lookup local content store
if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil {
return nil // found the compression variant. no need to convert.
}

// Convert layer compression type
if err := (lazyRefProvider{
ref: ref,
desc: desc,
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return err
}

// Start to track converted layer
if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
return err
}
return nil
}
138 changes: 138 additions & 0 deletions cache/converter.go
@@ -0,0 +1,138 @@
package cache

import (
"compress/gzip"
"context"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/labels"
"github.com/moby/buildkit/util/compression"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// getConverters returns converter functions according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverters(desc ocispec.Descriptor, compressionType compression.Type) (converter.ConvertFunc, func(string) string, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(desc.MediaType) || uncompress.IsUncompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
}
return uncompress.LayerConvertFunc, convertMediaTypeToUncompress, nil
case compression.Gzip:
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
}
return gzipLayerConvertFunc, convertMediaTypeToGzip, nil
default:
return nil, nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
}

func gzipLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispec.Descriptor) (*ocispec.Descriptor, error) {
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil
}

// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-gzip-from-%s", desc.Digest)
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
zw := gzip.NewWriter(w)
defer zw.Close()

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}

newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(newDesc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
return &newDesc, nil
}

func isGzipCompressedType(mt string) bool {
switch mt {
case
images.MediaTypeDockerSchema2LayerGzip,
images.MediaTypeDockerSchema2LayerForeignGzip,
ocispec.MediaTypeImageLayerGzip,
ocispec.MediaTypeImageLayerNonDistributableGzip:
return true
default:
return false
}
}

func convertMediaTypeToUncompress(mt string) string {
switch mt {
case images.MediaTypeDockerSchema2LayerGzip:
return images.MediaTypeDockerSchema2Layer
case images.MediaTypeDockerSchema2LayerForeignGzip:
return images.MediaTypeDockerSchema2LayerForeign
case ocispec.MediaTypeImageLayerGzip:
return ocispec.MediaTypeImageLayer
case ocispec.MediaTypeImageLayerNonDistributableGzip:
return ocispec.MediaTypeImageLayerNonDistributable
default:
return mt
}
}

func convertMediaTypeToGzip(mt string) string {
if uncompress.IsUncompressedType(mt) {
if images.IsDockerType(mt) {
mt += ".gzip"
} else {
mt += "+gzip"
}
return mt
}
return mt
}
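
For reference, a minimal table-driven sketch (not part of this commit) of what the two media type helpers above should return for the standard Docker and OCI layer media type strings; it assumes placement in the same cache package:

package cache

import "testing"

// TestConvertMediaType is an illustrative check of the round trip between
// uncompressed and gzip layer media types for both Docker and OCI variants.
func TestConvertMediaType(t *testing.T) {
	cases := map[string]string{
		"application/vnd.oci.image.layer.v1.tar":       "application/vnd.oci.image.layer.v1.tar+gzip",
		"application/vnd.docker.image.rootfs.diff.tar":  "application/vnd.docker.image.rootfs.diff.tar.gzip",
	}
	for uncompressed, gzipped := range cases {
		if got := convertMediaTypeToGzip(uncompressed); got != gzipped {
			t.Errorf("convertMediaTypeToGzip(%q) = %q, want %q", uncompressed, got, gzipped)
		}
		if got := convertMediaTypeToUncompress(gzipped); got != uncompressed {
			t.Errorf("convertMediaTypeToUncompress(%q) = %q, want %q", gzipped, got, uncompressed)
		}
	}
}
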
5 changes: 3 additions & 2 deletions cache/manager.go
@@ -77,8 +77,9 @@ type cacheManager struct
ManagerOpt
md *metadata.Store

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
compressG flightcontrol.Group
}

func NewManager(opt ManagerOpt) (Manager, error) {
59 changes: 58 additions & 1 deletion cache/refs.go
@@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
@@ -46,7 +47,7 @@ type ImmutableRef interface {

Info() RefInfo
Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, s session.Group) (*solver.Remote, error)
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error)
}

type RefInfo struct {
@@ -198,6 +199,16 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
if err == nil {
usage.Size += info.Size
}
for k, v := range info.Labels {
// accumulate size of compression variant blobs
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if cdgst, err := digest.Parse(v); err == nil {
if info, err := cr.cm.ContentStore.Info(ctx, cdgst); err == nil {
usage.Size += info.Size
}
}
}
}
}
cr.mu.Lock()
setSize(cr.md, usage.Size)
@@ -361,6 +372,52 @@ func (sr *immutableRef) ociDesc() (ocispec.Descriptor, error) {
return desc, nil
}

const compressionVariantDigestLabelPrefix = "buildkit.io/compression/digest."

func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String()
}

func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (content.Info, error) {
cs := sr.cm.ContentStore
info, err := cs.Info(ctx, digest.Digest(getBlob(sr.md)))
if err != nil {
return content.Info{}, err
}
dgstS, ok := info.Labels[compressionVariantDigestLabel(compressionType)]
if ok {
dgst, err := digest.Parse(dgstS)
if err != nil {
return content.Info{}, err
}
return cs.Info(ctx, dgst)
}
return content.Info{}, errdefs.ErrNotFound
}

func (sr *immutableRef) addCompressionBlob(ctx context.Context, dgst digest.Digest, compressionType compression.Type) error {
cs := sr.cm.ContentStore
if err := sr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{
ID: dgst.String(),
Type: "content",
}); err != nil {
return err
}
info, err := cs.Info(ctx, digest.Digest(getBlob(sr.md)))
if err != nil {
return err
}
if info.Labels == nil {
info.Labels = make(map[string]string)
}
cachedVariantLabel := compressionVariantDigestLabel(compressionType)
info.Labels[cachedVariantLabel] = dgst.String()
if _, err := cs.Update(ctx, info, "labels."+cachedVariantLabel); err != nil {
return err
}
return nil
}

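For illustration (not part of this commit), assuming compression.Type's String() returns the lowercase names used by the README keys ("uncompressed", "gzip"), the content-store label that addCompressionBlob records for a gzip variant of a blob would look like:

buildkit.io/compression/digest.gzip=sha256:<digest of the gzip-compressed variant>
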
// order is from parent->child, sr will be at end of slice
func (sr *immutableRef) parentRefChain() []*immutableRef {
var count int