Skip to content

Commit

Permalink
feat: handle different plantform in cache remote
Browse files Browse the repository at this point in the history
We can use manifestIndex to record different remote cache on different plantforms. And conversion will use cache which matches current plantfotm.

Signed-off-by: breezeTuT <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Sep 12, 2023
1 parent fe66f09 commit af5320c
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 95 deletions.
318 changes: 223 additions & 95 deletions pkg/driver/nydus/nydus.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
useRemoteCache := cacheRef != ""
if useRemoteCache {
logrus.Infof("remote cache image reference: %s", cacheRef)
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if _, err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if errors.Is(err, errdefs.ErrNotSupport) {
logrus.Warn("Content store does not support remote cache")
} else {
Expand All @@ -253,13 +253,15 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so

if useRemoteCache {
// Fetch the old remote cache before updating and pushing the new one to avoid conflict.
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
cacheDesc, err := d.FetchRemoteCache(ctx, provider, cacheRef)
if err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
cacheIndex, err := d.UpdateRemoteCache(ctx, provider, cacheRef, image, desc, cacheDesc)
if err != nil {
return nil, errors.Wrap(err, "update remote cache")
}
if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil {
if err := d.PushRemoteCache(ctx, provider, cacheRef, cacheIndex); err != nil {
return nil, errors.Wrap(err, "push remote cache")
}
}
Expand Down Expand Up @@ -433,150 +435,276 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide
}

// FetchRemoteCache fetch cache manifest from remote
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) (*ocispec.Descriptor, error) {
resolver, err := pvd.Resolver(ref)
if err != nil {
return err
return nil, err
}

rc := &containerd.RemoteContext{
Resolver: resolver,
Resolver: resolver,
PlatformMatcher: d.platformMC,
}
name, desc, err := rc.Resolver.Resolve(ctx, ref)
if err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
// Remote cache may do not exist, just return nil
return nil
return nil, nil
}
return err
return nil, err
}
fetcher, err := rc.Resolver.Fetcher(ctx, name)
if err != nil {
return err
return nil, err
}
ir, err := fetcher.Fetch(ctx, desc)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
ir, err = fetcher.Fetch(ctx, desc)
if err != nil {
return errors.Wrap(err, "try to pull remote cache")
return nil, errors.Wrap(err, "try to pull remote cache")
}
} else {
return errors.Wrap(err, "pull remote cache")
return nil, errors.Wrap(err, "pull remote cache")
}
}

bytes, err := io.ReadAll(ir)
defer ir.Close()
mBytes, err := io.ReadAll(ir)
if err != nil {
return errors.Wrap(err, "read remote cache to bytes")
}

// TODO: handle manifest list for multiple platform.
manifest := ocispec.Manifest{}
if err = json.Unmarshal(bytes, &manifest); err != nil {
return err
return nil, errors.Wrap(err, "read remote cache bytes to manifest index")
}

cs := pvd.ContentStore()
for _, layer := range manifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return errdefs.ErrNotSupport
switch desc.MediaType {
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
manifestIndex := ocispec.Index{}
if err = json.Unmarshal(mBytes, &manifestIndex); err != nil {
return nil, err
}
manifestIndexDesc, _, err := nydusutils.MarshalToDesc(manifestIndex, ocispec.MediaTypeImageIndex)
if err != nil {
return nil, errors.Wrap(err, "marshal remote cache to manifest index failed")
}
if err = content.WriteBlob(ctx, cs, ref, bytes.NewReader(mBytes), *manifestIndexDesc); err != nil {
return nil, errors.Wrap(err, "remote cache write blob failed")
}
for _, manifest := range manifestIndex.Manifests {
mDesc := ocispec.Descriptor{
MediaType: manifest.MediaType,
Digest: manifest.Digest,
Size: manifest.Size,
}
mir, err := fetcher.Fetch(ctx, mDesc)
if err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
manifestBytes, err := io.ReadAll(mir)
if err != nil {
return nil, errors.Wrap(err, "read remote cache bytes to manifest")
}
if err = content.WriteBlob(ctx, cs, ref, bytes.NewReader(manifestBytes), mDesc); err != nil {
return nil, errors.Wrap(err, "remote cache write blob failed")
}
return errors.Wrap(err, "update cache layer")
}
}
return nil
}

// PushRemoteCache update cache manifest and push to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "remote cache image config marshal failed")
}
configReader := bytes.NewReader(imageConfigBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
return errors.Wrap(err, "remote cache image config write blob failed")
// Get manifests which matches specified platforms and put them into lru cache
matchDescs, err := utils.GetManifests(ctx, cs, *manifestIndexDesc, d.platformMC)
if err != nil {
return nil, errors.Wrap(err, "get remote cache manifest list")
}
var targetManifests []ocispec.Manifest
for _, desc := range matchDescs {
targetManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &targetManifest, desc)
if err != nil {
return nil, errors.Wrap(err, "read remote cache manifest")
}
targetManifests = append(targetManifests, targetManifest)

}
for _, manifest := range targetManifests {
for _, layer := range manifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return nil, errdefs.ErrNotSupport
}
return nil, errors.Wrap(err, "update cache layer")
}
}
}
return manifestIndexDesc, nil
default:
return nil, errors.New("unsupported cache repo mediatype")
}
}

cs := pvd.ContentStore()
layers := []ocispec.Descriptor{}
if err = cs.Walk(ctx, func(info content.Info) error {
if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
layers = append(layers, ocispec.Descriptor{
MediaType: nydusutils.MediaTypeNydusBlob,
Digest: info.Digest,
Size: info.Size,
Annotations: info.Labels,
})
}
return nil
}); err != nil {
return errors.Wrap(err, "get remote cache layers failed")
}

manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageManifest,
Config: *imageConfigDesc,
Layers: layers,
// PushRemoteCache push remotes to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string, cacheIndex *ocispec.Index) error {
for _, manifest := range cacheIndex.Manifests {
if err := pvd.Push(ctx, manifest, ref); err != nil {
return err
}
}
manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
manifestIndexDesc, manifestIndexBytes, err := nydusutils.MarshalToDesc(*cacheIndex, ocispec.MediaTypeImageIndex)
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
manifestReader := bytes.NewReader(manifestBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, bytes.NewReader(manifestIndexBytes), *manifestIndexDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}

if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
if err = pvd.Push(ctx, *manifestIndexDesc, ref); err != nil {
return err
}
return nil
}

// UpdateRemoteCache update cache layer from upper to lower
func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error {
func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, ref string, orgDesc, newDesc, cacheDesc *ocispec.Descriptor) (*ocispec.Index, error) {
cs := provider.ContentStore()
cacheLayers := map[*platforms.Platform][]ocispec.Descriptor{}

orgManifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "read original manifest json")
return nil, errors.Wrap(err, "remote cache image config marshal failed")
}

newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
if err != nil {
return errors.Wrap(err, "read new manifest json")
if err = content.WriteBlob(ctx, cs, ref, bytes.NewReader(imageConfigBytes), *imageConfigDesc); err != nil {
return nil, errors.Wrap(err, "remote cache image config write blob failed")
}
newLayers := newManifest.Layers[:len(newManifest.Layers)-1]

// Update <LayerAnnotationNydusSourceDigest> label for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
switch orgDesc.MediaType {
case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
orgManifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, cs, &orgManifest, *orgDesc)
if err != nil {
return nil, errors.Wrap(err, "read original manifest json")
}

newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, *newDesc)
if err != nil {
return nil, errors.Wrap(err, "read new manifest json")
}
newLayers := newManifest.Layers[:len(newManifest.Layers)-1]

// Update <LayerAnnotationNydusSourceDigest> label for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
}

// Update cache to lru from upper to lower
targetLayers := []ocispec.Descriptor{}
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
targetLayers = append(targetLayers, layer)
}
platform, err := images.Platforms(ctx, cs, *orgDesc)
if err != nil {
return nil, err
}
cacheLayers[&platform[0]] = targetLayers

case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
orgManifests, err := utils.GetManifests(ctx, cs, *orgDesc, d.platformMC)
if err != nil {
return nil, errors.Wrap(err, "get original manifest list")
}
newManifests, err := utils.GetManifests(ctx, cs, *newDesc, d.platformMC)
if err != nil {
return nil, errors.Wrap(err, "get new manifest list")
}
for _, newManifestDesc := range newManifests {
newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, newManifestDesc)
if err != nil {
return nil, errors.Wrap(err, "read new manifest json")
}

// find original manifest matches converted manifest's platform
matcher := platforms.NewMatcher(*newManifestDesc.Platform)
orgManifest := ocispec.Manifest{}
for _, orgManifestDesc := range orgManifests {
if matcher.Match(*orgManifestDesc.Platform) {
_, err = utils.ReadJSON(ctx, cs, &orgManifest, orgManifestDesc)
if err != nil {
return nil, errors.Wrap(err, "read new manifest json")
}
}
}
newLayers := newManifest.Layers[:len(newManifest.Layers)-1]
// Update <LayerAnnotationNydusSourceDigest> label for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
}
// Update cache to lru from upper to lower
targetLayers := []ocispec.Descriptor{}
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
targetLayers = append(targetLayers, layer)
}
cacheLayers[newManifestDesc.Platform] = targetLayers
}
}

// Update cache to lru from upper to lower
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
return errors.Wrap(err, "update cache layer")
cacheIndex := ocispec.Index{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageIndex,
Manifests: []ocispec.Descriptor{},
}
if cacheDesc != nil {
_, err = utils.ReadJSON(ctx, cs, &cacheIndex, *cacheDesc)
if err != nil {
return nil, errors.Wrap(err, "read manifest index")
}
for idx, maniDesc := range cacheIndex.Manifests {
matcher := platforms.NewMatcher(*maniDesc.Platform)
for platform, layers := range cacheLayers {
if matcher.Match(*platform) {
// append new cache layers to existed cache manifest
var manifest ocispec.Manifest
_, err = utils.ReadJSON(ctx, cs, &manifest, maniDesc)
if err != nil {
return nil, errors.Wrap(err, "read manifest")
}
// FIXME: avoid appending duplicated layers
manifest.Layers = nydusutils.AppendLayers(manifest.Layers, layers)
newManiDesc, err := utils.WriteJSON(ctx, cs, manifest, maniDesc, "", nil)
if err != nil {
return nil, errors.Wrap(err, "write manifest")
}
cacheIndex.Manifests[idx] = *newManiDesc
delete(cacheLayers, platform)
}
}
}
}
return nil

// append new cache layers to new cache manifest
for platform, layers := range cacheLayers {
manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageManifest,
Config: *imageConfigDesc,
Layers: layers,
}
manifestDesc, err := utils.WriteJSON(ctx, cs, manifest, ocispec.Descriptor{}, "", nil)
if err != nil {
return nil, errors.Wrap(err, "write manifest")
}
cacheIndex.Manifests = append(cacheIndex.Manifests, ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: manifestDesc.Digest,
Size: manifestDesc.Size,
Platform: platform,
})
}
return &cacheIndex, nil
}
Loading

0 comments on commit af5320c

Please sign in to comment.