Skip to content

Commit

Permalink
define countingSemaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
gsquared94 committed Sep 29, 2020
1 parent 95beb46 commit aa0a5bb
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 42 deletions.
62 changes: 33 additions & 29 deletions pkg/skaffold/build/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
)

// artifactChanModel models the artifact dependency graph using a set of channels.
// artifactDAG models the artifact dependency graph using a set of channels.
// Each artifact has a status struct that has success and a failure channel which it closes once it completes building by calling either markSuccess or markFailure respectively.
// This notifies all listeners waiting for this artifact of a successful or failed build.
// Additionally it has a reference to the channels for each of its dependencies.
// Calling `waitForDependencies` ensures that all required artifacts' channels have already been closed and as such have finished building before the current artifact build starts.
type artifactChanModel struct {
artifact *latest.Artifact
artifactStatus status
requiredArtifactStatuses []status
concurrencySem chan bool
type artifactDAG struct {
*latest.Artifact
status status
dependencyStatuses []status
}

type status struct {
Expand All @@ -41,22 +40,17 @@ type status struct {
failure chan interface{}
}

func (a *artifactChanModel) markSuccess() {
func (a *artifactDAG) markSuccess() {
// closing channel notifies all listeners waiting for this build that it succeeded
close(a.artifactStatus.success)
<-a.concurrencySem
close(a.status.success)
}

func (a *artifactChanModel) markFailure() {
func (a *artifactDAG) markFailure() {
// closing channel notifies all listeners waiting for this build that it failed
close(a.artifactStatus.failure)
<-a.concurrencySem
close(a.status.failure)
}
func (a *artifactChanModel) waitForDependencies(ctx context.Context) error {
defer func() {
a.concurrencySem <- true
}()
for _, depStatus := range a.requiredArtifactStatuses {
func (a *artifactDAG) waitForDependencies(ctx context.Context) error {
for _, depStatus := range a.dependencyStatuses {
// wait for required builds to complete
select {
case <-ctx.Done():
Expand All @@ -69,7 +63,7 @@ func (a *artifactChanModel) waitForDependencies(ctx context.Context) error {
return nil
}

func makeArtifactChanModel(artifacts []*latest.Artifact, c int) []*artifactChanModel {
func makeArtifactChanModel(artifacts []*latest.Artifact) []*artifactDAG {
statusMap := make(map[string]status)
for _, a := range artifacts {
statusMap[a.ImageName] = status{
Expand All @@ -79,19 +73,29 @@ func makeArtifactChanModel(artifacts []*latest.Artifact, c int) []*artifactChanM
}
}

if c == 0 {
c = len(artifacts)
}
// sem is a channel that will allow up to `c` concurrent operations.
sem := make(chan bool, c)

var acmSlice []*artifactChanModel
var dags []*artifactDAG
for _, a := range artifacts {
acm := &artifactChanModel{artifact: a, artifactStatus: statusMap[a.ImageName], concurrencySem: sem}
dag := &artifactDAG{Artifact: a, status: statusMap[a.ImageName]}
for _, d := range a.Dependencies {
acm.requiredArtifactStatuses = append(acm.requiredArtifactStatuses, statusMap[d.ImageName])
dag.dependencyStatuses = append(dag.dependencyStatuses, statusMap[d.ImageName])
}
acmSlice = append(acmSlice, acm)
dags = append(dags, dag)
}
return dags
}

// countingSemaphore uses a buffered channel of size `n` that acts like a counting semaphore, allowing up to `n` concurrent operations
type countingSemaphore struct {
sem chan bool
}

func newCountingSemaphore(count int) countingSemaphore {
return countingSemaphore{sem: make(chan bool, count)}
}

func (c countingSemaphore) acquire() (release func()) {
c.sem <- true
return func() {
<-c.sem
}
return acmSlice
}
33 changes: 20 additions & 13 deletions pkg/skaffold/build/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ var (
// InOrder builds a list of artifacts in dependency order.
// `concurrency` specifies the max number of builds that can run at any one time. If concurrency is 0, then it's set to the length of the `artifacts` slice.
// Each artifact build runs in its own goroutine. It limits the max concurrency using a buffered channel like a semaphore.
// At the same time, it uses the `artifactChanModel` to model the artifacts dependency graph and to make each artifact build wait for its required artifacts' builds.
// At the same time, it uses the `artifactDAG` to model the artifacts dependency graph and to make each artifact build wait for its required artifacts' builds.
func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts []*latest.Artifact, buildArtifact ArtifactBuilder, concurrency int) ([]Artifact, error) {
acmSlice := makeArtifactChanModel(artifacts, concurrency)
acmSlice := makeArtifactChanModel(artifacts)
var wg sync.WaitGroup
defer wg.Wait()

Expand All @@ -53,16 +53,21 @@ func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts [
results := new(sync.Map)
outputs := make([]chan string, len(acmSlice))

if concurrency == 0 {
concurrency = len(artifacts)
}
sem := newCountingSemaphore(concurrency)

wg.Add(len(artifacts))
for i := range acmSlice {
outputs[i] = make(chan string, buffSize)
r, w := io.Pipe()

// Create a goroutine for each element in acmSlice. Each goroutine waits on its dependencies to finish building.
// Because our artifacts form a DAG, at least one of the goroutines should be able to start building.
go func(a *artifactChanModel) {
go func(a *artifactDAG) {
// Run build and write output/logs to piped writer and store build result in sync.Map
runBuild(ctx, w, tags, a, results, buildArtifact)
runBuild(ctx, w, tags, a, results, buildArtifact, sem)
wg.Done()
}(acmSlice[i])

Expand All @@ -74,27 +79,29 @@ func InOrder(ctx context.Context, out io.Writer, tags tag.ImageTags, artifacts [
return collectResults(out, artifacts, results, outputs)
}

func runBuild(ctx context.Context, cw io.WriteCloser, tags tag.ImageTags, a *artifactChanModel, results *sync.Map, build ArtifactBuilder) {
func runBuild(ctx context.Context, cw io.WriteCloser, tags tag.ImageTags, a *artifactDAG, results *sync.Map, build ArtifactBuilder, sem countingSemaphore) {
defer cw.Close()
err := a.waitForDependencies(ctx)
event.BuildInProgress(a.artifact.ImageName)
release := sem.acquire()
defer release()
event.BuildInProgress(a.ImageName)
if err != nil {
event.BuildFailed(a.artifact.ImageName, err)
results.Store(a.artifact.ImageName, err)
event.BuildFailed(a.ImageName, err)
results.Store(a.ImageName, err)
a.markFailure()
return
}

finalTag, err := getBuildResult(ctx, cw, tags, a.artifact, build)
finalTag, err := getBuildResult(ctx, cw, tags, a.Artifact, build)
if err != nil {
event.BuildFailed(a.artifact.ImageName, err)
results.Store(a.artifact.ImageName, err)
event.BuildFailed(a.ImageName, err)
results.Store(a.ImageName, err)
a.markFailure()
return
}

event.BuildComplete(a.artifact.ImageName)
ar := Artifact{ImageName: a.artifact.ImageName, Tag: finalTag}
event.BuildComplete(a.ImageName)
ar := Artifact{ImageName: a.ImageName, Tag: finalTag}
results.Store(ar.ImageName, ar)
a.markSuccess()
}
Expand Down

0 comments on commit aa0a5bb

Please sign in to comment.