From 9f93c4493f0830a7929817c39495ec4e4dded52d Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 8 Sep 2022 17:02:25 -0700 Subject: [PATCH] Move signal workflow to api package (#3348) --- service/history/api/signalworkflow/api.go | 126 ++++++++++++++++++++++ service/history/handler.go | 4 +- service/history/historyEngine.go | 85 +-------------- service/history/historyEngine_test.go | 20 ++-- service/history/shard/engine.go | 4 +- service/history/shard/engine_mock.go | 7 +- 6 files changed, 148 insertions(+), 98 deletions(-) create mode 100644 service/history/api/signalworkflow/api.go diff --git a/service/history/api/signalworkflow/api.go b/service/history/api/signalworkflow/api.go new file mode 100644 index 00000000000..b589f34fe1b --- /dev/null +++ b/service/history/api/signalworkflow/api.go @@ -0,0 +1,126 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package signalworkflow + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/shard" +) + +func Invoke( + ctx context.Context, + req *historyservice.SignalWorkflowExecutionRequest, + shard shard.Context, + workflowConsistencyChecker api.WorkflowConsistencyChecker, +) (resp *historyservice.SignalWorkflowExecutionResponse, retError error) { + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + namespaceID := namespaceEntry.ID() + + request := req.SignalRequest + parentExecution := req.ExternalWorkflowExecution + childWorkflowOnly := req.GetChildWorkflowOnly() + + err = api.GetAndUpdateWorkflowWithNew( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + namespaceID.String(), + request.WorkflowExecution.WorkflowId, + request.WorkflowExecution.RunId, + ), + func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) { + mutableState := workflowContext.GetMutableState() + if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) { + return &api.UpdateWorkflowAction{ + Noop: true, + CreateWorkflowTask: false, + }, nil + } + + if !mutableState.IsWorkflowExecutionRunning() { + return nil, consts.ErrWorkflowCompleted + } + + executionInfo := mutableState.GetExecutionInfo() + createWorkflowTask := true + if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() { + // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet + createWorkflowTask = false + } + + if err := api.ValidateSignal( + ctx, + shard, + mutableState, + request.GetInput().Size(), + "SignalWorkflowExecution", + ); err != nil { + return nil, err + } + + if childWorkflowOnly { + parentWorkflowID := executionInfo.ParentWorkflowId + parentRunID := executionInfo.ParentRunId + if parentExecution.GetWorkflowId() != parentWorkflowID || + parentExecution.GetRunId() != parentRunID { + return nil, consts.ErrWorkflowParent + } + } + + if request.GetRequestId() != "" { + mutableState.AddSignalRequested(request.GetRequestId()) + } + if _, err := mutableState.AddWorkflowExecutionSignaled( + request.GetSignalName(), + request.GetInput(), + request.GetIdentity(), + request.GetHeader()); err != nil { + return nil, err + } + + return &api.UpdateWorkflowAction{ + Noop: false, + CreateWorkflowTask: createWorkflowTask, + }, nil + }, + nil, + shard, + workflowConsistencyChecker, + ) + if err != nil { + return nil, err + } + return &historyservice.SignalWorkflowExecutionResponse{}, nil +} diff --git a/service/history/handler.go b/service/history/handler.go index 6cb00f02862..db943f1de39 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -862,12 +862,12 @@ func (h *Handler) SignalWorkflowExecution(ctx context.Context, request *historys return nil, h.convertError(err) } - err2 := engine.SignalWorkflowExecution(ctx, request) + resp, err2 := engine.SignalWorkflowExecution(ctx, request) if err2 != nil { return nil, h.convertError(err2) } - return &historyservice.SignalWorkflowExecutionResponse{}, nil + return resp, nil } // SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution. diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index f341ab88e90..90b4758aa21 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -72,6 +72,7 @@ import ( "go.temporal.io/server/service/history/api/respondactivitytaskcompleted" "go.temporal.io/server/service/history/api/respondactivitytaskfailed" "go.temporal.io/server/service/history/api/signalwithstartworkflow" + "go.temporal.io/server/service/history/api/signalworkflow" "go.temporal.io/server/service/history/api/startworkflow" "go.temporal.io/server/service/history/api/terminateworkflow" "go.temporal.io/server/service/history/configs" @@ -1149,87 +1150,9 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( func (e *historyEngineImpl) SignalWorkflowExecution( ctx context.Context, - signalRequest *historyservice.SignalWorkflowExecutionRequest, -) error { - - namespaceEntry, err := e.getActiveNamespaceEntry(namespace.ID(signalRequest.GetNamespaceId())) - if err != nil { - return err - } - namespaceID := namespaceEntry.ID() - - request := signalRequest.SignalRequest - parentExecution := signalRequest.ExternalWorkflowExecution - childWorkflowOnly := signalRequest.GetChildWorkflowOnly() - - return api.GetAndUpdateWorkflowWithNew( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - namespaceID.String(), - request.WorkflowExecution.WorkflowId, - request.WorkflowExecution.RunId, - ), - func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) { - mutableState := workflowContext.GetMutableState() - if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) { - return &api.UpdateWorkflowAction{ - Noop: true, - CreateWorkflowTask: false, - }, nil - } - - if !mutableState.IsWorkflowExecutionRunning() { - return nil, consts.ErrWorkflowCompleted - } - - executionInfo := mutableState.GetExecutionInfo() - createWorkflowTask := true - if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() { - // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet - createWorkflowTask = false - } - - if err := api.ValidateSignal( - ctx, - e.shard, - mutableState, - request.GetInput().Size(), - "SignalWorkflowExecution", - ); err != nil { - return nil, err - } - - if childWorkflowOnly { - parentWorkflowID := executionInfo.ParentWorkflowId - parentRunID := executionInfo.ParentRunId - if parentExecution.GetWorkflowId() != parentWorkflowID || - parentExecution.GetRunId() != parentRunID { - return nil, consts.ErrWorkflowParent - } - } - - if request.GetRequestId() != "" { - mutableState.AddSignalRequested(request.GetRequestId()) - } - if _, err := mutableState.AddWorkflowExecutionSignaled( - request.GetSignalName(), - request.GetInput(), - request.GetIdentity(), - request.GetHeader()); err != nil { - return nil, err - } - - return &api.UpdateWorkflowAction{ - Noop: false, - CreateWorkflowTask: createWorkflowTask, - }, nil - }, - nil, - e.shard, - e.workflowConsistencyChecker, - ) + req *historyservice.SignalWorkflowExecutionRequest, +) (resp *historyservice.SignalWorkflowExecutionResponse, retError error) { + return signalworkflow.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker) } // SignalWithStartWorkflowExecution signals current workflow (if running) or creates & signals a new workflow diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 8029701fd84..f0c93982c27 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -4713,7 +4713,7 @@ func (s *engineSuite) TestCancelTimer_RespondWorkflowTaskCompleted_TimerFired() func (s *engineSuite) TestSignalWorkflowExecution() { signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "Missing namespace UUID.") we := commonpb.WorkflowExecution{ @@ -4746,14 +4746,14 @@ func (s *engineSuite) TestSignalWorkflowExecution() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.Nil(err) } // Test signal workflow task by adding request ID func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() { signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "Missing namespace UUID.") we := commonpb.WorkflowExecution{ @@ -4789,14 +4789,14 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) - err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.Nil(err) } // Test signal workflow task by dedup request ID & workflow finished func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() { signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "Missing namespace UUID.") we := commonpb.WorkflowExecution{ @@ -4833,13 +4833,13 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) - err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.Nil(err) } func (s *engineSuite) TestSignalWorkflowExecution_Failed() { signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "Missing namespace UUID.") we := &commonpb.WorkflowExecution{ @@ -4871,13 +4871,13 @@ func (s *engineSuite) TestSignalWorkflowExecution_Failed() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) - err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "workflow execution already completed") } func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() { signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.EqualError(err, "Missing namespace UUID.") we := commonpb.WorkflowExecution{ @@ -4935,7 +4935,7 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() { return tests.UpdateWorkflowExecutionResponse, nil }) - err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + _, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.Nil(err) } diff --git a/service/history/shard/engine.go b/service/history/shard/engine.go index 52e61155938..4682194ea5b 100644 --- a/service/history/shard/engine.go +++ b/service/history/shard/engine.go @@ -60,8 +60,8 @@ type ( RespondActivityTaskFailed(ctx context.Context, request *historyservice.RespondActivityTaskFailedRequest) (*historyservice.RespondActivityTaskFailedResponse, error) RespondActivityTaskCanceled(ctx context.Context, request *historyservice.RespondActivityTaskCanceledRequest) (*historyservice.RespondActivityTaskCanceledResponse, error) RecordActivityTaskHeartbeat(ctx context.Context, request *historyservice.RecordActivityTaskHeartbeatRequest) (*historyservice.RecordActivityTaskHeartbeatResponse, error) - RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (resp *historyservice.RequestCancelWorkflowExecutionResponse, retError error) - SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) error + RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (*historyservice.RequestCancelWorkflowExecutionResponse, error) + SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error) SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error) RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) error TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error) diff --git a/service/history/shard/engine_mock.go b/service/history/shard/engine_mock.go index a3850463b2e..61d4d8b4fd5 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/shard/engine_mock.go @@ -575,11 +575,12 @@ func (mr *MockEngineMockRecorder) SignalWithStartWorkflowExecution(ctx, request } // SignalWorkflowExecution mocks base method. -func (m *MockEngine) SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) error { +func (m *MockEngine) SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SignalWorkflowExecution", ctx, request) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*historyservice.SignalWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 } // SignalWorkflowExecution indicates an expected call of SignalWorkflowExecution.