From aa0a5bbac8aa8dbab96f63b13588ea4e7e5bb3bf Mon Sep 17 00:00:00 2001 From: gsquared94 Date: Tue, 29 Sep 2020 23:51:27 +0530 Subject: [PATCH] define `countingSemaphore` --- pkg/skaffold/build/model.go | 62 ++++++++++++++++++--------------- pkg/skaffold/build/scheduler.go | 33 +++++++++++------- 2 files changed, 53 insertions(+), 42 deletions(-) diff --git a/pkg/skaffold/build/model.go b/pkg/skaffold/build/model.go index 017e6e77505..029c5dbc0db 100644 --- a/pkg/skaffold/build/model.go +++ b/pkg/skaffold/build/model.go @@ -23,16 +23,15 @@ import ( "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest" ) -// artifactChanModel models the artifact dependency graph using a set of channels. +// artifactDAG models the artifact dependency graph using a set of channels. // Each artifact has a status struct that has success and a failure channel which it closes once it completes building by calling either markSuccess or markFailure respectively. // This notifies all listeners waiting for this artifact of a successful or failed build. // Additionally it has a reference to the channels for each of its dependencies. // Calling `waitForDependencies` ensures that all required artifacts' channels have already been closed and as such have finished building before the current artifact build starts. -type artifactChanModel struct { - artifact *latest.Artifact - artifactStatus status - requiredArtifactStatuses []status - concurrencySem chan bool +type artifactDAG struct { + *latest.Artifact + status status + dependencyStatuses []status } type status struct { @@ -41,22 +40,17 @@ type status struct { failure chan interface{} } -func (a *artifactChanModel) markSuccess() { +func (a *artifactDAG) markSuccess() { // closing channel notifies all listeners waiting for this build that it succeeded - close(a.artifactStatus.success) - <-a.concurrencySem + close(a.status.success) } -func (a *artifactChanModel) markFailure() { +func (a *artifactDAG) markFailure() { // closing channel notifies all listeners waiting for this build that it failed - close(a.artifactStatus.failure) - <-a.concurrencySem + close(a.status.failure) } -func (a *artifactChanModel) waitForDependencies(ctx context.Context) error { - defer func() { - a.concurrencySem <- true - }() - for _, depStatus := range a.requiredArtifactStatuses { +func (a *artifactDAG) waitForDependencies(ctx context.Context) error { + for _, depStatus := range a.dependencyStatuses { // wait for required builds to complete select { case <-ctx.Done(): @@ -69,7 +63,7 @@ func (a *artifactChanModel) waitForDependencies(ctx context.Context) error { return nil } -func makeArtifactChanModel(artifacts []*latest.Artifact, c int) []*artifactChanModel { +func makeArtifactChanModel(artifacts []*latest.Artifact) []*artifactDAG { statusMap := make(map[string]status) for _, a := range artifacts { statusMap[a.ImageName] = status{ @@ -79,19 +73,29 @@ func makeArtifactChanModel(artifacts []*latest.Artifact, c int) []*artifactChanM } } - if c == 0 { - c = len(artifacts) - } - // sem is a channel that will allow up to `c` concurrent operations. - sem := make(chan bool, c) - - var acmSlice []*artifactChanModel + var dags []*artifactDAG for _, a := range artifacts { - acm := &artifactChanModel{artifact: a, artifactStatus: statusMap[a.ImageName], concurrencySem: sem} + dag := &artifactDAG{Artifact: a, status: statusMap[a.ImageName]} for _, d := range a.Dependencies { - acm.requiredArtifactStatuses = append(acm.requiredArtifactStatuses, statusMap[d.ImageName]) + dag.dependencyStatuses = append(dag.dependencyStatuses, statusMap[d.ImageName]) } - acmSlice = append(acmSlice, acm) + dags = append(dags, dag) + } + return dags +} + +// countingSemaphore uses a buffered channel of size `n` that acts like a counting semaphore, allowing up to `n` concurrent operations +type countingSemaphore struct { + sem chan bool +} + +func newCountingSemaphore(count int) countingSemaphore { + return countingSemaphore{sem: make(chan bool, count)} +} + +func (c countingSemaphore) acquire() (release func()) { + c.sem <- true + return func() { + <-c.sem } - return acmSlice } diff --git a/pkg/skaffold/build/scheduler.go b/pkg/skaffold/build/scheduler.go index dc3a98d5990..a01809119b5 100644 --- a/pkg/skaffold/build/scheduler.go +++ b/pkg/skaffold/build/scheduler.go @@ -41,9 +41,9 @@ var ( // InOrder builds a list of artifacts in dependency order. // `concurrency` specifies the max number of builds that can run at any one time. If concurrency is 0, then it's set to the length of the `artifacts` slice. // Each artifact build runs in its own goroutine. It limits the max concurrency using a buffered channel like a semaphore. -// At the same time, it uses the `artifactChanModel` to model the artifacts dependency graph and to make each artifact build wait for its required artifacts' builds. +// At the same time, it uses the `artifactDAG` to model the artifacts dependency graph and to make each artifact build wait for its required artifacts' builds. func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts []*latest.Artifact, buildArtifact ArtifactBuilder, concurrency int) ([]Artifact, error) { - acmSlice := makeArtifactChanModel(artifacts, concurrency) + acmSlice := makeArtifactChanModel(artifacts) var wg sync.WaitGroup defer wg.Wait() @@ -53,6 +53,11 @@ func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts [ results := new(sync.Map) outputs := make([]chan string, len(acmSlice)) + if concurrency == 0 { + concurrency = len(artifacts) + } + sem := newCountingSemaphore(concurrency) + wg.Add(len(artifacts)) for i := range acmSlice { outputs[i] = make(chan string, buffSize) @@ -60,9 +65,9 @@ func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts [ // Create a goroutine for each element in acmSlice. Each goroutine waits on its dependencies to finish building. // Because our artifacts form a DAG, at least one of the goroutines should be able to start building. - go func(a *artifactChanModel) { + go func(a *artifactDAG) { // Run build and write output/logs to piped writer and store build result in sync.Map - runBuild(ctx, w, tags, a, results, buildArtifact) + runBuild(ctx, w, tags, a, results, buildArtifact, sem) wg.Done() }(acmSlice[i]) @@ -74,27 +79,29 @@ func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts [ return collectResults(out, artifacts, results, outputs) } -func runBuild(ctx context.Context, cw io.WriteCloser, tags tag.ImageTags, a *artifactChanModel, results *sync.Map, build ArtifactBuilder) { +func runBuild(ctx context.Context, cw io.WriteCloser, tags tag.ImageTags, a *artifactDAG, results *sync.Map, build ArtifactBuilder, sem countingSemaphore) { defer cw.Close() err := a.waitForDependencies(ctx) - event.BuildInProgress(a.artifact.ImageName) + release := sem.acquire() + defer release() + event.BuildInProgress(a.ImageName) if err != nil { - event.BuildFailed(a.artifact.ImageName, err) - results.Store(a.artifact.ImageName, err) + event.BuildFailed(a.ImageName, err) + results.Store(a.ImageName, err) a.markFailure() return } - finalTag, err := getBuildResult(ctx, cw, tags, a.artifact, build) + finalTag, err := getBuildResult(ctx, cw, tags, a.Artifact, build) if err != nil { - event.BuildFailed(a.artifact.ImageName, err) - results.Store(a.artifact.ImageName, err) + event.BuildFailed(a.ImageName, err) + results.Store(a.ImageName, err) a.markFailure() return } - event.BuildComplete(a.artifact.ImageName) - ar := Artifact{ImageName: a.artifact.ImageName, Tag: finalTag} + event.BuildComplete(a.ImageName) + ar := Artifact{ImageName: a.ImageName, Tag: finalTag} results.Store(ar.ImageName, ar) a.markSuccess() }