From 2559fb73cb6950725c1d6114fa99e38ef87dddf4 Mon Sep 17 00:00:00 2001 From: Scott Cotton Date: Mon, 30 Dec 2024 15:55:32 +0100 Subject: [PATCH 1/2] checkpoint checkpoint --- internal/command/jobs/submit.go | 56 +++++++++++++++++++++++++++++++-- internal/config/job.go | 2 ++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/internal/command/jobs/submit.go b/internal/command/jobs/submit.go index a722228..7015820 100644 --- a/internal/command/jobs/submit.go +++ b/internal/command/jobs/submit.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/signadot/cli/internal/config" "github.com/signadot/cli/internal/print" @@ -36,6 +37,9 @@ func submit(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer) er if cfg.Filename == "" { return errors.New("must specify job request file with '-f' flag") } + if cfg.Wait && cfg.Attach { + return errors.New("cannot specify both --attach and --wait") + } req, err := loadJob(cfg.Filename, cfg.TemplateVals, false /*forDelete */) if err != nil { return err @@ -48,15 +52,63 @@ func submit(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer) er return err } resp := result.Payload + if cfg.Wait { + job, err := waitJob(ctx, cfg, resp.Name) + if err != nil { + writeOutput(ctx, cfg, outW, errW, resp) + return fmt.Errorf("error waiting for job %q: %w", resp.Name, err) + } + resp = job + } - return writeOutput(ctx, cfg, outW, errW, resp) + err = writeOutput(ctx, cfg, outW, errW, resp) + if err != nil { + return err + } + if cfg.Wait { + switch ph := resp.Status.Attempts[0].Phase; ph { + case "canceled", "failed": + return fmt.Errorf("job %q %s", resp.Name, ph) + } + } + return nil +} + +func waitJob(ctx context.Context, cfg *config.JobSubmit, name string) (*models.Job, error) { + + ticker := time.NewTicker(time.Second / 5) + defer ticker.Stop() + params := &jobs.GetJobParams{ + JobName: name, + OrgName: cfg.Org, + Context: ctx, + } + for { + res, err := cfg.Client.Jobs.GetJob(params, nil) + if err != nil { + return nil, err + } + attempts := res.Payload.Status.Attempts + if len(attempts) == 0 { + return nil, fmt.Errorf("no attempts in job %q", name) + } + switch attempts[0].Phase { + case "failed", "succeeded", "canceled": + return res.Payload, nil + } + select { + case <-ticker.C: + case <-ctx.Done(): + return nil, ctx.Err() + } + } } func writeOutput(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer, resp *models.Job) error { switch cfg.OutputFormat { case config.OutputFormatDefault: // Print info on how to access the job. - fmt.Fprintf(outW, "Job %s queued on Job Runner Group: %s\n", resp.Name, resp.Spec.RunnerGroup) + fmt.Fprintf(outW, "Job %s %s on Job Runner Group: %s\n", resp.Name, resp.Status.Attempts[0].Phase, resp.Spec.RunnerGroup) fmt.Fprintf(outW, "\nDashboard page: %v\n\n", cfg.JobDashboardUrl(resp.Name)) var err error diff --git a/internal/config/job.go b/internal/config/job.go index f7da17e..c120677 100644 --- a/internal/config/job.go +++ b/internal/config/job.go @@ -18,6 +18,7 @@ type JobSubmit struct { Attach bool Timeout time.Duration TemplateVals TemplateVals + Wait bool } func (c *JobSubmit) AddFlags(cmd *cobra.Command) { @@ -26,6 +27,7 @@ func (c *JobSubmit) AddFlags(cmd *cobra.Command) { cmd.Flags().Var(&c.TemplateVals, "set", "--set var=val") cmd.Flags().BoolVar(&c.Attach, "attach", false, "waits until the job is completed, displaying the stdout and stderr streams") cmd.Flags().DurationVar(&c.Timeout, "timeout", 0, "timeout when waiting for the job to be started, if 0 is specified, no timeout will be applied and the command will wait until completion or cancellation of the job (default 0)") + cmd.Flags().BoolVar(&c.Wait, "wait", false, "waits until the job is completed") } type JobDelete struct { From fff7fd8aa7487b1705794c9f52c2b6a4ae54a7ac Mon Sep 17 00:00:00 2001 From: Scott Cotton Date: Tue, 7 Jan 2025 17:11:41 +0100 Subject: [PATCH 2/2] CR: - reduce poll frequency - make sure timeout applies to `--wait` properly also correct the doc for `--timeout` --- internal/command/jobs/submit.go | 7 ++++++- internal/config/job.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/command/jobs/submit.go b/internal/command/jobs/submit.go index 7015820..d98cc4b 100644 --- a/internal/command/jobs/submit.go +++ b/internal/command/jobs/submit.go @@ -76,8 +76,13 @@ func submit(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer) er func waitJob(ctx context.Context, cfg *config.JobSubmit, name string) (*models.Job, error) { - ticker := time.NewTicker(time.Second / 5) + ticker := time.NewTicker(time.Second) defer ticker.Stop() + if cfg.Timeout != 0 { + ctxTimeout, cancel := context.WithTimeout(ctx, cfg.Timeout) + defer cancel() + ctx = ctxTimeout + } params := &jobs.GetJobParams{ JobName: name, OrgName: cfg.Org, diff --git a/internal/config/job.go b/internal/config/job.go index c120677..712eedf 100644 --- a/internal/config/job.go +++ b/internal/config/job.go @@ -26,7 +26,7 @@ func (c *JobSubmit) AddFlags(cmd *cobra.Command) { cmd.MarkFlagRequired("filename") cmd.Flags().Var(&c.TemplateVals, "set", "--set var=val") cmd.Flags().BoolVar(&c.Attach, "attach", false, "waits until the job is completed, displaying the stdout and stderr streams") - cmd.Flags().DurationVar(&c.Timeout, "timeout", 0, "timeout when waiting for the job to be started, if 0 is specified, no timeout will be applied and the command will wait until completion or cancellation of the job (default 0)") + cmd.Flags().DurationVar(&c.Timeout, "timeout", 0, "timeout when waiting for the job, if 0 is specified, no timeout will be applied and the command will wait until completion or cancellation of the job (default 0)") cmd.Flags().BoolVar(&c.Wait, "wait", false, "waits until the job is completed") }