diff --git a/internal/command/jobs/printers.go b/internal/command/jobs/printers.go index 604a7b8..f18d89b 100644 --- a/internal/command/jobs/printers.go +++ b/internal/command/jobs/printers.go @@ -1,21 +1,22 @@ package jobs import ( + "errors" "fmt" - "github.com/signadot/cli/internal/command/logs" - "github.com/signadot/cli/internal/poll" - "github.com/signadot/go-sdk/client/jobs" - "golang.org/x/net/context" "io" "os" "sort" "text/tabwriter" "time" + "github.com/signadot/cli/internal/command/logs" + "github.com/signadot/cli/internal/poll" + "golang.org/x/net/context" + "github.com/signadot/cli/internal/config" "github.com/signadot/cli/internal/sdtab" - "github.com/signadot/go-sdk/client/artifacts" "github.com/signadot/go-sdk/models" + "github.com/signadot/go-sdk/utils" "github.com/xeonx/timeago" ) @@ -129,12 +130,7 @@ func printJobDetails(cfg *config.JobGet, out io.Writer, job *models.Job) error { return nil } -func waitForJob(ctx context.Context, cfg *config.JobSubmit, out io.Writer, jobName string) error { - - fmt.Fprintf(out, "Waiting for job execution\n") - - looped := false - +func waitForJob(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer, jobName string) error { delayTime := 2 * time.Second retry := poll. @@ -142,19 +138,25 @@ func waitForJob(ctx context.Context, cfg *config.JobSubmit, out io.Writer, jobNa WithDelay(delayTime). WithTimeout(cfg.Timeout) - lastCursor := "" + lastOutCursor := "" + lastErrCursor := "" + looped := false err := retry.Until(func() bool { + defer func() { + looped = true + }() + j, err := getJob(cfg.Job, jobName) if err != nil { - fmt.Fprintf(out, "Error getting job: %s", err.Error()) + fmt.Fprintf(errW, "Error getting job: %s", err.Error()) // We want to keep retrying if the timeout has not been exceeded return false } - // Increases the time, so if the queue is empty will be likely to start seeing the logs right away without - // any bigger delay + // Increases the time, so if the queue is empty will be likely to start + // seeing the logs right away without any bigger delay if delayTime < MaxTimeBetweenRefresh { delayTime = (1 * time.Second) + delayTime retry.WithDelay(delayTime) @@ -164,55 +166,87 @@ func waitForJob(ctx context.Context, cfg *config.JobSubmit, out io.Writer, jobNa switch attempt.Phase { case "succeed": return true + case "failed": - handleFailedJobPhase(out, j) + handleFailedJobPhase(errW, j) return true + case "queued": if looped { - fmt.Fprintf(out, "\033[1A\033[K") + clearLastLine(outW) } - fmt.Fprintf(out, "Queued on Job Runner Group %s\n", j.Spec.RunnerGroup) + fmt.Fprintf(outW, "Queued on Job Runner Group %s\n", j.Spec.RunnerGroup) return false + case "running": if looped { - fmt.Fprintf(out, "\033[1A\033[K") + clearLastLine(outW) } - newCursor, err := logs.ShowLogs(ctx, cfg.API, out, jobName, cfg.Attach, lastCursor, 0) + errch := make(chan error) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + // stream stdout + cursor, err := logs.ShowLogs(ctx, cfg.API, outW, jobName, utils.LogTypeStdout, lastOutCursor, 0) + if err == nil { + lastOutCursor = cursor + } else if errors.Is(err, context.Canceled) { + err = nil // ignore context cancelations + } + cancel() // this will cause the stderr stream to terminate + errch <- err + }() + + go func() { + // stream stderr + cursor, err := logs.ShowLogs(ctx, cfg.API, errW, jobName, utils.LogTypeStderr, lastErrCursor, 0) + if err == nil { + lastErrCursor = cursor + } else if errors.Is(err, context.Canceled) { + err = nil // ignore context cancelations + } + cancel() // this will make the stdout stream to terminate + errch <- err + }() + + err = errors.Join(<-errch, <-errch) // wait until both streams terminate if err != nil { - fmt.Fprintf(out, "Error getting logs: %s\n", err.Error()) - } else { - lastCursor = newCursor + fmt.Fprintf(errW, "Error getting logs: %s\n", err.Error()) } if j, err = getJob(cfg.Job, jobName); err == nil { switch j.Status.Attempts[0].Phase { case "failed": - handleFailedJobPhase(out, j) + handleFailedJobPhase(errW, j) return true case "succeeded": return true } } + return false case "canceled": - fmt.Fprintf(out, "Stopping cause job execution was canceled\n") + fmt.Fprintf(outW, "The job execution was canceled\n") return true } - looped = true - return false }) return err } -func handleFailedJobPhase(out io.Writer, job *models.Job) { +func clearLastLine(w io.Writer) { + fmt.Fprintf(w, "\033[1A\033[K") +} + +func handleFailedJobPhase(errW io.Writer, job *models.Job) { failedStatus := job.Status.Attempts[0].State.Failed if failedStatus.Message != "" { - fmt.Fprintf(out, "Error: %s\n", failedStatus.Message) + fmt.Fprintf(errW, "Error: %s\n", failedStatus.Message) } exitCode := 1 @@ -223,16 +257,6 @@ func handleFailedJobPhase(out io.Writer, job *models.Job) { os.Exit(exitCode) } -func getJob(cfg *config.Job, jobName string) (*models.Job, error) { - params := jobs.NewGetJobParams().WithOrgName(cfg.Org).WithJobName(jobName) - resp, err := cfg.Client.Jobs.GetJob(params, nil) - if err != nil { - return nil, err - } - - return resp.Payload, nil -} - func getCreatedAt(job *models.Job) string { createdAt := job.CreatedAt if len(createdAt) == 0 { @@ -247,20 +271,6 @@ func getCreatedAt(job *models.Job) string { return timeago.NoMax(timeago.English).Format(t) } -func getArtifacts(cfg *config.JobGet, job *models.Job) ([]*models.JobArtifact, error) { - params := artifacts.NewListJobAttemptArtifactsParams(). - WithOrgName(cfg.Org). - WithJobAttempt(job.Status.Attempts[0].ID). - WithJobName(job.Name) - - resp, err := cfg.Client.Artifacts.ListJobAttemptArtifacts(params, nil) - if err != nil { - return []*models.JobArtifact{}, nil - } - - return resp.Payload, nil -} - type jobArtifactRow struct { Path string `sdtab:"PATH"` Size string `sdtab:"SIZE"` diff --git a/internal/command/jobs/submit.go b/internal/command/jobs/submit.go index 70f35fa..a722228 100644 --- a/internal/command/jobs/submit.go +++ b/internal/command/jobs/submit.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" + "io" + "github.com/signadot/cli/internal/config" "github.com/signadot/cli/internal/print" "github.com/signadot/go-sdk/client/jobs" "github.com/signadot/go-sdk/models" "github.com/spf13/cobra" - "io" ) func newSubmit(job *config.Job) *cobra.Command { @@ -17,23 +18,18 @@ func newSubmit(job *config.Job) *cobra.Command { cmd := &cobra.Command{ Use: "submit -f FILENAME [ --set var1=val1 --set var2=val2 ... ]", - Short: "Create or update a job with variable expansion", + Short: "Submit a job with variable expansion", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - if !cfg.ValidateAttachFlag(cmd) { - return fmt.Errorf("value not valid for --attach") - } - - return submit(cmd.Context(), cfg, cmd.OutOrStdout(), cmd.ErrOrStderr(), args) + return submit(cmd.Context(), cfg, cmd.OutOrStdout(), cmd.ErrOrStderr()) }, } cfg.AddFlags(cmd) - return cmd } -func submit(ctx context.Context, cfg *config.JobSubmit, out, log io.Writer, args []string) error { +func submit(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer) error { if err := cfg.InitAPIConfig(); err != nil { return err } @@ -53,29 +49,25 @@ func submit(ctx context.Context, cfg *config.JobSubmit, out, log io.Writer, args } resp := result.Payload - fmt.Fprintf(log, "Job %s queued on Job Runner Group: %s\n", resp.Name, resp.Spec.RunnerGroup) - - return writeOutput(ctx, cfg, out, resp) + return writeOutput(ctx, cfg, outW, errW, resp) } -func writeOutput(ctx context.Context, cfg *config.JobSubmit, out io.Writer, resp *models.Job) error { +func writeOutput(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer, resp *models.Job) error { switch cfg.OutputFormat { case config.OutputFormatDefault: // Print info on how to access the job. - fmt.Fprintf(out, "\nDashboard page: %v\n\n", cfg.JobDashboardUrl(resp.Name)) + fmt.Fprintf(outW, "Job %s queued on Job Runner Group: %s\n", resp.Name, resp.Spec.RunnerGroup) + fmt.Fprintf(outW, "\nDashboard page: %v\n\n", cfg.JobDashboardUrl(resp.Name)) - switch cfg.Attach { - case "stdout", "stderr": - if err := waitForJob(ctx, cfg, out, resp.Name); err != nil { - return err - } + var err error + if cfg.Attach { + err = waitForJob(ctx, cfg, outW, errW, resp.Name) } - - return nil + return err case config.OutputFormatJSON: - return print.RawJSON(out, resp) + return print.RawJSON(outW, resp) case config.OutputFormatYAML: - return print.RawYAML(out, resp) + return print.RawYAML(outW, resp) default: return fmt.Errorf("unsupported output format: %q", cfg.OutputFormat) } diff --git a/internal/command/jobs/utils.go b/internal/command/jobs/utils.go index 81b9bac..a2d4f72 100644 --- a/internal/command/jobs/utils.go +++ b/internal/command/jobs/utils.go @@ -2,11 +2,39 @@ package jobs import ( "fmt" + "time" + + "github.com/signadot/cli/internal/config" + "github.com/signadot/go-sdk/client/artifacts" + "github.com/signadot/go-sdk/client/jobs" "github.com/signadot/go-sdk/models" "github.com/xeonx/timeago" - "time" ) +func getJob(cfg *config.Job, jobName string) (*models.Job, error) { + params := jobs.NewGetJobParams().WithOrgName(cfg.Org).WithJobName(jobName) + resp, err := cfg.Client.Jobs.GetJob(params, nil) + if err != nil { + return nil, err + } + + return resp.Payload, nil +} + +func getArtifacts(cfg *config.JobGet, job *models.Job) ([]*models.JobArtifact, error) { + params := artifacts.NewListJobAttemptArtifactsParams(). + WithOrgName(cfg.Org). + WithJobAttempt(job.Status.Attempts[0].ID). + WithJobName(job.Name) + + resp, err := cfg.Client.Artifacts.ListJobAttemptArtifacts(params, nil) + if err != nil { + return []*models.JobArtifact{}, nil + } + + return resp.Payload, nil +} + func isJobPhaseToPrintDefault(ph string) bool { if ph == "failed" { return false diff --git a/internal/command/logs/command.go b/internal/command/logs/command.go index 07a27ff..5c6831b 100644 --- a/internal/command/logs/command.go +++ b/internal/command/logs/command.go @@ -11,6 +11,7 @@ import ( "github.com/signadot/cli/internal/config" "github.com/signadot/go-sdk/client" "github.com/signadot/go-sdk/client/job_logs" + "github.com/signadot/go-sdk/utils" "github.com/spf13/cobra" ) @@ -22,8 +23,7 @@ func New(api *config.API) *cobra.Command { Short: "Display job logs", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - _, err := ShowLogs(cmd.Context(), cfg.API, cmd.OutOrStdout(), cfg.Job, cfg.Stream, "", int(cfg.TailLines)) - return err + return showLogs(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(), cfg) }, } @@ -31,6 +31,23 @@ func New(api *config.API) *cobra.Command { return cmd } +func showLogs(ctx context.Context, outW, errW io.Writer, cfg *config.Logs) error { + if err := cfg.InitAPIConfig(); err != nil { + return err + } + + var w io.Writer + switch cfg.Stream { + case utils.LogTypeStderr: + w = errW + default: + w = outW + } + + _, err := ShowLogs(ctx, cfg.API, w, cfg.Job, cfg.Stream, "", int(cfg.TailLines)) + return err +} + type event struct { Event string `sse:"event"` Data string `sse:"data"` diff --git a/internal/config/job.go b/internal/config/job.go index 3d343b7..f7da17e 100644 --- a/internal/config/job.go +++ b/internal/config/job.go @@ -1,8 +1,9 @@ package config import ( - "github.com/spf13/cobra" "time" + + "github.com/spf13/cobra" ) type Job struct { @@ -14,7 +15,7 @@ type JobSubmit struct { // Flags Filename string - Attach string + Attach bool Timeout time.Duration TemplateVals TemplateVals } @@ -23,27 +24,8 @@ func (c *JobSubmit) AddFlags(cmd *cobra.Command) { cmd.Flags().StringVarP(&c.Filename, "filename", "f", "", "YAML or JSON file containing the jobs creation request") cmd.MarkFlagRequired("filename") cmd.Flags().Var(&c.TemplateVals, "set", "--set var=val") - - cmd.Flags().StringVarP(&c.Attach, "attach", "", "", "waits until the job is completed, displaying the selected stream (accepted values: stdout or stderr)") - + cmd.Flags().BoolVar(&c.Attach, "attach", false, "waits until the job is completed, displaying the stdout and stderr streams") cmd.Flags().DurationVar(&c.Timeout, "timeout", 0, "timeout when waiting for the job to be started, if 0 is specified, no timeout will be applied and the command will wait until completion or cancellation of the job (default 0)") - - cmd.Flags().Lookup("attach").NoOptDefVal = "stdout" -} - -func (c *JobSubmit) ValidateAttachFlag(cmd *cobra.Command) bool { - attach := cmd.Flags().Lookup("attach") - - if !attach.Changed { - return true - } - - switch attach.Value.String() { - case "stdout", "stderr": - return true - } - - return false } type JobDelete struct {