Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel taskrun using entrypoint binary #4618

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -49,6 +50,7 @@ var (
onError = flag.String("on_error", "", "Set to \"continue\" to ignore an error and continue when a container terminates with a non-zero exit code."+
" Set to \"stopAndFail\" to declare a failure with a step error and stop executing the rest of the steps.")
stepMetadataDir = flag.String("step_metadata_dir", "", "If specified, create directory to store the step metadata e.g. /tekton/steps/<step-name>/")
cancelFile = flag.String("cancel_file", "", "Path indicating task should be cancelled")
)

const (
Expand All @@ -58,7 +60,7 @@ const (

func checkForBreakpointOnFailure(e entrypoint.Entrypointer, breakpointExitPostFile string) {
if e.BreakpointOnFailure {
if waitErr := e.Waiter.Wait(breakpointExitPostFile, false, false); waitErr != nil {
if waitErr := e.Waiter.Wait(context.Background(), breakpointExitPostFile, false, false); waitErr != nil {
log.Println("error occurred while waiting for " + breakpointExitPostFile + " : " + waitErr.Error())
}
// get exitcode from .breakpointexit
Expand Down Expand Up @@ -136,6 +138,7 @@ func main() {
BreakpointOnFailure: *breakpointOnFailure,
OnError: *onError,
StepMetadataDir: *stepMetadataDir,
CancelFile: *cancelFile,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand All @@ -144,7 +147,8 @@ func main() {
log.Printf("non-fatal error copying credentials: %q", err)
}

if err := e.Go(); err != nil {
ctx := context.Background()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this value of ctx be passed to checkForBreakpointOnFailure, instead of creating a new one in that function?

if err := e.Go(ctx); err != nil {
breakpointExitPostFile := e.PostFile + breakpointExitSuffix
switch t := err.(type) {
case skipError:
Expand Down
11 changes: 9 additions & 2 deletions cmd/entrypoint/waiter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -31,11 +32,17 @@ func (rw *realWaiter) setWaitPollingInterval(pollingInterval time.Duration) *rea
//
// If a file of the same name with a ".err" extension exists then this Wait
// will end with a skipError.
func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure bool) error {
func (rw *realWaiter) Wait(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error {
if file == "" {
return nil
}
for ; ; time.Sleep(rw.waitPollingInterval) {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(rw.waitPollingInterval):
}

if info, err := os.Stat(file); err == nil {
if !expectContent || info.Size() > 0 {
return nil
Expand Down
68 changes: 61 additions & 7 deletions cmd/entrypoint/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ limitations under the License.
package main

import (
"context"
"errors"
"io/ioutil"
"os"
"strings"
"testing"
"time"
)

const testWaitPollingInterval = 10 * time.Millisecond
const testWaitPollingInterval = 25 * time.Millisecond

func TestRealWaiterWaitMissingFile(t *testing.T) {
// Create a temp file and then immediately delete it to get
Expand All @@ -37,8 +39,9 @@ func TestRealWaiterWaitMissingFile(t *testing.T) {
os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmp.Name(), false, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
Expand All @@ -60,8 +63,9 @@ func TestRealWaiterWaitWithFile(t *testing.T) {
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmp.Name(), false, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
Expand All @@ -83,8 +87,9 @@ func TestRealWaiterWaitMissingContent(t *testing.T) {
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmp.Name(), true, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
Expand All @@ -106,8 +111,9 @@ func TestRealWaiterWaitWithContent(t *testing.T) {
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmp.Name(), true, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
Expand All @@ -133,9 +139,10 @@ func TestRealWaiterWaitWithErrorWaitfile(t *testing.T) {
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
// error of type skipError is returned after encountering a error waitfile
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmpFileName, false, false)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmpFileName, false, false)
if err == nil {
t.Errorf("expected skipError upon encounter error waitfile")
}
Expand Down Expand Up @@ -163,9 +170,10 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
ctx := context.Background()
go func() {
// When breakpoint on failure is enabled skipError shouldn't be returned for a error waitfile
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmpFileName, false, true)
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, tmpFileName, false, true)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
Expand All @@ -178,3 +186,49 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

// TestRealWaiterWaitWithCancel verifies that Wait returns promptly with
// context.Canceled when its context is cancelled while polling for a file
// that never appears.
func TestRealWaiterWaitWithCancel(t *testing.T) {
	rw := realWaiter{}
	doneCh := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		// The wait file never exists, so the only valid outcome is
		// context.Canceled; a nil error here would mean Wait returned
		// without observing the cancellation.
		err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, "does_not_exist", true, false)
		if !errors.Is(err, context.Canceled) {
			t.Errorf("expected context.Canceled from Wait(), got %v", err)
		}
		close(doneCh)
	}()
	// Give the waiter a chance to start polling before cancelling.
	time.Sleep(testWaitPollingInterval)
	cancel()

	select {
	case <-doneCh:
		// Success: Wait exited in response to the cancellation.
	case <-time.After(2 * testWaitPollingInterval):
		t.Errorf("expected Wait() to have exited by now")
	}
}

// TestRealWaiterWaitWithDeadline verifies that Wait returns with
// context.DeadlineExceeded once its context's deadline expires while it is
// polling for a file that never appears.
func TestRealWaiterWaitWithDeadline(t *testing.T) {
	rw := realWaiter{}
	doneCh := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), testWaitPollingInterval)
	defer cancel()
	go func() {
		// The wait file never exists, so the only valid outcome is
		// context.DeadlineExceeded; a nil error here would mean Wait
		// returned without observing the expired deadline. Note we must
		// NOT cancel() manually before the deadline fires, or ctx.Err()
		// would be context.Canceled instead and this check would flake.
		err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(ctx, "does_not_exist", true, false)
		if !errors.Is(err, context.DeadlineExceeded) {
			t.Errorf("expected context.DeadlineExceeded from Wait(), got %v", err)
		}
		close(doneCh)
	}()

	select {
	case <-doneCh:
		// Success: Wait exited once the deadline expired.
	case <-time.After(10 * testWaitPollingInterval):
		t.Errorf("expected Wait() to have exited by now")
	}
}
7 changes: 7 additions & 0 deletions pkg/apis/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
DefaultSendCloudEventsForRuns = false
// DefaultEmbeddedStatus is the default value for "embedded-status".
DefaultEmbeddedStatus = FullEmbeddedStatus
// DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint"
DefaultEnableCancelUsingEntrypoint = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better name for this feature flag might be something that describes the user-facing behavior changes, e.g. "stopPodOnCancel"


disableAffinityAssistantKey = "disable-affinity-assistant"
disableCredsInitKey = "disable-creds-init"
Expand All @@ -70,6 +72,7 @@ const (
scopeWhenExpressionsToTask = "scope-when-expressions-to-task"
sendCloudEventsForRuns = "send-cloudevents-for-runs"
embeddedStatus = "embedded-status"
enableCancelUsingEntrypoint = "enable-cancel-using-entrypoint"
)

// FeatureFlags holds the features configurations
Expand All @@ -85,6 +88,7 @@ type FeatureFlags struct {
EnableAPIFields string
SendCloudEventsForRuns bool
EmbeddedStatus string
EnableCancelUsingEntrypoint bool
}

// GetFeatureFlagsConfigName returns the name of the configmap containing all
Expand Down Expand Up @@ -136,6 +140,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) {
if err := setEmbeddedStatus(cfgMap, DefaultEmbeddedStatus, &tc.EmbeddedStatus); err != nil {
return nil, err
}
if err := setFeature(enableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil {
return nil, err
}

// Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if
// enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/feature_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) {
EnableAPIFields: "alpha",
SendCloudEventsForRuns: true,
EmbeddedStatus: "both",
EnableCancelUsingEntrypoint: true,
},
fileName: "feature-flags-all-flags-set",
},
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/testdata/feature-flags-all-flags-set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ data:
enable-api-fields: "alpha"
send-cloudevents-for-runs: "true"
embedded-status: "both"
enable-cancel-using-entrypoint: "true"
42 changes: 36 additions & 6 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,15 @@ type Entrypointer struct {
OnError string
// StepMetadataDir is the directory for a step where the step related metadata can be stored
StepMetadataDir string

// CancelFile is the file that causes the task to be cancelled.
CancelFile string
}

// Waiter encapsulates waiting for files to exist.
type Waiter interface {
// Wait blocks until the specified file exists.
Wait(file string, expectContent bool, breakpointOnFailure bool) error
Wait(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error
}

// Runner encapsulates running commands.
Expand All @@ -101,7 +104,7 @@ type PostWriter interface {

// Go optionally waits for a file, runs the command, and writes a
// post file.
func (e Entrypointer) Go() error {
func (e Entrypointer) Go(ctx context.Context) error {
prod, _ := zap.NewProduction()
logger := prod.Sugar()

Expand All @@ -114,7 +117,7 @@ func (e Entrypointer) Go() error {
}()

for _, f := range e.WaitFiles {
if err := e.Waiter.Wait(f, e.WaitFileContent, e.BreakpointOnFailure); err != nil {
if err := e.Waiter.Wait(ctx, f, e.WaitFileContent, e.BreakpointOnFailure); err != nil {
// An error happened while waiting, so we bail
// *but* we write postfile to make next steps bail too.
// In case of breakpoint on failure do not write post file.
Expand Down Expand Up @@ -142,19 +145,46 @@ func (e Entrypointer) Go() error {
}

if err == nil {
ctx := context.Background()
var cancel context.CancelFunc
if e.Timeout != nil && *e.Timeout != time.Duration(0) {
ctx, cancel = context.WithTimeout(ctx, *e.Timeout)
defer cancel()
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- e.Runner.Run(ctx, e.Command...)
cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean we'll run cancel twice? (here and in the defer)
I don't remember if it's a no-op or if it panics. If it's a no-op we are fine though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review. Yes, we will end up calling cancel twice on the context, but that is a no-op (I tested to be sure).
I added the defer cancel() for both readability and to ensure clean-up. If removing it is preferred, please let me know.

}()

var cancelled bool
if e.CancelFile != "" {
if err := e.Waiter.Wait(ctx, e.CancelFile, true, e.BreakpointOnFailure); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit weird to reuse Waiter.Wait for a cancel file in this way; for example "breakpointOnFailure" isn't meaningful here.

Just to make sure I understand what this change is doing:

  • we were previously calling Runner.Run in the main thread (which runs the step entrypoint)
  • now we're running Runner.Run in a goroutine and cancelling its context when it returns
  • If there's a cancel file, the main thread waits for it to exist
  • If the cancel file exists, the Waiter returns an error, and the main thread cancels the original context
  • If the cancel file never exists, Runner.Run will eventually complete and this function will return
  • If there is no cancel file, this function will just wait for Runner.Run to complete in the goroutine

I am wondering if there's some opportunity to simplify this logic? It would be helpful to at least include a comment block explaining a bit about what this does because it's a bit hard to parse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review. Indeed the logic here could benefit from some simplification. I will do another review to see if it can be simplified without making too many modifications elsewhere.

return err
}
if ctx.Err() == nil {
cancel()
cancelled = true
}
}
err = e.Runner.Run(ctx, e.Command...)
err = <-errChan

if err == context.DeadlineExceeded {
output = append(output, v1beta1.PipelineResourceResult{
Key: "Reason",
Value: "TimeoutExceeded",
ResultType: v1beta1.InternalTektonResultType,
})
} else if cancelled {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right type of error to return-- this seems like something that should be handled by the reconciler. In particular, PipelineResourceResult doesn't seem appropriate as it's not related to pipelineresources.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lbernick I think the name is a bit unfortunate, as this is the struct that we use for returning Results 🙃
We could rename it at some point.

// Waiter has found the cancel file: Cancel the run
cancel()
output = append(output, v1beta1.PipelineResourceResult{
Key: "Reason",
Value: "Cancelled",
ResultType: v1beta1.InternalTektonResultType,
})
}
}

Expand Down
Loading