Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Plugins return retryable errors where possible (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored and lyft-buildnotify-13 committed Jan 5, 2019
1 parent bd1c3ad commit 2575c97
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 29 deletions.
34 changes: 17 additions & 17 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 16 additions & 5 deletions go/tasks/v1/flytek8s/plugin_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e *K8sTaskExecutor) Initialize(ctx context.Context, recorder types.EventRe
})
}

func (e K8sTaskExecutor) handleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error) {
func (e K8sTaskExecutor) HandleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error) {
errorPath := taskCtx.GetErrorFile()
metadata, err := e.store.Head(ctx, errorPath)
if err != nil {
Expand All @@ -95,6 +95,10 @@ func (e K8sTaskExecutor) handleTaskSuccess(ctx context.Context, taskCtx types.Ta
if errorDoc.Error == nil {
return types.TaskStatusPermanentFailure(errors.Errorf(errors.MetadataAccessFailed, "error not formatted correctly, missing error @path [%v]", errorPath)), nil
}

if errorDoc.Error.Kind == core.ContainerError_RECOVERABLE {
return types.TaskStatusRetryableFailure(errors.Errorf(errorDoc.Error.Code, "user-error. Message: [%s]", errorDoc.Error.Message)), nil
}
return types.TaskStatusPermanentFailure(errors.Errorf(errorDoc.Error.Code, "user-error. Message: [%s]", errorDoc.Error.Message)), nil
}

Expand All @@ -118,7 +122,7 @@ func (e *K8sTaskExecutor) StartTask(ctx context.Context, taskCtx types.TaskConte
if !k8serrors.IsAlreadyExists(err) {
if k8serrors.IsBadRequest(err) {
logger.Errorf(ctx, "Bad Request. [%+v]", o)
return types.TaskStatusPermanentFailure(err), nil, nil
return types.TaskStatusUndefined, nil, err
}
logger.Errorf(ctx, "Failed to launch job, system error. Err: %v", err)
return types.TaskStatusUndefined, nil, err
Expand Down Expand Up @@ -163,7 +167,7 @@ func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.Tas

o, err := e.handler.BuildIdentityResource(taskCtx)
if err != nil {
logger.Warningf(ctx, "Failed to find the Job with name: %v. Error: %v", taskCtx.GetTaskExecutionID().GetGeneratedName(), err)
logger.Warningf(ctx, "Failed to build the Resource with name: %v. Error: %v", taskCtx.GetTaskExecutionID().GetGeneratedName(), err)
return types.TaskStatusPermanentFailure(err), nil, nil
}

Expand All @@ -173,8 +177,15 @@ func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.Tas
if err != nil {
return types.TaskStatusUndefined, nil, err
}
if status.Phase != taskCtx.GetPhase() {
ev := events.CreateEvent(taskCtx, status, info)
finalStatus := status
if status.Phase == types.TaskPhaseSucceeded {
finalStatus, err = e.HandleTaskSuccess(ctx, taskCtx)
if err != nil {
return finalStatus, nil, err
}
}
if finalStatus.Phase != taskCtx.GetPhase() {
ev := events.CreateEvent(taskCtx, finalStatus, info)
if err := e.recorder.RecordTaskEvent(ctx, ev); err != nil {
return types.TaskStatusUndefined, nil, errors.Wrapf(errors.TaskEventRecordingFailed, err, "failed to record state transition [%v] -> [%v]", taskCtx.GetPhase(), status.Phase)
}
Expand Down
73 changes: 73 additions & 0 deletions go/tasks/v1/flytek8s/plugin_executor_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package flytek8s_test

import (
"bytes"
"fmt"
"github.com/lyft/flyteplugins/go/tasks/v1/errors"
"testing"
"time"

"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"

"github.com/lyft/flyteplugins/go/tasks/v1/events"
Expand Down Expand Up @@ -221,3 +225,72 @@ func TestK8sTaskExecutor_CheckTaskStatus(t *testing.T) {
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase, "Expected failure got %s", s.Phase.String())
})
}

func TestK8sTaskExecutor_HandleTaskSuccess(t *testing.T) {
ctx := context.TODO()

tctx := getMockTaskContext()
mockResourceHandler := &mocks.K8sResourceHandler{}
k := flytek8s.NewK8sTaskExecutorForResource("x", &v1.Pod{}, mockResourceHandler, time.Second)
_ = flytek8s.InitializeFake()

t.Run("no-errorflie", func(t *testing.T) {
store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.NoError(t, err)
assert.Equal(t, s.Phase, types.TaskPhaseSucceeded)
})

t.Run("retryable-error", func(t *testing.T) {
store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
msg := &core.ErrorDocument{
Error: &core.ContainerError{
Kind: core.ContainerError_RECOVERABLE,
Code: "x",
Message: "y",
},
}
store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, msg)
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.NoError(t, err)
assert.Equal(t, s.Phase, types.TaskPhaseRetryableFailure)
c, ok := errors.GetErrorCode(s.Err)
assert.True(t, ok)
assert.Equal(t, c, "x")
})

t.Run("nonretryable-error", func(t *testing.T) {
store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
msg := &core.ErrorDocument{
Error: &core.ContainerError{
Kind: core.ContainerError_NON_RECOVERABLE,
Code: "m",
Message: "n",
},
}
store.WriteProtobuf(ctx, tctx.GetErrorFile(), storage.Options{}, msg)
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.NoError(t, err)
assert.Equal(t, s.Phase, types.TaskPhasePermanentFailure)
c, ok := errors.GetErrorCode(s.Err)
assert.True(t, ok)
assert.Equal(t, c, "m")
})

t.Run("corrupted", func(t *testing.T) {
store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
r := bytes.NewReader([]byte{'x'})
store.WriteRaw(ctx, tctx.GetErrorFile(), r.Size(), storage.Options{}, r)
assert.NoError(t, k.Initialize(ctx, nil, store, func(ownerId string) {}, ""))
s, err := k.HandleTaskSuccess(ctx, tctx)
assert.Error(t, err)
assert.Equal(t, s.Phase, types.TaskPhaseUndefined)
})
}
6 changes: 3 additions & 3 deletions go/tasks/v1/k8splugins/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func DemystifyPending(status v1.PodStatus) (types.TaskStatus, error) {
for _, containerStatus := range status.ContainerStatuses {
if !containerStatus.Ready {
if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == "ImagePullBackOff" {
return types.TaskStatusPermanentFailure(errors.Errorf("ImagePullBackoff", containerStatus.State.Waiting.Reason)), nil
return types.TaskStatusRetryableFailure(errors.Errorf("ImagePullBackoff", containerStatus.State.Waiting.Reason)), nil
}
}
}
Expand All @@ -84,7 +84,7 @@ func (containerTaskExecutor) GetTaskStatus(r flytek8s.K8sResource) (types.TaskSt
logs, err := GetLogsForContainerInPod(pod, 0, " (User)")

if err != nil {
return types.TaskStatusPermanentFailure(err), nil, nil
return types.TaskStatusUndefined, nil, err
}

info := &events.TaskEventInfo{
Expand All @@ -95,7 +95,7 @@ func (containerTaskExecutor) GetTaskStatus(r flytek8s.K8sResource) (types.TaskSt
case v1.PodSucceeded:
return types.TaskStatusSucceeded, info, nil
case v1.PodFailed:
return types.TaskStatusPermanentFailure(errors.Errorf(pod.Status.Reason, pod.Status.Message)), info, nil
return types.TaskStatusRetryableFailure(errors.Errorf(pod.Status.Reason, pod.Status.Message)), info, nil
case v1.PodPending:
status, err := DemystifyPending(pod.Status)
return status, info, err
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/v1/k8splugins/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestContainerTaskExecutor_GetTaskPhase(t *testing.T) {
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.NotNil(t, i)
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase)
assert.Equal(t, types.TaskPhaseRetryableFailure, s.Phase)
})

t.Run("failConditionUnschedulable", func(t *testing.T) {
Expand All @@ -108,7 +108,7 @@ func TestContainerTaskExecutor_GetTaskPhase(t *testing.T) {
s, i, err := c.GetTaskStatus(j)
assert.NoError(t, err)
assert.NotNil(t, i)
assert.Equal(t, types.TaskPhasePermanentFailure, s.Phase)
assert.Equal(t, types.TaskPhaseRetryableFailure, s.Phase)
})

t.Run("success", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/v1/k8splugins/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (sparkResourceHandler) GetTaskStatus(r flytek8s.K8sResource) (types.TaskSta
case sparkOp.NewState, sparkOp.SubmittedState:
return types.TaskStatusQueued, info, nil
case sparkOp.FailedState:
return types.TaskStatusPermanentFailure(errors.Errorf("UnknownError", "spark job exited with failure")), info, nil
return types.TaskStatusRetryableFailure(errors.Errorf("UnknownError", "spark job exited with failure")), info, nil
case sparkOp.CompletedState:
return types.TaskStatusSucceeded, info, nil
default:
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/v1/k8splugins/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestGetTaskStatus(t *testing.T) {
assert.Nil(t, err)

taskStatus, i, err = sparkResourceHandler.GetTaskStatus(dummySparkApplication(sj.FailedState))
assert.Equal(t, taskStatus.Phase, types.TaskPhasePermanentFailure)
assert.Equal(t, taskStatus.Phase, types.TaskPhaseRetryableFailure)
assert.NotNil(t, i)
assert.Nil(t, err)
}
Expand Down

0 comments on commit 2575c97

Please sign in to comment.