Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Propagate container error to the node (#39)
Browse files Browse the repository at this point in the history
- Previously, when a container exited successfully but had written an error file, the task was still reported as successful. This change propagates the error from the error file so that the correct failure status is returned.
  • Loading branch information
Ketan Umare authored and lyft-buildnotify-15 committed Jan 9, 2019
1 parent 2575c97 commit 4d5c265
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 28 deletions.
4 changes: 2 additions & 2 deletions go/tasks/v1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const configSectionKey = "plugins"
const logConfigSectionKey = "logs"

// Config is the configuration struct declared in this file. Each field carries
// a json tag (its serialized key) and a pflag tag (its command-line help text).
// NOTE(review): the IsDebugMode line appears twice here; this looks like the
// before/after pair of a whitespace-only change in a rendered diff — confirm
// against the real file, where a duplicate field would not compile.
type Config struct {
// IsDebugMode is the "debug-mode" flag; its help text is "Enable debug mode".
IsDebugMode bool `json:"debug-mode" pflag:",Enable debug mode"`
IsDebugMode bool `json:"debug-mode" pflag:",Enable debug mode"`
// EnabledPlugins is the optional "enabled-plugins" list; it is a pointer and
// tagged omitempty, so it may be absent (nil).
EnabledPlugins *[]string `json:"enabled-plugins,omitempty" pflag:",List of enabled plugins"`
}

Expand Down Expand Up @@ -35,4 +35,4 @@ func GetConfig() *Config {

// GetLogConfig returns the configuration section stored under
// logConfigSectionKey, type-asserted to *LogConfig.
//
// The type assertion is unchecked, so this panics if the section holds a value
// of any other type (or no value at all).
func GetLogConfig() *LogConfig {
return config.GetSection(logConfigSectionKey).(*LogConfig)
}
}
2 changes: 1 addition & 1 deletion go/tasks/v1/flytek8s/plugin_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.Tas
return types.TaskStatusUndefined, nil, errors.Wrapf(errors.TaskEventRecordingFailed, err, "failed to record state transition [%v] -> [%v]", taskCtx.GetPhase(), status.Phase)
}
}
return status, nil, err
return finalStatus, nil, err
}

