Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy progress + Progress Groups #2513

Merged
merged 5 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 188 additions & 127 deletions api/services/control/control.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/services/control/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message Vertex {
google.protobuf.Timestamp started = 5 [(gogoproto.stdtime) = true ];
google.protobuf.Timestamp completed = 6 [(gogoproto.stdtime) = true ];
string error = 7; // typed errors?
pb.ProgressGroup progressGroup = 8;
}

message VertexStatus {
Expand Down
4 changes: 2 additions & 2 deletions cache/contenthash/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func TestPersistence(t *testing.T) {
err = ref.Release(context.TODO())
require.NoError(t, err)

ref, err = cm.Get(context.TODO(), id)
ref, err = cm.Get(context.TODO(), id, nil)
require.NoError(t, err)

dgst, err = Checksum(context.TODO(), ref, "foo", ChecksumOpts{FollowLinks: true}, nil)
Expand All @@ -1162,7 +1162,7 @@ func TestPersistence(t *testing.T) {
defer closeBolt()
defer cm.Close()

ref, err = cm.Get(context.TODO(), id)
ref, err = cm.Get(context.TODO(), id, nil)
require.NoError(t, err)

dgst, err = Checksum(context.TODO(), ref, "foo", ChecksumOpts{FollowLinks: true}, nil)
Expand Down
62 changes: 32 additions & 30 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -51,13 +52,13 @@ type Accessor interface {
MetadataStore

GetByBlob(ctx context.Context, desc ocispecs.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error)
Get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)

New(ctx context.Context, parent ImmutableRef, s session.Group, opts ...RefOption) (MutableRef, error)
GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase?
IdentityMapping() *idtools.IdentityMapping
Merge(ctx context.Context, parents []ImmutableRef, opts ...RefOption) (ImmutableRef, error)
Diff(ctx context.Context, lower, upper ImmutableRef, opts ...RefOption) (ImmutableRef, error)
Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
}

type Controller interface {
Expand Down Expand Up @@ -134,7 +135,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,

var p *immutableRef
if parent != nil {
p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed, descHandlers)
p2, err := cm.Get(ctx, parent.ID(), nil, NoUpdateLastUsed, descHandlers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,7 +170,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
}

for _, si := range sis {
ref, err := cm.get(ctx, si.ID(), opts...)
ref, err := cm.get(ctx, si.ID(), nil, opts...)
if err != nil {
if errors.As(err, &NeedsRemoteProviderError{}) {
// This shouldn't happen and indicates that blobchain IDs are being set incorrectly,
Expand Down Expand Up @@ -199,7 +200,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,

var link *immutableRef
for _, si := range sis {
ref, err := cm.get(ctx, si.ID(), opts...)
ref, err := cm.get(ctx, si.ID(), nil, opts...)
// if the error was NotFound or NeedsRemoteProvider, we can't re-use the snapshot from the blob so just skip it
if err != nil && !IsNotFound(err) && !errors.As(err, &NeedsRemoteProviderError{}) {
return nil, errors.Wrapf(err, "failed to get record %s by chainid", si.ID())
Expand Down Expand Up @@ -290,7 +291,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,

cm.records[id] = rec

return rec.ref(true, descHandlers), nil
return rec.ref(true, descHandlers, nil), nil
}

// init loads all snapshots from metadata state and tries to load the records
Expand Down Expand Up @@ -324,14 +325,14 @@ func (cm *cacheManager) Close() error {
}

// Get returns an immutable snapshot reference for ID
func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) {
func (cm *cacheManager) Get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
return cm.get(ctx, id, opts...)
return cm.get(ctx, id, pg, opts...)
}

// get requires manager lock to be taken
func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (*immutableRef, error) {
func (cm *cacheManager) get(ctx context.Context, id string, pg progress.Controller, opts ...RefOption) (*immutableRef, error) {
rec, err := cm.getRecord(ctx, id, opts...)
if err != nil {
return nil, err
Expand All @@ -353,12 +354,12 @@ func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (
return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {
return rec.equalImmutable.ref(triggerUpdate, descHandlers), nil
return rec.equalImmutable.ref(triggerUpdate, descHandlers, pg), nil
}
return rec.mref(triggerUpdate, descHandlers).commit(ctx)
}

return rec.ref(triggerUpdate, descHandlers), nil
return rec.ref(triggerUpdate, descHandlers, pg), nil
}

// getRecord returns record for id. Requires manager lock.
Expand Down Expand Up @@ -486,22 +487,22 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt

func (cm *cacheManager) parentsOf(ctx context.Context, md *cacheMetadata, opts ...RefOption) (ps parentRefs, rerr error) {
if parentID := md.getParent(); parentID != "" {
p, err := cm.get(ctx, parentID, append(opts, NoUpdateLastUsed))
p, err := cm.get(ctx, parentID, nil, append(opts, NoUpdateLastUsed))
if err != nil {
return ps, err
}
ps.layerParent = p
return ps, nil
}
for _, parentID := range md.getMergeParents() {
p, err := cm.get(ctx, parentID, append(opts, NoUpdateLastUsed))
p, err := cm.get(ctx, parentID, nil, append(opts, NoUpdateLastUsed))
if err != nil {
return ps, err
}
ps.mergeParents = append(ps.mergeParents, p)
}
if lowerParentID := md.getLowerDiffParent(); lowerParentID != "" {
p, err := cm.get(ctx, lowerParentID, append(opts, NoUpdateLastUsed))
p, err := cm.get(ctx, lowerParentID, nil, append(opts, NoUpdateLastUsed))
if err != nil {
return ps, err
}
Expand All @@ -511,7 +512,7 @@ func (cm *cacheManager) parentsOf(ctx context.Context, md *cacheMetadata, opts .
ps.diffParents.lower = p
}
if upperParentID := md.getUpperDiffParent(); upperParentID != "" {
p, err := cm.get(ctx, upperParentID, append(opts, NoUpdateLastUsed))
p, err := cm.get(ctx, upperParentID, nil, append(opts, NoUpdateLastUsed))
if err != nil {
return ps, err
}
Expand All @@ -532,7 +533,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Gr
if _, ok := s.(*immutableRef); ok {
parent = s.Clone().(*immutableRef)
} else {
p, err := cm.Get(ctx, s.ID(), append(opts, NoUpdateLastUsed)...)
p, err := cm.Get(ctx, s.ID(), nil, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -661,7 +662,7 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOp
return rec.mref(true, descHandlersOf(opts...)), nil
}

func (cm *cacheManager) Merge(ctx context.Context, inputParents []ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
func (cm *cacheManager) Merge(ctx context.Context, inputParents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ir ImmutableRef, rerr error) {
// TODO:(sipsma) optimize merge further by
// * Removing repeated occurrences of input layers (only leaving the uppermost)
// * Reusing existing merges that are equivalent to this one
Expand All @@ -687,7 +688,7 @@ func (cm *cacheManager) Merge(ctx context.Context, inputParents []ImmutableRef,
} else {
// inputParent implements ImmutableRef but isn't our internal struct, get an instance of the internal struct
// by calling Get on its ID.
p, err := cm.Get(ctx, inputParent.ID(), append(opts, NoUpdateLastUsed)...)
p, err := cm.Get(ctx, inputParent.ID(), nil, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
Expand All @@ -710,20 +711,21 @@ func (cm *cacheManager) Merge(ctx context.Context, inputParents []ImmutableRef,
}

// On success, createMergeRef takes ownership of parents
mergeRef, err := cm.createMergeRef(ctx, parents, dhs, opts...)
mergeRef, err := cm.createMergeRef(ctx, parents, dhs, pg, opts...)
if err != nil {
return nil, err
}
return mergeRef, nil
}

func (cm *cacheManager) createMergeRef(ctx context.Context, parents parentRefs, dhs DescHandlers, opts ...RefOption) (ir *immutableRef, rerr error) {
func (cm *cacheManager) createMergeRef(ctx context.Context, parents parentRefs, dhs DescHandlers, pg progress.Controller, opts ...RefOption) (ir *immutableRef, rerr error) {
if len(parents.mergeParents) == 0 {
// merge of nothing is nothing
return nil, nil
}
if len(parents.mergeParents) == 1 {
// merge of 1 thing is that thing
parents.mergeParents[0].progress = pg
return parents.mergeParents[0], nil
}

Expand Down Expand Up @@ -789,10 +791,10 @@ func (cm *cacheManager) createMergeRef(ctx context.Context, parents parentRefs,

cm.records[id] = rec

return rec.ref(true, dhs), nil
return rec.ref(true, dhs, pg), nil
}

func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ir ImmutableRef, rerr error) {
if lower == nil {
return nil, errors.New("lower ref for diff cannot be nil")
}
Expand All @@ -815,7 +817,7 @@ func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, opt
} else {
// inputParent implements ImmutableRef but isn't our internal struct, get an instance of the internal struct
// by calling Get on its ID.
p, err := cm.Get(ctx, inputParent.ID(), append(opts, NoUpdateLastUsed)...)
p, err := cm.Get(ctx, inputParent.ID(), nil, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -868,7 +870,7 @@ func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, opt
mergeParents.mergeParents[i-len(lowerLayers)] = subUpper.clone()
} else {
subParents := parentRefs{diffParents: &diffParents{lower: subLower.clone(), upper: subUpper.clone()}}
diffRef, err := cm.createDiffRef(ctx, subParents, subUpper.descHandlers,
diffRef, err := cm.createDiffRef(ctx, subParents, subUpper.descHandlers, pg,
WithDescription(fmt.Sprintf("diff %q -> %q", subLower.ID(), subUpper.ID())))
if err != nil {
subParents.release(context.TODO())
Expand All @@ -878,7 +880,7 @@ func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, opt
}
}
// On success, createMergeRef takes ownership of mergeParents
mergeRef, err := cm.createMergeRef(ctx, mergeParents, dhs)
mergeRef, err := cm.createMergeRef(ctx, mergeParents, dhs, pg)
if err != nil {
return nil, err
}
Expand All @@ -888,14 +890,14 @@ func (cm *cacheManager) Diff(ctx context.Context, lower, upper ImmutableRef, opt
}

// On success, createDiffRef takes ownership of parents
diffRef, err := cm.createDiffRef(ctx, parents, dhs, opts...)
diffRef, err := cm.createDiffRef(ctx, parents, dhs, pg, opts...)
if err != nil {
return nil, err
}
return diffRef, nil
}

func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, dhs DescHandlers, opts ...RefOption) (ir *immutableRef, rerr error) {
func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, dhs DescHandlers, pg progress.Controller, opts ...RefOption) (ir *immutableRef, rerr error) {
dps := parents.diffParents
if err := dps.lower.Finalize(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to finalize lower parent during diff")
Expand Down Expand Up @@ -963,7 +965,7 @@ func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, d

cm.records[id] = rec

return rec.ref(true, dhs), nil
return rec.ref(true, dhs, pg), nil
}

func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
Expand Down Expand Up @@ -1390,7 +1392,7 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo)
func(d *client.UsageInfo) {
eg.Go(func() error {
cm.mu.Lock()
ref, err := cm.get(ctx, d.ID, NoUpdateLastUsed)
ref, err := cm.get(ctx, d.ID, nil, NoUpdateLastUsed)
cm.mu.Unlock()
if err != nil {
d.Size = 0
Expand Down
Loading