Skip to content

Commit

Permalink
Improve job submit --attach and bug fix (#131)
Browse files Browse the repository at this point in the history
* Stream both stdout and stderr on job submit attach

* Bug fix in signadot logs
  • Loading branch information
daniel-de-vera authored Jun 17, 2024
1 parent 3cb2119 commit 9cb16bd
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 101 deletions.
116 changes: 63 additions & 53 deletions internal/command/jobs/printers.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package jobs

import (
"errors"
"fmt"
"github.com/signadot/cli/internal/command/logs"
"github.com/signadot/cli/internal/poll"
"github.com/signadot/go-sdk/client/jobs"
"golang.org/x/net/context"
"io"
"os"
"sort"
"text/tabwriter"
"time"

"github.com/signadot/cli/internal/command/logs"
"github.com/signadot/cli/internal/poll"
"golang.org/x/net/context"

"github.com/signadot/cli/internal/config"
"github.com/signadot/cli/internal/sdtab"
"github.com/signadot/go-sdk/client/artifacts"
"github.com/signadot/go-sdk/models"
"github.com/signadot/go-sdk/utils"
"github.com/xeonx/timeago"
)

Expand Down Expand Up @@ -129,32 +130,33 @@ func printJobDetails(cfg *config.JobGet, out io.Writer, job *models.Job) error {
return nil
}

func waitForJob(ctx context.Context, cfg *config.JobSubmit, out io.Writer, jobName string) error {

fmt.Fprintf(out, "Waiting for job execution\n")

looped := false

func waitForJob(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer, jobName string) error {
delayTime := 2 * time.Second

retry := poll.
NewPoll().
WithDelay(delayTime).
WithTimeout(cfg.Timeout)

lastCursor := ""
lastOutCursor := ""
lastErrCursor := ""
looped := false

err := retry.Until(func() bool {
defer func() {
looped = true
}()

j, err := getJob(cfg.Job, jobName)
if err != nil {
fmt.Fprintf(out, "Error getting job: %s", err.Error())
fmt.Fprintf(errW, "Error getting job: %s", err.Error())

// We want to keep retrying if the timeout has not been exceeded
return false
}

// Increases the time, so if the queue is empty will be likely to start seeing the logs right away without
// any bigger delay
// Increases the time, so if the queue is empty will be likely to start
// seeing the logs right away without any bigger delay
if delayTime < MaxTimeBetweenRefresh {
delayTime = (1 * time.Second) + delayTime
retry.WithDelay(delayTime)
Expand All @@ -164,55 +166,87 @@ func waitForJob(ctx context.Context, cfg *config.JobSubmit, out io.Writer, jobNa
switch attempt.Phase {
case "succeed":
return true

case "failed":
handleFailedJobPhase(out, j)
handleFailedJobPhase(errW, j)
return true

case "queued":
if looped {
fmt.Fprintf(out, "\033[1A\033[K")
clearLastLine(outW)
}

fmt.Fprintf(out, "Queued on Job Runner Group %s\n", j.Spec.RunnerGroup)
fmt.Fprintf(outW, "Queued on Job Runner Group %s\n", j.Spec.RunnerGroup)
return false

case "running":
if looped {
fmt.Fprintf(out, "\033[1A\033[K")
clearLastLine(outW)
}

newCursor, err := logs.ShowLogs(ctx, cfg.API, out, jobName, cfg.Attach, lastCursor, 0)
errch := make(chan error)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
// stream stdout
cursor, err := logs.ShowLogs(ctx, cfg.API, outW, jobName, utils.LogTypeStdout, lastOutCursor, 0)
if err == nil {
lastOutCursor = cursor
} else if errors.Is(err, context.Canceled) {
err = nil // ignore context cancelations
}
cancel() // this will cause the stderr stream to terminate
errch <- err
}()

go func() {
// stream stderr
cursor, err := logs.ShowLogs(ctx, cfg.API, errW, jobName, utils.LogTypeStderr, lastErrCursor, 0)
if err == nil {
lastErrCursor = cursor
} else if errors.Is(err, context.Canceled) {
err = nil // ignore context cancelations
}
cancel() // this will make the stdout stream to terminate
errch <- err
}()

err = errors.Join(<-errch, <-errch) // wait until both streams terminate
if err != nil {
fmt.Fprintf(out, "Error getting logs: %s\n", err.Error())
} else {
lastCursor = newCursor
fmt.Fprintf(errW, "Error getting logs: %s\n", err.Error())
}

if j, err = getJob(cfg.Job, jobName); err == nil {
switch j.Status.Attempts[0].Phase {
case "failed":
handleFailedJobPhase(out, j)
handleFailedJobPhase(errW, j)
return true
case "succeeded":
return true
}
}
return false

case "canceled":
fmt.Fprintf(out, "Stopping cause job execution was canceled\n")
fmt.Fprintf(outW, "The job execution was canceled\n")
return true
}

looped = true

return false
})

return err
}

func handleFailedJobPhase(out io.Writer, job *models.Job) {
func clearLastLine(w io.Writer) {
fmt.Fprintf(w, "\033[1A\033[K")
}

func handleFailedJobPhase(errW io.Writer, job *models.Job) {
failedStatus := job.Status.Attempts[0].State.Failed
if failedStatus.Message != "" {
fmt.Fprintf(out, "Error: %s\n", failedStatus.Message)
fmt.Fprintf(errW, "Error: %s\n", failedStatus.Message)
}

exitCode := 1
Expand All @@ -223,16 +257,6 @@ func handleFailedJobPhase(out io.Writer, job *models.Job) {
os.Exit(exitCode)
}

func getJob(cfg *config.Job, jobName string) (*models.Job, error) {
params := jobs.NewGetJobParams().WithOrgName(cfg.Org).WithJobName(jobName)
resp, err := cfg.Client.Jobs.GetJob(params, nil)
if err != nil {
return nil, err
}

return resp.Payload, nil
}

func getCreatedAt(job *models.Job) string {
createdAt := job.CreatedAt
if len(createdAt) == 0 {
Expand All @@ -247,20 +271,6 @@ func getCreatedAt(job *models.Job) string {
return timeago.NoMax(timeago.English).Format(t)
}

func getArtifacts(cfg *config.JobGet, job *models.Job) ([]*models.JobArtifact, error) {
params := artifacts.NewListJobAttemptArtifactsParams().
WithOrgName(cfg.Org).
WithJobAttempt(job.Status.Attempts[0].ID).
WithJobName(job.Name)

resp, err := cfg.Client.Artifacts.ListJobAttemptArtifacts(params, nil)
if err != nil {
return []*models.JobArtifact{}, nil
}

return resp.Payload, nil
}

type jobArtifactRow struct {
Path string `sdtab:"PATH"`
Size string `sdtab:"SIZE"`
Expand Down
38 changes: 15 additions & 23 deletions internal/command/jobs/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,32 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/signadot/cli/internal/config"
"github.com/signadot/cli/internal/print"
"github.com/signadot/go-sdk/client/jobs"
"github.com/signadot/go-sdk/models"
"github.com/spf13/cobra"
"io"
)

func newSubmit(job *config.Job) *cobra.Command {
cfg := &config.JobSubmit{Job: job}

cmd := &cobra.Command{
Use: "submit -f FILENAME [ --set var1=val1 --set var2=val2 ... ]",
Short: "Create or update a job with variable expansion",
Short: "Submit a job with variable expansion",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if !cfg.ValidateAttachFlag(cmd) {
return fmt.Errorf("value not valid for --attach")
}

return submit(cmd.Context(), cfg, cmd.OutOrStdout(), cmd.ErrOrStderr(), args)
return submit(cmd.Context(), cfg, cmd.OutOrStdout(), cmd.ErrOrStderr())
},
}

cfg.AddFlags(cmd)

return cmd
}

func submit(ctx context.Context, cfg *config.JobSubmit, out, log io.Writer, args []string) error {
func submit(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer) error {
if err := cfg.InitAPIConfig(); err != nil {
return err
}
Expand All @@ -53,29 +49,25 @@ func submit(ctx context.Context, cfg *config.JobSubmit, out, log io.Writer, args
}
resp := result.Payload

fmt.Fprintf(log, "Job %s queued on Job Runner Group: %s\n", resp.Name, resp.Spec.RunnerGroup)

return writeOutput(ctx, cfg, out, resp)
return writeOutput(ctx, cfg, outW, errW, resp)
}

func writeOutput(ctx context.Context, cfg *config.JobSubmit, out io.Writer, resp *models.Job) error {
func writeOutput(ctx context.Context, cfg *config.JobSubmit, outW, errW io.Writer, resp *models.Job) error {
switch cfg.OutputFormat {
case config.OutputFormatDefault:
// Print info on how to access the job.
fmt.Fprintf(out, "\nDashboard page: %v\n\n", cfg.JobDashboardUrl(resp.Name))
fmt.Fprintf(outW, "Job %s queued on Job Runner Group: %s\n", resp.Name, resp.Spec.RunnerGroup)
fmt.Fprintf(outW, "\nDashboard page: %v\n\n", cfg.JobDashboardUrl(resp.Name))

switch cfg.Attach {
case "stdout", "stderr":
if err := waitForJob(ctx, cfg, out, resp.Name); err != nil {
return err
}
var err error
if cfg.Attach {
err = waitForJob(ctx, cfg, outW, errW, resp.Name)
}

return nil
return err
case config.OutputFormatJSON:
return print.RawJSON(out, resp)
return print.RawJSON(outW, resp)
case config.OutputFormatYAML:
return print.RawYAML(out, resp)
return print.RawYAML(outW, resp)
default:
return fmt.Errorf("unsupported output format: %q", cfg.OutputFormat)
}
Expand Down
30 changes: 29 additions & 1 deletion internal/command/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,39 @@ package jobs

import (
"fmt"
"time"

"github.com/signadot/cli/internal/config"
"github.com/signadot/go-sdk/client/artifacts"
"github.com/signadot/go-sdk/client/jobs"
"github.com/signadot/go-sdk/models"
"github.com/xeonx/timeago"
"time"
)

func getJob(cfg *config.Job, jobName string) (*models.Job, error) {
params := jobs.NewGetJobParams().WithOrgName(cfg.Org).WithJobName(jobName)
resp, err := cfg.Client.Jobs.GetJob(params, nil)
if err != nil {
return nil, err
}

return resp.Payload, nil
}

func getArtifacts(cfg *config.JobGet, job *models.Job) ([]*models.JobArtifact, error) {
params := artifacts.NewListJobAttemptArtifactsParams().
WithOrgName(cfg.Org).
WithJobAttempt(job.Status.Attempts[0].ID).
WithJobName(job.Name)

resp, err := cfg.Client.Artifacts.ListJobAttemptArtifacts(params, nil)
if err != nil {
return []*models.JobArtifact{}, nil
}

return resp.Payload, nil
}

func isJobPhaseToPrintDefault(ph string) bool {
if ph == "failed" {
return false
Expand Down
21 changes: 19 additions & 2 deletions internal/command/logs/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/signadot/cli/internal/config"
"github.com/signadot/go-sdk/client"
"github.com/signadot/go-sdk/client/job_logs"
"github.com/signadot/go-sdk/utils"
"github.com/spf13/cobra"
)

Expand All @@ -22,15 +23,31 @@ func New(api *config.API) *cobra.Command {
Short: "Display job logs",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
_, err := ShowLogs(cmd.Context(), cfg.API, cmd.OutOrStdout(), cfg.Job, cfg.Stream, "", int(cfg.TailLines))
return err
return showLogs(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(), cfg)
},
}

cfg.AddFlags(cmd)
return cmd
}

func showLogs(ctx context.Context, outW, errW io.Writer, cfg *config.Logs) error {
if err := cfg.InitAPIConfig(); err != nil {
return err
}

var w io.Writer
switch cfg.Stream {
case utils.LogTypeStderr:
w = errW
default:
w = outW
}

_, err := ShowLogs(ctx, cfg.API, w, cfg.Job, cfg.Stream, "", int(cfg.TailLines))
return err
}

type event struct {
Event string `sse:"event"`
Data string `sse:"data"`
Expand Down
Loading

0 comments on commit 9cb16bd

Please sign in to comment.