From 8c67dd115e19ced5007c1e2acbb9956440c51947 Mon Sep 17 00:00:00 2001 From: Ruslan <11838981+feedmeapples@users.noreply.github.com> Date: Mon, 31 Oct 2022 13:52:19 -0400 Subject: [PATCH] Add batch-API commands to tctl v1 (#328) --- cli_curr/app.go | 5 + cli_curr/batchv2.go | 76 +++++++++++++ cli_curr/batchv2_commands.go | 209 +++++++++++++++++++++++++++++++++++ cli_curr/workflow.go | 53 ++++++++- cli_curr/workflowCommands.go | 24 ++++ 5 files changed, 361 insertions(+), 6 deletions(-) create mode 100644 cli_curr/batchv2.go create mode 100644 cli_curr/batchv2_commands.go diff --git a/cli_curr/app.go b/cli_curr/app.go index 5bf6f50..82fcb8c 100644 --- a/cli_curr/app.go +++ b/cli_curr/app.go @@ -165,6 +165,11 @@ func NewCliApp() *cli.App { Usage: "Batch operation on a list of workflows from query", Subcommands: newBatchCommands(), }, + { + Name: "batch-v2", + Usage: "Batch operation on a list of workflows from query", + Subcommands: newBatchV2Commands(), + }, { Name: "admin", Aliases: []string{"adm"}, diff --git a/cli_curr/batchv2.go b/cli_curr/batchv2.go new file mode 100644 index 0000000..634bd0b --- /dev/null +++ b/cli_curr/batchv2.go @@ -0,0 +1,76 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cli_curr + +import ( + "github.com/urfave/cli" +) + +func newBatchV2Commands() []cli.Command { + return []cli.Command{ + { + Name: "describe", + Usage: "Describe a batch operation job", + Flags: append([]cli.Flag{ + cli.StringFlag{ + Name: FlagJobID, + Usage: "Batch Job Id", + Required: true, + }, + }), + Action: func(c *cli.Context) { + DescribeBatchJobV2(c) + }, + }, + { + Name: "list", + Usage: "List batch operation jobs", + Flags: []cli.Flag{}, + ArgsUsage: " ", + Action: func(c *cli.Context) { + ListBatchJobsV2(c) + }, + }, + { + Name: "terminate", + Usage: "Stop a batch operation job", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: FlagJobID, + Usage: "Batch Job Id", + Required: true, + }, + cli.StringFlag{ + Name: FlagReason, + Usage: "Reason to stop the batch job", + Required: true, + }, + }, + Action: func(c *cli.Context) { + StopBatchJobV2(c) + }, + }, + } +} diff --git a/cli_curr/batchv2_commands.go b/cli_curr/batchv2_commands.go new file mode 100644 index 0000000..17b1eff --- /dev/null +++ b/cli_curr/batchv2_commands.go @@ -0,0 +1,209 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cli_curr + +import ( + "fmt" + + "github.com/fatih/color" + "github.com/pborman/uuid" + "github.com/temporalio/tctl-kit/pkg/output" + "github.com/temporalio/tctl-kit/pkg/pager" + "github.com/urfave/cli" + "go.temporal.io/api/batch/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/collection" + "go.temporal.io/server/common/payloads" +) + +// DescribeBatchJobV2 describe the status of the batch job +func DescribeBatchJobV2(c *cli.Context) { + namespace := getRequiredGlobalOption(c, FlagNamespace) + jobID := c.String(FlagJobID) + + client := cFactory.FrontendClient(c) + ctx, cancel := newContext(c) + defer cancel() + resp, err := client.DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + Namespace: namespace, + JobId: jobID, + }) + if err != nil { + ErrorAndExit("unable to describe batch job", err) + } + + opts := &output.PrintOptions{ + OutputFormat: output.JSON, + } + output.PrintItems(nil, []interface{}{resp}, opts) +} + +// ListBatchJobs list the started batch jobs +func ListBatchJobsV2(c *cli.Context) { + namespace := getRequiredGlobalOption(c, FlagNamespace) + client := cFactory.FrontendClient(c) + + paginationFunc := func(npt []byte) ([]interface{}, []byte, error) { + var items []interface{} + var err error + + ctx, cancel := newContext(c) + defer cancel() + resp, err := client.ListBatchOperations(ctx, &workflowservice.ListBatchOperationsRequest{ + Namespace: namespace, + }) + + for _, e := range resp.OperationInfo { + items = append(items, e) + } + + if err != nil { + return nil, nil, err + } + + return items, npt, nil + } + + iter := collection.NewPagingIterator(paginationFunc) + opts := &output.PrintOptions{ + Fields: []string{"State", "JobId", "StartTime", "CloseTime"}, + Pager: pager.Less, + } + output.PrintIterator(nil, iter, opts) +} + +// BatchTerminateV2 terminate a list of workflows +func BatchTerminateV2(c *cli.Context) { + operator := getCurrentUserFromEnv() + + req := workflowservice.StartBatchOperationRequest{ + Operation: &workflowservice.StartBatchOperationRequest_TerminationOperation{ + TerminationOperation: &batch.BatchOperationTermination{ + Identity: operator, + }, + }, + } + + startBatchJob(c, &req) +} + +// BatchCancelV2 cancel a list of workflows +func BatchCancelV2(c *cli.Context) { + operator := getCurrentUserFromEnv() + + req := workflowservice.StartBatchOperationRequest{ + Operation: &workflowservice.StartBatchOperationRequest_CancellationOperation{ + CancellationOperation: &batch.BatchOperationCancellation{ + Identity: operator, + }, + }, + } + + startBatchJob(c, &req) +} + +// BatchSignalV2 send a signal to a list of workflows +func BatchSignalV2(c *cli.Context) { + signalName := c.String(FlagName) + input := c.String(FlagInput) + operator := getCurrentUserFromEnv() + + inputP, err := payloads.Encode(input) + if err != nil { + ErrorAndExit("unable to serialize signal input", err) + } + + req := workflowservice.StartBatchOperationRequest{ + Operation: &workflowservice.StartBatchOperationRequest_SignalOperation{ + SignalOperation: &batch.BatchOperationSignal{ + Signal: signalName, + Identity: operator, + Input: inputP, + }, + }, + } + + startBatchJob(c, &req) +} + +// startBatchJob starts a batch job +func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationRequest) { + namespace := getRequiredGlobalOption(c, FlagNamespace) + query := c.String(FlagListQuery) + reason := c.String(FlagReason) + + sdk := cFactory.SDKClient(c, namespace) + tcCtx, cancel := newContext(c) + defer cancel() + count, err := sdk.CountWorkflow(tcCtx, &workflowservice.CountWorkflowExecutionsRequest{ + Namespace: namespace, + Query: query, + }) + if err != nil { + ErrorAndExit("unable to count impacted workflows", err) + } + + msg := fmt.Sprintf("Will start a batch job operating on %v Workflow Executions. Continue? Y/N", count.GetCount()) + prompt(msg, c.Bool(FlagYes)) + + jobID := uuid.New() + req.JobId = jobID + req.Namespace = namespace + req.VisibilityQuery = query + req.Reason = reason + + client := cFactory.FrontendClient(c) + ctx, cancel := newContext(c) + defer cancel() + _, err = client.StartBatchOperation(ctx, req) + if err != nil { + ErrorAndExit("unable to start batch job", err) + } + + fmt.Printf("Batch job %s is started\n", color.MagentaString(jobID)) +} + +// StopBatchJobV2 stops a batch job +func StopBatchJobV2(c *cli.Context) { + namespace := getRequiredGlobalOption(c, FlagNamespace) + jobID := c.String(FlagJobID) + reason := c.String(FlagReason) + client := cFactory.FrontendClient(c) + + ctx, cancel := newContext(c) + defer cancel() + _, err := client.StopBatchOperation(ctx, &workflowservice.StopBatchOperationRequest{ + Namespace: namespace, + JobId: jobID, + Reason: reason, + Identity: getCurrentUserFromEnv(), + }) + + if err != nil { + ErrorAndExit("unable to stop a batch job", err) + } + + fmt.Printf("Batch job %s is stopped\n", color.MagentaString(jobID)) +} diff --git a/cli_curr/workflow.go b/cli_curr/workflow.go index 9d1117e..5a8cc33 100644 --- a/cli_curr/workflow.go +++ b/cli_curr/workflow.go @@ -69,7 +69,28 @@ func newWorkflowCommands() []cli.Command { Name: "cancel", Aliases: []string{"c"}, Usage: "cancel a workflow execution", - Flags: flagsForExecution, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: FlagWorkflowIDWithAlias, + Usage: "Cancel Workflow Execution by Id", + }, + cli.StringFlag{ + Name: FlagRunIDWithAlias, + Usage: "Run Id", + }, + cli.StringFlag{ + Name: FlagListQuery, + Usage: "Cancel Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/", + }, + cli.StringFlag{ + Name: FlagReason, + Usage: "Reason for canceling with List Filter", + }, + cli.BoolFlag{ + Name: FlagYes, + Usage: "Confirm all prompts", + }, + }, Action: func(c *cli.Context) { CancelWorkflow(c) }, @@ -81,11 +102,15 @@ func newWorkflowCommands() []cli.Command { Flags: []cli.Flag{ cli.StringFlag{ Name: FlagWorkflowIDWithAlias, - Usage: "WorkflowId", + Usage: "Signal Workflow Execution by Id", }, cli.StringFlag{ Name: FlagRunIDWithAlias, - Usage: "RunId", + Usage: "Run Id", + }, + cli.StringFlag{ + Name: FlagListQuery, + Usage: "Signal Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/", }, cli.StringFlag{ Name: FlagNameWithAlias, @@ -99,6 +124,14 @@ func newWorkflowCommands() []cli.Command { Name: FlagInputFileWithAlias, Usage: "Input for the signal from JSON file.", }, + cli.StringFlag{ + Name: FlagReason, + Usage: "Reason for signaling with List Filter", + }, + cli.BoolFlag{ + Name: FlagYes, + Usage: "Confirm all prompts", + }, }, Action: func(c *cli.Context) { SignalWorkflow(c) @@ -111,15 +144,23 @@ func newWorkflowCommands() []cli.Command { Flags: []cli.Flag{ cli.StringFlag{ Name: FlagWorkflowIDWithAlias, - Usage: "WorkflowId", + Usage: "Terminate Workflow Execution by Id", }, cli.StringFlag{ Name: FlagRunIDWithAlias, - Usage: "RunId", + Usage: "Run Id", + }, + cli.StringFlag{ + Name: FlagListQuery, + Usage: "Terminate Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/", }, cli.StringFlag{ Name: FlagReasonWithAlias, - Usage: "The reason you want to terminate the workflow", + Usage: "Reason for termination", + }, + cli.BoolFlag{ + Name: FlagYes, + Usage: "Confirm all prompts", }, }, Action: func(c *cli.Context) { diff --git a/cli_curr/workflowCommands.go b/cli_curr/workflowCommands.go index 8a04263..87078e5 100644 --- a/cli_curr/workflowCommands.go +++ b/cli_curr/workflowCommands.go @@ -467,6 +467,14 @@ func printWorkflowProgress(c *cli.Context, wid, rid string) { // TerminateWorkflow terminates a workflow execution func TerminateWorkflow(c *cli.Context) { + if c.String(FlagListQuery) != "" { + BatchTerminateV2(c) + } else { + terminateWorkflow(c) + } +} + +func terminateWorkflow(c *cli.Context) { sdkClient := getSDKClient(c) wid := getRequiredOption(c, FlagWorkflowID) @@ -486,6 +494,14 @@ func TerminateWorkflow(c *cli.Context) { // CancelWorkflow cancels a workflow execution func CancelWorkflow(c *cli.Context) { + if c.String(FlagListQuery) != "" { + BatchCancelV2(c) + } else { + cancelWorkflow(c) + } +} + +func cancelWorkflow(c *cli.Context) { sdkClient := getSDKClient(c) wid := getRequiredOption(c, FlagWorkflowID) @@ -504,6 +520,14 @@ func CancelWorkflow(c *cli.Context) { // SignalWorkflow signals a workflow execution func SignalWorkflow(c *cli.Context) { + if c.String(FlagListQuery) != "" { + BatchSignalV2(c) + } else { + signalWorkflow(c) + } +} + +func signalWorkflow(c *cli.Context) { serviceClient := cFactory.FrontendClient(c) namespace := getRequiredGlobalOption(c, FlagNamespace)