Skip to content

Commit

Permalink
Some scheduler cleanup
Browse files Browse the repository at this point in the history
The main improvement is the removal of `err` as a _field_ from `jobWrapper`. The second is clearer logic for the error path (fallback to failure job).
  • Loading branch information
DrJosh9000 committed May 27, 2024
1 parent 6ad40d6 commit ad03163
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 42 deletions.
67 changes: 33 additions & 34 deletions internal/controller/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,40 @@ type worker struct {
func (w *worker) Create(ctx context.Context, job *api.CommandJob) error {
logger := w.logger.With(zap.String("uuid", job.Uuid))
logger.Info("creating job")
jobWrapper := NewJobWrapper(w.logger, job, w.cfg).ParsePlugins()

jobWrapper := NewJobWrapper(w.logger, job, w.cfg)

if err := jobWrapper.ParsePlugins(); err != nil {
logger.Warn("Plugin parsing failed, creating failure job instead", zap.Error(err))
return w.buildAndCreateFallbackJob(ctx, jobWrapper, err)
}

kjob, err := jobWrapper.Build(false)
if err != nil {
logger.Warn("Job definition error detected, creating failure job instead", zap.Error(err))
kjob, err = jobWrapper.BuildFailureJob(err)
if err != nil {
return fmt.Errorf("job definition error and failure job definition error: %w", err)
}
return w.buildAndCreateFallbackJob(ctx, jobWrapper, err)
}

_, err = w.client.BatchV1().Jobs(w.cfg.Namespace).Create(ctx, kjob, metav1.CreateOptions{})
err = w.createJob(ctx, kjob)
if kerrors.IsInvalid(err) {
logger.Warn("Job creation failed, creating failure job instead", zap.Error(err))
return w.buildAndCreateFallbackJob(ctx, jobWrapper, err)
}
return err
}

func (w *worker) buildAndCreateFallbackJob(ctx context.Context, jobWrapper *jobWrapper, err error) error {
kjob, ferr := jobWrapper.BuildFailureJob(err)
if ferr != nil {
return fmt.Errorf("job definition error and failure job definition error: %w", err)
}
return w.createJob(ctx, kjob)
}

func (w *worker) createJob(ctx context.Context, kjob *batchv1.Job) error {
_, err := w.client.BatchV1().Jobs(w.cfg.Namespace).Create(ctx, kjob, metav1.CreateOptions{})
if err != nil {
if kerrors.IsInvalid(err) {
kjob, err = jobWrapper.BuildFailureJob(err)
if err != nil {
return fmt.Errorf("job registration error and failure job definition error: %w", err)
}
_, err = w.client.BatchV1().Jobs(w.cfg.Namespace).Create(ctx, kjob, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("job registration error and failure job registration error: %w", err)
}
return nil
} else {
return err
}
return fmt.Errorf("failed to create job: %w", err)
}
return nil
}
Expand All @@ -111,7 +120,6 @@ type jobWrapper struct {
logger *zap.Logger
job *api.CommandJob
envMap map[string]string
err error
k8sPlugin KubernetesPlugin
otherPlugins []map[string]json.RawMessage
cfg Config
Expand All @@ -126,7 +134,7 @@ func NewJobWrapper(logger *zap.Logger, job *api.CommandJob, config Config) *jobW
}
}

