Skip to content

Commit

Permalink
Add workflow trace command (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Neira authored Jul 21, 2022
1 parent c6b72cc commit 510ff8f
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 2 deletions.
30 changes: 30 additions & 0 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package cli

import (
"fmt"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -215,6 +216,7 @@ var (
FlagTLSServerName = "tls-server-name"
FlagLastMessageID = "last-message-id"
FlagConcurrency = "concurrency"
FlagConcurrencyAlias = []string{"c"}
FlagReportRate = "report-rate"
FlagLowerShardBound = "lower-shard-bound"
FlagUpperShardBound = "upper-shard-bound"
Expand Down Expand Up @@ -248,6 +250,10 @@ var (
FlagPauseOnFailure = "pause-on-failure"
FlagPause = "pause"
FlagUnpause = "unpause"
FlagFold = "fold"
FlagNoFold = "no-fold"
FlagDepth = "depth"
FlagDepthAlias = []string{"d"}

FlagProtoType = "type"
FlagHexData = "hex-data"
Expand Down Expand Up @@ -442,3 +448,27 @@ var flagsForStackTraceQuery = append(flagsForExecution, []cli.Flag{
Usage: "Optional flag to reject queries based on Workflow state. Valid values are \"not_open\" and \"not_completed_cleanly\"",
},
}...)

var flagsForTraceWorkflow = []cli.Flag{
&cli.IntFlag{
Name: FlagDepth,
Aliases: FlagDepthAlias,
Value: -1,
Usage: "Number of child workflows to expand, -1 to expand all child workflows",
},
&cli.IntFlag{
Name: FlagConcurrency,
Aliases: FlagConcurrencyAlias,
Value: 10,
Usage: "Request concurrency",
},
&cli.StringFlag{
Name: FlagFold,
Usage: fmt.Sprintf("Statuses for which child workflows will be folded in (this will reduce the number of information fetched and displayed). Case-insensitive and ignored if --%s supplied", FlagNoFold),
Value: "completed,canceled,terminated",
},
&cli.BoolFlag{
Name: FlagNoFold,
Usage: "Disable folding. All child workflows within the set depth will be fetched and displayed",
},
}
33 changes: 33 additions & 0 deletions cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,3 +761,36 @@ func ensureNonNil[T any, P ~*T](ptr *P) {
*ptr = new(T)
}
}

func listWorkflowExecutionStatusNames() string {
var names []string
for _, name := range enumspb.WorkflowExecutionStatus_name {
names = append(names, strings.ToLower(name))
}
return strings.Join(names, ", ")
}

// findWorkflowStatusValue finds a WorkflowExecutionStatus by its name. This search is case-insensitive.
func findWorkflowStatusValue(name string) (enumspb.WorkflowExecutionStatus, bool) {
lowerName := strings.ToLower(name)
for key, value := range enumspb.WorkflowExecutionStatus_value {
if lowerName == strings.ToLower(key) {
return enumspb.WorkflowExecutionStatus(value), true
}
}

return 0, false
}

func parseFoldStatusList(flagValue string) ([]enumspb.WorkflowExecutionStatus, error) {
var statusList []enumspb.WorkflowExecutionStatus
for _, value := range strings.Split(flagValue, ",") {
if status, ok := findWorkflowStatusValue(value); ok {
statusList = append(statusList, status)
} else {
return nil,
fmt.Errorf("invalid status \"%s\" for fold flag. Valid values: %s", value, listWorkflowExecutionStatusNames())
}
}
return statusList, nil
}
60 changes: 60 additions & 0 deletions cli/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package cli

import (
enumspb "go.temporal.io/api/enums/v1"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -89,3 +90,62 @@ func (s *utilSuite) TestStringToEnum_MapEmptyEnum() {
s.Error(err)
s.Equal(result, int32(0))
}

func (s *utilSuite) TestParseFoldStatusList() {
tests := map[string]struct {
value string
want []enumspb.WorkflowExecutionStatus
wantErr bool
}{
"default values": {
value: "completed,canceled,terminated",
want: []enumspb.WorkflowExecutionStatus{
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
},
},
"no values": {
value: "",
want: nil,
},
"invalid": {
value: "Foobar",
wantErr: true,
},
"title case": {
value: "Running,Completed,Failed,Canceled,Terminated,ContinuedAsNew,TimedOut",
want: []enumspb.WorkflowExecutionStatus{
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
},
},
"upper case": {
value: "RUNNING,COMPLETED,FAILED,CANCELED,TERMINATED,CONTINUEDASNEW,TIMEDOUT",
want: []enumspb.WorkflowExecutionStatus{
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
},
},
}
for name, tt := range tests {
s.Run(name, func() {
got, err := parseFoldStatusList(tt.value)
if tt.wantErr {
s.Error(err)
} else {
s.Equal(tt.want, got)
}
})
}
}
7 changes: 7 additions & 0 deletions cli/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,5 +276,12 @@ func newWorkflowCommands() []*cli.Command {
return ResetInBatch(c)
},
},
{
Name: "trace",
Aliases: []string{"t"},
Usage: "Traces a workflow's execution with progress of its child workflows and activities",
Flags: append(flagsForExecution, flagsForTraceWorkflow...),
Action: TraceWorkflow,
},
}
}
13 changes: 11 additions & 2 deletions cli/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"context"
"encoding/json"
"fmt"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"math/rand"
"os"
"reflect"
Expand All @@ -45,11 +47,9 @@ import (
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/operatorservice/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
clispb "go.temporal.io/server/api/cli/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -1520,6 +1520,15 @@ func listArchivedWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byt
return items, workflows.NextPageToken, nil
}

func TraceWorkflow(c *cli.Context) error {
_, err := parseFoldStatusList(c.String(FlagFold))
if err != nil {
return err
}
fmt.Println("Trace hasn't been implemented yet.")
return nil
}

type eventRow struct {
ID string
Time string
Expand Down

0 comments on commit 510ff8f

Please sign in to comment.