Skip to content

Commit

Permalink
feat: handle different plantform in cache remote
Browse files Browse the repository at this point in the history
We can use manifestIndex to record different remote cache on different plantforms. And conversion will use cache which matches current plantfotm.

Signed-off-by: YuQiang <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Sep 8, 2023
1 parent fe66f09 commit 9368e2c
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 30 deletions.
157 changes: 127 additions & 30 deletions pkg/driver/nydus/nydus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"os"
"os/exec"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -234,7 +235,7 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
useRemoteCache := cacheRef != ""
if useRemoteCache {
logrus.Infof("remote cache image reference: %s", cacheRef)
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if _, err := d.FetchRemoteCache(ctx, provider, cacheRef, image); err != nil {
if errors.Is(err, errdefs.ErrNotSupport) {
logrus.Warn("Content store does not support remote cache")
} else {
Expand All @@ -253,13 +254,14 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so

if useRemoteCache {
// Fetch the old remote cache before updating and pushing the new one to avoid conflict.
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
cacheDesc, err := d.FetchRemoteCache(ctx, provider, cacheRef, image)
if err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
return nil, errors.Wrap(err, "update remote cache")
}
if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil {
if err := d.PushRemoteCache(ctx, provider, cacheRef, image, cacheDesc); err != nil {
return nil, errors.Wrap(err, "push remote cache")
}
}
Expand Down Expand Up @@ -433,76 +435,128 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide
}

// FetchRemoteCache fetch cache manifest from remote
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string, sourceDesc *ocispec.Descriptor) (*ocispec.Descriptor, error) {
resolver, err := pvd.Resolver(ref)
if err != nil {
return err
return nil, err
}

rc := &containerd.RemoteContext{
Resolver: resolver,
Resolver: resolver,
PlatformMatcher: d.platformMC,
}
name, desc, err := rc.Resolver.Resolve(ctx, ref)
if err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
// Remote cache may do not exist, just return nil
return nil
return nil, nil
}
return err
return nil, err
}
fetcher, err := rc.Resolver.Fetcher(ctx, name)
if err != nil {
return err
return nil, err
}
ir, err := fetcher.Fetch(ctx, desc)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
ir, err = fetcher.Fetch(ctx, desc)
if err != nil {
return errors.Wrap(err, "try to pull remote cache")
return nil, errors.Wrap(err, "try to pull remote cache")
}
} else {
return errors.Wrap(err, "pull remote cache")
return nil, errors.Wrap(err, "pull remote cache")
}
}

bytes, err := io.ReadAll(ir)
defer ir.Close()
mBytes, err := io.ReadAll(ir)
if err != nil {
return errors.Wrap(err, "read remote cache to bytes")
return nil, errors.Wrap(err, "read remote cache bytes to manifest index")
}

// TODO: handle manifest list for multiple platform.
manifest := ocispec.Manifest{}
if err = json.Unmarshal(bytes, &manifest); err != nil {
return err
cs := pvd.ContentStore()
var cacheDesc ocispec.Descriptor
var targetManifest ocispec.Manifest

// Try to unmarshal to cache manifest, if manifest MediaType is ManifestList or
// ManifestIndex, re-unmarshal to manifest index and get corresponding cache manifest
// otherwise, just set target manifest desc to cacheDesc.
if err = json.Unmarshal(mBytes, &targetManifest); err != nil {
return nil, err
}
switch desc.MediaType {
case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
cacheDesc = desc
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
manifestIndex := ocispec.Index{}
if err = json.Unmarshal(mBytes, &manifestIndex); err != nil {
return nil, err
}
manifestIndexDesc, _, err := nydusutils.MarshalToDesc(manifestIndex, ocispec.MediaTypeImageIndex)
if err != nil {
return nil, errors.Wrap(err, "marshal remote cache to manifest index failed")
}
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(mBytes), *manifestIndexDesc); err != nil {
return nil, errors.Wrap(err, "remote cache write blob failed")
}
for _, manifest := range manifestIndex.Manifests {
mDesc := ocispec.Descriptor{
MediaType: manifest.MediaType,
Digest: manifest.Digest,
Size: manifest.Size,
}
mir, err := fetcher.Fetch(ctx, mDesc)
if err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
manifestBytes, err := io.ReadAll(mir)
if err != nil {
return nil, errors.Wrap(err, "read remote cache bytes to manifest")
}
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(manifestBytes), mDesc); err != nil {
return nil, errors.Wrap(err, "remote cache write blob failed")
}
}