func (w *jobWrapper) ParsePlugins() *jobWrapper {
func (w *jobWrapper) ParsePlugins() error {
for _, val := range w.job.Env {
parts := strings.SplitN(val, "=", 2)
w.envMap[parts[0]] = parts[1]
Expand All @@ -135,38 +143,30 @@ func (w *jobWrapper) ParsePlugins() *jobWrapper {
if pluginsJson, ok := w.envMap["BUILDKITE_PLUGINS"]; ok {
if err := json.Unmarshal([]byte(pluginsJson), &plugins); err != nil {
w.logger.Debug("invalid plugin spec", zap.String("json", pluginsJson))
w.err = fmt.Errorf("failed parsing plugins: %w", err)
return w
return fmt.Errorf("failed parsing plugins: %w", err)
}
}
w.logger.Info("parsing", zap.Any("plugins", plugins))
for _, plugin := range plugins {
if len(plugin) != 1 {
w.err = fmt.Errorf("found invalid plugin: %v", plugin)
return w
return fmt.Errorf("found invalid plugin: %v", plugin)
}
if val, ok := plugin["github.com/buildkite-plugins/kubernetes-buildkite-plugin"]; ok {
if err := json.Unmarshal(val, &w.k8sPlugin); err != nil {
w.err = fmt.Errorf("failed parsing Kubernetes plugin: %w", err)
return w
return fmt.Errorf("failed parsing Kubernetes plugin: %w", err)
}
} else {
for k, v := range plugin {
w.otherPlugins = append(w.otherPlugins, map[string]json.RawMessage{k: v})
}
}
}
return w
return nil
}

// Build builds a job. The checkout container will be skipped either by passing
// `true` or if the k8s plugin configuration is configured to skip it.
func (w *jobWrapper) Build(skipCheckout bool) (*batchv1.Job, error) {
// if previous steps have failed, error immediately
if w.err != nil {
return nil, w.err
}

skipCheckout = skipCheckout || w.k8sPlugin.Checkout.Skip

kjob := &batchv1.Job{}
Expand Down Expand Up @@ -622,7 +622,6 @@ su buildkite-agent -c "buildkite-agent-entrypoint bootstrap"`,
}

func (w *jobWrapper) BuildFailureJob(err error) (*batchv1.Job, error) {
w.err = nil
w.k8sPlugin = KubernetesPlugin{
PodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ func TestJobPluginConversion(t *testing.T) {
input,
scheduler.Config{AgentToken: "token-secret"},
)
result, err := wrapper.ParsePlugins().Build(false)
err = wrapper.ParsePlugins()
require.NoError(t, err)
result, err := wrapper.Build(false)
require.NoError(t, err)

assert.Len(t, result.Spec.Template.Spec.Containers, 3)
Expand Down Expand Up @@ -217,7 +219,9 @@ func TestTagEnv(t *testing.T) {
AgentQueryRules: []string{"queue=kubernetes"},
}
wrapper := scheduler.NewJobWrapper(logger, input, scheduler.Config{AgentToken: "token-secret"})
result, err := wrapper.ParsePlugins().Build(false)
err = wrapper.ParsePlugins()
require.NoError(t, err)
result, err := wrapper.Build(false)
require.NoError(t, err)

container := findContainer(t, result.Spec.Template.Spec.Containers, "agent")
Expand Down Expand Up @@ -247,7 +251,9 @@ func TestJobWithNoKubernetesPlugin(t *testing.T) {
AgentQueryRules: []string{},
}
wrapper := scheduler.NewJobWrapper(zaptest.NewLogger(t), input, scheduler.Config{})
result, err := wrapper.ParsePlugins().Build(false)
err := wrapper.ParsePlugins()
require.NoError(t, err)
result, err := wrapper.Build(false)
require.NoError(t, err)

require.Len(t, result.Spec.Template.Spec.Containers, 3)
Expand Down Expand Up @@ -300,8 +306,9 @@ func TestBuild(t *testing.T) {
},
},
},
).ParsePlugins()

)
err = wrapper.ParsePlugins()
require.NoError(t, err)
job, err := wrapper.Build(false)
require.NoError(t, err)

Expand Down Expand Up @@ -349,8 +356,9 @@ func TestBuildSkipCheckout(t *testing.T) {
Image: "buildkite/agent:latest",
AgentToken: "bkcq_1234567890",
},
).ParsePlugins()

)
err = wrapper.ParsePlugins()
require.NoError(t, err)
job, err := wrapper.Build(false)
require.NoError(t, err)

Expand Down Expand Up @@ -383,7 +391,7 @@ func TestFailureJobs(t *testing.T) {
AgentQueryRules: []string{"queue=kubernetes"},
}
wrapper := scheduler.NewJobWrapper(zaptest.NewLogger(t), input, scheduler.Config{})
_, err = wrapper.ParsePlugins().Build(false)
err = wrapper.ParsePlugins()
require.Error(t, err)

result, err := wrapper.BuildFailureJob(err)
Expand Down

0 comments on commit ad03163

Please sign in to comment.