Skip to content

Commit

Permalink
Remove workflow scan command (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Sep 8, 2022
1 parent 000e174 commit 1ec998d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 105 deletions.
8 changes: 0 additions & 8 deletions cli/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,6 @@ func newWorkflowCommands() []*cli.Command {
return SignalWorkflow(c)
},
},
{
Name: "scan",
Usage: "List Workflow Executions. Faster and unsorted (requires Elasticsearch to be enabled)",
Flags: append(flagsForScan, flags.FlagsForPaginationAndRendering...),
Action: func(c *cli.Context) error {
return ScanAllWorkflow(c)
},
},
{
Name: "count",
Usage: "Count Workflow Executions (requires ElasticSearch to be enabled)",
Expand Down
111 changes: 14 additions & 97 deletions cli/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,6 @@ func queryWorkflowHelper(c *cli.Context, queryType string) error {
func ListWorkflow(c *cli.Context) error {
archived := c.Bool(FlagArchive)

namespace, err := getRequiredGlobalOption(c, FlagNamespace)
if err != nil {
return err
}
sdkClient, err := getSDKClient(c)
if err != nil {
return err
Expand All @@ -571,9 +567,9 @@ func ListWorkflow(c *cli.Context) error {
query := c.String(FlagListQuery)

if archived {
items, npt, err = listArchivedWorkflows(c, sdkClient, npt, namespace, query)
items, npt, err = listArchivedWorkflows(c, sdkClient, npt, query)
} else {
items, npt, err = listWorkflows(c, sdkClient, npt, namespace, query)
items, npt, err = listWorkflows(c, sdkClient, npt, query)
}

if err != nil {
Expand All @@ -592,62 +588,6 @@ func ListWorkflow(c *cli.Context) error {
return output.PrintIterator(c, iter, opts)
}

// ScanAllWorkflow list all workflow executions using Scan API.
func ScanAllWorkflow(c *cli.Context) error {
namespace, err := getRequiredGlobalOption(c, FlagNamespace)
if err != nil {
return err
}
listQuery := c.String(FlagListQuery)
sdkClient, err := getSDKClient(c)
if err != nil {
return err
}

paginationFunc := func(npt []byte) ([]interface{}, []byte, error) {
req := &workflowservice.ScanWorkflowExecutionsRequest{
Namespace: namespace,
NextPageToken: npt,
Query: listQuery,
}

var workflows *workflowservice.ScanWorkflowExecutionsResponse
op := func() error {
ctx, cancel := newContext(c)
defer cancel()
response, err := sdkClient.ScanWorkflow(ctx, req)
if err != nil {
return err
}
workflows = response
return nil
}
err := backoff.ThrottleRetry(op, common.CreateFrontendClientRetryPolicy(), common.IsContextDeadlineExceededErr)
if err != nil {
return nil, nil, fmt.Errorf("unable to list workflow executions: %s", err)
}

var items []interface{}
for _, e := range workflows.Executions {
items = append(items, e)
}
if err != nil {
return nil, nil, err
}

return items, workflows.NextPageToken, nil
}

iter := collection.NewPagingIterator(paginationFunc)
opts := &output.PrintOptions{
Fields: []string{"Execution.WorkflowId", "Execution.RunId", "StartTime"},
FieldsLong: []string{"Type.Name", "TaskQueue", "ExecutionTime", "CloseTime"},
Pager: pager.Less,
}

return output.PrintIterator(c, iter, opts)
}

// CountWorkflow count number of workflows
func CountWorkflow(c *cli.Context) error {
sdkClient, err := getSDKClient(c)
Expand Down Expand Up @@ -848,32 +788,6 @@ func printRunStatus(c *cli.Context, event *historypb.HistoryEvent) {
}
}

func scanWorkflowExecutions(sdkClient sdkclient.Client, pageSize int, nextPageToken []byte, query string, c *cli.Context) ([]*workflowpb.WorkflowExecutionInfo, []byte, error) {
request := &workflowservice.ScanWorkflowExecutionsRequest{
PageSize: int32(pageSize),
NextPageToken: nextPageToken,
Query: query,
}

var workflows *workflowservice.ScanWorkflowExecutionsResponse
op := func() error {
ctx, cancel := newContext(c)
defer cancel()
response, err := sdkClient.ScanWorkflow(ctx, request)
if err != nil {
return err
}
workflows = response
return nil
}
err := backoff.ThrottleRetry(op, common.CreateFrontendClientRetryPolicy(), common.IsContextDeadlineExceededErr)
if err != nil {
return nil, nil, fmt.Errorf("failed to list workflow: %s", err)
}

return workflows.Executions, workflows.NextPageToken, nil
}

// ShowHistory shows the history of given workflow execution based on workflowID and runID.
func ShowHistory(c *cli.Context) error {
wid := c.String(FlagWorkflowID)
Expand Down Expand Up @@ -1102,18 +1016,23 @@ func ResetInBatch(c *cli.Context) error {
return err
}

pageSize := 1000
var nextPageToken []byte
var result []*workflowpb.WorkflowExecutionInfo
var result []any
for {
result, nextPageToken, err = scanWorkflowExecutions(sdkClient, pageSize, nextPageToken, query, c)
result, nextPageToken, err = listWorkflows(c, sdkClient, nextPageToken, query)
if err != nil {
return err
}
for _, we := range result {
for _, resultItem := range result {
we, ok := resultItem.(*workflowpb.WorkflowExecutionInfo)
if !ok {
fmt.Printf("skip by wrong type:%T instead of:%T\n", resultItem, &workflowpb.WorkflowExecutionInfo{})
continue
}

wid := we.Execution.GetWorkflowId()
rid := we.Execution.GetRunId()
_, ok := excludes[wid]
_, ok = excludes[wid]
if ok {
fmt.Println("skip by exclude file: ", wid, rid)
continue
Expand Down Expand Up @@ -1455,9 +1374,8 @@ func getLastContinueAsNewID(ctx context.Context, namespace, wid, rid string, fro
return
}

func listWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byte, namespace string, query string) ([]interface{}, []byte, error) {
func listWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byte, query string) ([]interface{}, []byte, error) {
req := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
NextPageToken: npt,
Query: query,
}
Expand Down Expand Up @@ -1486,9 +1404,8 @@ func listWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byte, names
return items, workflows.NextPageToken, nil
}

func listArchivedWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byte, namespace string, query string) ([]interface{}, []byte, error) {
func listArchivedWorkflows(c *cli.Context, sdkClient sdkclient.Client, npt []byte, query string) ([]interface{}, []byte, error) {
req := &workflowservice.ListArchivedWorkflowExecutionsRequest{
Namespace: namespace,
NextPageToken: npt,
Query: query,
}
Expand Down

0 comments on commit 1ec998d

Please sign in to comment.