From afea10ad0e9c9e7c8c4bab76a77d9bb1df30c43d Mon Sep 17 00:00:00 2001 From: Taylor Barrella Date: Fri, 5 Jul 2019 17:48:34 -0700 Subject: [PATCH] Wait for parallel builds to be cancelled on error Previously, if building the first cluster artifact failed, skaffold would exit immediately after. Build goroutines for other artifacts may have still been running and not get a chance to clean up The Kaniko build has logic to handle cancellation, but allow custom commands to be cancelled as well --- pkg/skaffold/build/custom/custom.go | 6 +++--- pkg/skaffold/build/custom/custom_test.go | 5 +++-- pkg/skaffold/build/parallel.go | 9 +++++++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/skaffold/build/custom/custom.go b/pkg/skaffold/build/custom/custom.go index bf6a0eb0c8b..a4b5132f23e 100644 --- a/pkg/skaffold/build/custom/custom.go +++ b/pkg/skaffold/build/custom/custom.go @@ -54,17 +54,17 @@ func NewArtifactBuilder(pushImages bool, additionalEnv []string) *ArtifactBuilde // Build builds a custom artifact // It returns true if the image is expected to exist remotely, or false if it is expected to exist locally func (b *ArtifactBuilder) Build(ctx context.Context, out io.Writer, a *latest.Artifact, tag string) error { - cmd, err := b.retrieveCmd(a, tag) + cmd, err := b.retrieveCmd(ctx, a, tag) if err != nil { return errors.Wrap(err, "retrieving cmd") } return cmd.Run() } -func (b *ArtifactBuilder) retrieveCmd(a *latest.Artifact, tag string) (*exec.Cmd, error) { +func (b *ArtifactBuilder) retrieveCmd(ctx context.Context, a *latest.Artifact, tag string) (*exec.Cmd, error) { artifact := a.CustomArtifact split := strings.Split(artifact.BuildCommand, " ") - cmd := exec.Command(split[0], split[1:]...) + cmd := exec.CommandContext(ctx, split[0], split[1:]...) env, err := b.retrieveEnv(a, tag) if err != nil { return nil, errors.Wrapf(err, "retrieving env variables for %s", a.ImageName) diff --git a/pkg/skaffold/build/custom/custom_test.go b/pkg/skaffold/build/custom/custom_test.go index 88d78028d8f..51df06781e0 100644 --- a/pkg/skaffold/build/custom/custom_test.go +++ b/pkg/skaffold/build/custom/custom_test.go @@ -17,6 +17,7 @@ limitations under the License. package custom import ( + "context" "os" "os/exec" "reflect" @@ -115,7 +116,7 @@ func TestRetrieveCmd(t *testing.T) { t.Override(&buildContext, func(string) (string, error) { return test.artifact.Workspace, nil }) builder := NewArtifactBuilder(false, nil) - cmd, err := builder.retrieveCmd(test.artifact, test.tag) + cmd, err := builder.retrieveCmd(context.Background(), test.artifact, test.tag) t.CheckNoError(err) // cmp.Diff cannot access unexported fields in *exec.Cmd, so use reflect.DeepEqual here directly @@ -127,7 +128,7 @@ func TestRetrieveCmd(t *testing.T) { } func expectedCmd(buildCommand, dir string, args, env []string) *exec.Cmd { - cmd := exec.Command(buildCommand, args...) + cmd := exec.CommandContext(context.Background(), buildCommand, args...) cmd.Dir = dir cmd.Env = env cmd.Stdout = os.Stdout diff --git a/pkg/skaffold/build/parallel.go b/pkg/skaffold/build/parallel.go index 5c78c479745..bc9602b28c2 100644 --- a/pkg/skaffold/build/parallel.go +++ b/pkg/skaffold/build/parallel.go @@ -50,6 +50,9 @@ func InParallel(ctx context.Context, out io.Writer, tags tag.ImageTags, artifact return runInSequence(ctx, out, tags, artifacts, buildArtifact) } + var wg sync.WaitGroup + defer wg.Wait() + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -57,6 +60,7 @@ func InParallel(ctx context.Context, out io.Writer, tags tag.ImageTags, artifact outputs := make([]chan []byte, len(artifacts)) // Run builds in // + wg.Add(len(artifacts)) for i := range artifacts { outputs[i] = make(chan []byte, buffSize) r, w := io.Pipe() @@ -64,7 +68,7 @@ func InParallel(ctx context.Context, out io.Writer, tags tag.ImageTags, artifact // Run build and write output/logs to piped writer and store build result in // sync.Map - go runBuild(ctx, cw, tags, artifacts[i], results, buildArtifact) + go runBuild(ctx, &wg, cw, tags, artifacts[i], results, buildArtifact) // Read build output/logs and write to buffered channel go readOutputAndWriteToChannel(r, outputs[i]) } @@ -73,7 +77,8 @@ func InParallel(ctx context.Context, out io.Writer, tags tag.ImageTags, artifact return collectResults(out, artifacts, results, outputs) } -func runBuild(ctx context.Context, cw io.WriteCloser, tags tag.ImageTags, artifact *latest.Artifact, results *sync.Map, build artifactBuilder) { +func runBuild(ctx context.Context, wg *sync.WaitGroup, cw io.WriteCloser, tags tag.ImageTags, artifact *latest.Artifact, results *sync.Map, build artifactBuilder) { + defer wg.Done() event.BuildInProgress(artifact.ImageName) finalTag, err := getBuildResult(ctx, cw, tags, artifact, build)