cs := pvd.ContentStore()
for _, layer := range manifest.Layers {
matchDescs, err := utils.GetManifests(ctx, pvd.ContentStore(), *manifestIndexDesc, d.platformMC)
if err != nil {
return nil, errors.Wrap(err, "get remote cache manifest list")
}
sourceDesc.Platform = utils.GetPlatform(ctx, pvd.ContentStore(), sourceDesc, d.platformMC)
targetManifest = ocispec.Manifest{}
for _, desc := range matchDescs {
if reflect.DeepEqual(desc.Platform, sourceDesc.Platform) {
_, err = utils.ReadJSON(ctx, cs, &targetManifest, desc)
if err != nil {
return nil, errors.Wrap(err, "read remote cache manifest")
}
}
}
cacheDesc = *manifestIndexDesc
}
for _, layer := range targetManifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return errdefs.ErrNotSupport
return nil, errdefs.ErrNotSupport
}
return errors.Wrap(err, "update cache layer")
return nil, errors.Wrap(err, "update cache layer")
}
}
return nil
return &cacheDesc, nil
}

// PushRemoteCache update cache manifest and push to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string, sourceDesc, cacheIndexDesc *ocispec.Descriptor) error {
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "remote cache image config marshal failed")
}
configReader := bytes.NewReader(imageConfigBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(imageConfigBytes), *imageConfigDesc); err != nil {
return errors.Wrap(err, "remote cache image config write blob failed")
}

Expand Down Expand Up @@ -534,14 +588,57 @@ func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider,
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
manifestReader := bytes.NewReader(manifestBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {

// Platform may still be nil if we didn't set it before.

manifestDesc.Platform = utils.GetPlatform(ctx, pvd.ContentStore(), sourceDesc, d.platformMC)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(manifestBytes), *manifestDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}

if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
return err
}

if d.platformMC != platforms.All {
matchDescs := []ocispec.Descriptor{}
if cacheIndexDesc != nil {
matchDescs, err = utils.GetManifests(ctx, pvd.ContentStore(), *cacheIndexDesc, nil)
if err != nil {
return errors.Wrap(err, "get remote cache manifest list")
}
}
manifestList := []ocispec.Descriptor{}
manifestList = append(manifestList, *manifestDesc)
for _, desc := range matchDescs {
if !reflect.DeepEqual(desc.Platform, sourceDesc.Platform) {
// Add manifest to list of cache manifest and push each manifest to remote
// except current platform manifest because the newer has been pushed before.
manifestList = append(manifestList, desc)
if err = pvd.Push(ctx, desc, ref); err != nil {
return err
}
}
}

manifestIndex := ocispec.Index{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageIndex,
Manifests: manifestList,
}

manifestIndexDesc, manifestIndexBytes, err := nydusutils.MarshalToDesc(manifestIndex, ocispec.MediaTypeImageIndex)
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(manifestIndexBytes), *manifestIndexDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}
if err = pvd.Push(ctx, *manifestIndexDesc, ref); err != nil {
return err
}
}
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ func UpdateLayerDiffID(ctx context.Context, cs content.Store, image ocispec.Desc

return nil
}

func GetPlatform(ctx context.Context, provider content.Provider, desc *ocispec.Descriptor, platformMC platforms.MatchComparer) *platforms.Platform {
if desc.Platform != nil {
return desc.Platform
}
platforms, err := images.Platforms(ctx, provider, *desc)
if err != nil {
return nil
}
return &platforms[0]
}

0 comments on commit 9368e2c

Please sign in to comment.