Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Start installing packages as they become ready (#75)
Browse files Browse the repository at this point in the history
Previously, we would wait for everything to be fetched and expanded, but
this change allows us to start installing concurrently with fetches.

This assumes that we must install in the exact order returend by
ResolveWorld, but we can likely improve upon this.

Signed-off-by: Jon Johnson <jon.johnson@chainguard.dev>
  • Loading branch information
jonjohnsonjr authored Jul 5, 2023
1 parent ebe7903 commit 0f56281
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions pkg/apk/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,46 @@ func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error
// TODO: Consider making this configurable option.
jobs := runtime.GOMAXPROCS(0)

var g errgroup.Group
g.SetLimit(jobs)
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(jobs + 1)

expanded := make([]*APKExpanded, len(allpkgs))

// First we concurrently fetch and expand all our APKs.
// A slice of pseudo-promises that get closed when expanded[i] is ready.
done := make([]chan struct{}, len(allpkgs))
for i := range allpkgs {
done[i] = make(chan struct{})
}

// Kick off a goroutine that sequentially installs packages as they become ready.
//
// We could probably do better than this by mirroring the dependency graph or even
// just computing non-overlapping packages based on the installed files, but we'll
// keep this simple for now by assuming we must install in the given order exactly.
g.Go(func() error {
for i, ch := range done {
select {
case <-gctx.Done():
return gctx.Err()
case <-ch:
exp := expanded[i]
if exp == nil {
continue
}

pkg := allpkgs[i]

if err := a.installPackage(gctx, pkg, exp, sourceDateEpoch); err != nil {
return fmt.Errorf("installing %s: %w", pkg.Name, err)
}
}
}

return nil
})

// Meanwhile, concurrently fetch and expand all our APKs.
// We signal they are ready to be installed by closing done[i].
for i, pkg := range allpkgs {
i, pkg := i, pkg

Expand All @@ -501,38 +535,26 @@ func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error
return nil
}

rc, err := a.fetchPackage(ctx, pkg)
rc, err := a.fetchPackage(gctx, pkg)
if err != nil {
return fmt.Errorf("fetching package %q: %w", pkg.Name, err)
}
defer rc.Close()

exp, err := ExpandApk(ctx, rc)
exp, err := ExpandApk(gctx, rc)
if err != nil {
return err
return fmt.Errorf("expanding %s: %w", pkg.Name, err)
}

expanded[i] = exp
close(done[i])

return nil
})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("expanding packages: %w", err)
}

// Then we sequentially install them.
for i, pkg := range allpkgs {
exp := expanded[i]
if exp == nil {
// As above, so below.
continue
}

if err := a.installPackage(ctx, pkg, exp, sourceDateEpoch); err != nil {
return err
}
return fmt.Errorf("installing packages: %w", err)
}

return nil
Expand Down

0 comments on commit 0f56281

Please sign in to comment.