func (e *K8sTaskExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext) error {
Expand Down
177 changes: 155 additions & 22 deletions go/tasks/v1/flytek8s/plugin_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package flytek8s_test
import (
"bytes"
"fmt"
"github.com/lyft/flyteplugins/go/tasks/v1/errors"
"testing"
"time"

"github.com/lyft/flyteplugins/go/tasks/v1/errors"

"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -152,24 +153,31 @@ func TestK8sTaskExecutor_StartTask(t *testing.T) {

func TestK8sTaskExecutor_CheckTaskStatus(t *testing.T) {
ctx := context.TODO()
c := flytek8s.InitializeFake()

evRecorder := &mocks2.EventRecorder{}
t.Run("phaseChange", func(t *testing.T) {

tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)
evRecorder := &mocks2.EventRecorder{}
tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)

assert.NoError(t, k.Initialize(ctx, evRecorder, nil, func(ownerId string) {}, "x"))
c := flytek8s.InitializeFake()
testPod := &v1.Pod{}
testPod.SetName(tctx.GetTaskExecutionID().GetGeneratedName())
testPod.SetNamespace(tctx.GetNamespace())
testPod.SetOwnerReferences([]v12.OwnerReference{tctx.GetOwnerReference()})
_ = c.Delete(ctx, testPod)
assert.NoError(t, c.Create(ctx, testPod))
store, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

t.Run("phaseChange", func(t *testing.T) {
assert.NoError(t, k.Initialize(ctx, evRecorder, store, func(ownerId string) {}, "x"))
testPod := &v1.Pod{}
testPod.SetName(tctx.GetTaskExecutionID().GetGeneratedName())
testPod.SetNamespace(tctx.GetNamespace())
testPod.SetOwnerReferences([]v12.OwnerReference{tctx.GetOwnerReference()})
_ = c.Delete(ctx, testPod)
assert.NoError(t, c.Create(ctx, testPod))
defer func() {
_ = c.Delete(ctx, testPod)
}()

info := &events.TaskEventInfo{}
info.Logs = []*core.TaskLog{{Uri: "log1"}}
Expand All @@ -189,6 +197,28 @@ func TestK8sTaskExecutor_CheckTaskStatus(t *testing.T) {

t.Run("noChange", func(t *testing.T) {

evRecorder := &mocks2.EventRecorder{}
tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)

store, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

assert.NoError(t, k.Initialize(ctx, evRecorder, store, func(ownerId string) {}, "x"))
testPod := &v1.Pod{}
testPod.SetName(tctx.GetTaskExecutionID().GetGeneratedName())
testPod.SetNamespace(tctx.GetNamespace())
testPod.SetOwnerReferences([]v12.OwnerReference{tctx.GetOwnerReference()})
_ = c.Delete(ctx, testPod)
assert.NoError(t, c.Create(ctx, testPod))
defer func() {
_ = c.Delete(ctx, testPod)
}()

info := &events.TaskEventInfo{}
info.Logs = []*core.TaskLog{{Uri: "log1"}}
expectedOldPhase := types.TaskPhaseRunning
Expand All @@ -204,25 +234,128 @@ func TestK8sTaskExecutor_CheckTaskStatus(t *testing.T) {

t.Run("resourceNotFound", func(t *testing.T) {

evRecorder := &mocks2.EventRecorder{}
tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)

store, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

assert.NoError(t, k.Initialize(ctx, evRecorder, store, func(ownerId string) {}, "x"))
_ = flytek8s.InitializeFake()

info := &events.TaskEventInfo{}
info.Logs = []*core.TaskLog{{Uri: "log1"}}
expectedOldPhase := types.TaskPhaseRunning
tctx.On("GetPhase").Return(expectedOldPhase)

// NOTE: We have deleted the object
assert.NoError(t, c.Delete(ctx, testPod))
evRecorder.On("RecordTaskEvent", mock.MatchedBy(func(c context.Context) bool { return true }),
mock.MatchedBy(func(e *event.TaskExecutionEvent) bool { return true })).Return(nil)

s, state, err := k.CheckTaskStatus(ctx, tctx, nil)
assert.Nil(t, state)
assert.NoError(t, err)
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase, "Expected failure got %s", s.Phase.String())
})

// errorFileExit: the resource handler reports TaskStatusSucceeded, but a
// NON_RECOVERABLE error document has been written to the task's error file.
// CheckTaskStatus is expected to return no error and a status whose phase is
// TaskPhasePermanentFailure.
t.Run("errorFileExit", func(t *testing.T) {

// Fresh mocks and executor for this subtest.
evRecorder := &mocks2.EventRecorder{}
tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)

// Data store configured with storage.TypeMemory; the error file is written here.
store, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

assert.NoError(t, k.Initialize(ctx, evRecorder, store, func(ownerId string) {}, "x"))
// Build a pod named/namespaced/owned according to the mock task context.
testPod := &v1.Pod{}
testPod.SetName(tctx.GetTaskExecutionID().GetGeneratedName())
testPod.SetNamespace(tctx.GetNamespace())
testPod.SetOwnerReferences([]v12.OwnerReference{tctx.GetOwnerReference()})

// Delete error is deliberately ignored; presumably this clears a pod left
// behind by another subtest so the Create below succeeds — confirm.
_ = c.Delete(ctx, testPod)
assert.NoError(t, c.Create(ctx, testPod))
defer func() {
// Best-effort cleanup; the error is ignored.
_ = c.Delete(ctx, testPod)
}()

// Write a non-recoverable container error to the task's error file.
assert.NoError(t, store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, &core.ErrorDocument{
Error: &core.ContainerError{
Kind: core.ContainerError_NON_RECOVERABLE,
Code: "code",
Message: "pleh",
},
}))

// NOTE(review): info is populated but not passed to anything visible in this subtest.
info := &events.TaskEventInfo{}
info.Logs = []*core.TaskLog{{Uri: "log1"}}
expectedOldPhase := types.TaskPhaseQueued
tctx.On("GetPhase").Return(expectedOldPhase)
// The handler mock accepts any pod and reports success.
mockResourceHandler.On("GetTaskStatus", mock.MatchedBy(func(o *v1.Pod) bool { return true })).Return(types.TaskStatusSucceeded, nil, nil)

// The event recorder mock accepts any context and event and returns nil.
evRecorder.On("RecordTaskEvent", mock.MatchedBy(func(c context.Context) bool { return true }),
mock.MatchedBy(func(e *event.TaskExecutionEvent) bool { return true })).Return(nil)

s, state, err := k.CheckTaskStatus(ctx, tctx, nil)
assert.Nil(t, state)
assert.NoError(t, err)
// Despite the handler's success status, the resulting phase must be a permanent failure.
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase)
})

// errorFileExitRecoverable: the resource handler reports TaskStatusSucceeded,
// but a RECOVERABLE error document has been written to the task's error file.
// CheckTaskStatus is expected to return no error and a status whose phase is
// TaskPhaseRetryableFailure.
t.Run("errorFileExitRecoverable", func(t *testing.T) {

// Fresh mocks and executor for this subtest.
evRecorder := &mocks2.EventRecorder{}
tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
mockResourceHandler.On("BuildIdentityResource", tctx).Return(&v1.Pod{}, nil)

// Data store configured with storage.TypeMemory; the error file is written here.
store, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

assert.NoError(t, k.Initialize(ctx, evRecorder, store, func(ownerId string) {}, "x"))
// Build a pod named/namespaced/owned according to the mock task context.
testPod := &v1.Pod{}
testPod.SetName(tctx.GetTaskExecutionID().GetGeneratedName())
testPod.SetNamespace(tctx.GetNamespace())
testPod.SetOwnerReferences([]v12.OwnerReference{tctx.GetOwnerReference()})

// Delete error is deliberately ignored; presumably this clears a pod left
// behind by another subtest so the Create below succeeds — confirm.
_ = c.Delete(ctx, testPod)
assert.NoError(t, c.Create(ctx, testPod))
defer func() {
// Let's add it back in for other tests to work.
// NOTE(review): the pod was already created above, and it is deleted again
// on the very next line; this Create only succeeds if something removed the
// pod in between — confirm the intent.
assert.NoError(t, c.Create(ctx, testPod))
_ = c.Delete(ctx, testPod)
}()

// Write a recoverable container error to the task's error file.
assert.NoError(t, store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, &core.ErrorDocument{
Error: &core.ContainerError{
Kind: core.ContainerError_RECOVERABLE,
Code: "code",
Message: "pleh",
},
}))

// NOTE(review): info is populated but not passed to anything visible in this subtest.
info := &events.TaskEventInfo{}
info.Logs = []*core.TaskLog{{Uri: "log1"}}
expectedOldPhase := types.TaskPhaseQueued
tctx.On("GetPhase").Return(expectedOldPhase)
// The handler mock accepts any pod and reports success.
mockResourceHandler.On("GetTaskStatus", mock.MatchedBy(func(o *v1.Pod) bool { return true })).Return(types.TaskStatusSucceeded, nil, nil)

// The event recorder mock accepts any context and event and returns nil.
evRecorder.On("RecordTaskEvent", mock.MatchedBy(func(c context.Context) bool { return true }),
mock.MatchedBy(func(e *event.TaskExecutionEvent) bool { return true })).Return(nil)

s, state, err := k.CheckTaskStatus(ctx, tctx, nil)
assert.Nil(t, state)
assert.NoError(t, err)
// NOTE(review): the next two assertions expect different phases for the same
// value and cannot both pass. The PermanentFailure line looks like a removed
// line from the rendered diff (old end of another subtest) — confirm; the
// RetryableFailure assertion matches the RECOVERABLE error written above.
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase, "Expected failure got %s", s.Phase.String())
assert.Equal(t, types.TaskPhaseRetryableFailure, s.Phase)
})
}

Expand Down Expand Up @@ -253,7 +386,7 @@ func TestK8sTaskExecutor_HandleTaskSuccess(t *testing.T) {
Message: "y",
},
}
store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, msg)
assert.NoError(t, store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, msg))
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -287,7 +420,7 @@ func TestK8sTaskExecutor_HandleTaskSuccess(t *testing.T) {
store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
r := bytes.NewReader([]byte{'x'})
store.WriteRaw(ctx, tctx.GetErrorFile(), r.Size(), storage.Options{}, r)
assert.NoError(t, store.WriteRaw(ctx, tctx.GetErrorFile(), r.Size(), storage.Options{}, r))
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.Error(t, err)
Expand Down
7 changes: 4 additions & 3 deletions go/tasks/v1/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package utils
import (
"context"
"fmt"
"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"reflect"
"regexp"
"strings"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
)

var inputFileRegex = regexp.MustCompile(`(?i){{\s*[\.$]Input\s*}}`)
Expand Down

0 comments on commit 4d5c265

Please sign in to comment.