Skip to content

Commit

Permalink
Move signal workflow to api package (#3348)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Sep 9, 2022
1 parent e80a691 commit 9f93c44
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 98 deletions.
126 changes: 126 additions & 0 deletions service/history/api/signalworkflow/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package signalworkflow

import (
"context"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
)

func Invoke(
ctx context.Context,
req *historyservice.SignalWorkflowExecutionRequest,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.SignalWorkflowExecutionResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}
namespaceID := namespaceEntry.ID()

request := req.SignalRequest
parentExecution := req.ExternalWorkflowExecution
childWorkflowOnly := req.GetChildWorkflowOnly()

err = api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
namespaceID.String(),
request.WorkflowExecution.WorkflowId,
request.WorkflowExecution.RunId,
),
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
mutableState := workflowContext.GetMutableState()
if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) {
return &api.UpdateWorkflowAction{
Noop: true,
CreateWorkflowTask: false,
}, nil
}

if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}

executionInfo := mutableState.GetExecutionInfo()
createWorkflowTask := true
if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask = false
}

if err := api.ValidateSignal(
ctx,
shard,
mutableState,
request.GetInput().Size(),
"SignalWorkflowExecution",
); err != nil {
return nil, err
}

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowId
parentRunID := executionInfo.ParentRunId
if parentExecution.GetWorkflowId() != parentWorkflowID ||
parentExecution.GetRunId() != parentRunID {
return nil, consts.ErrWorkflowParent
}
}

if request.GetRequestId() != "" {
mutableState.AddSignalRequested(request.GetRequestId())
}
if _, err := mutableState.AddWorkflowExecutionSignaled(
request.GetSignalName(),
request.GetInput(),
request.GetIdentity(),
request.GetHeader()); err != nil {
return nil, err
}

return &api.UpdateWorkflowAction{
Noop: false,
CreateWorkflowTask: createWorkflowTask,
}, nil
},
nil,
shard,
workflowConsistencyChecker,
)
if err != nil {
return nil, err
}
return &historyservice.SignalWorkflowExecutionResponse{}, nil
}
4 changes: 2 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,12 +862,12 @@ func (h *Handler) SignalWorkflowExecution(ctx context.Context, request *historys
return nil, h.convertError(err)
}

err2 := engine.SignalWorkflowExecution(ctx, request)
resp, err2 := engine.SignalWorkflowExecution(ctx, request)
if err2 != nil {
return nil, h.convertError(err2)
}

return &historyservice.SignalWorkflowExecutionResponse{}, nil
return resp, nil
}

// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
Expand Down
85 changes: 4 additions & 81 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import (
"go.temporal.io/server/service/history/api/respondactivitytaskcompleted"
"go.temporal.io/server/service/history/api/respondactivitytaskfailed"
"go.temporal.io/server/service/history/api/signalwithstartworkflow"
"go.temporal.io/server/service/history/api/signalworkflow"
"go.temporal.io/server/service/history/api/startworkflow"
"go.temporal.io/server/service/history/api/terminateworkflow"
"go.temporal.io/server/service/history/configs"
Expand Down Expand Up @@ -1149,87 +1150,9 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution(

func (e *historyEngineImpl) SignalWorkflowExecution(
ctx context.Context,
signalRequest *historyservice.SignalWorkflowExecutionRequest,
) error {

namespaceEntry, err := e.getActiveNamespaceEntry(namespace.ID(signalRequest.GetNamespaceId()))
if err != nil {
return err
}
namespaceID := namespaceEntry.ID()

request := signalRequest.SignalRequest
parentExecution := signalRequest.ExternalWorkflowExecution
childWorkflowOnly := signalRequest.GetChildWorkflowOnly()

return api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
namespaceID.String(),
request.WorkflowExecution.WorkflowId,
request.WorkflowExecution.RunId,
),
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
mutableState := workflowContext.GetMutableState()
if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) {
return &api.UpdateWorkflowAction{
Noop: true,
CreateWorkflowTask: false,
}, nil
}

if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}

executionInfo := mutableState.GetExecutionInfo()
createWorkflowTask := true
if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask = false
}

if err := api.ValidateSignal(
ctx,
e.shard,
mutableState,
request.GetInput().Size(),
"SignalWorkflowExecution",
); err != nil {
return nil, err
}

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowId
parentRunID := executionInfo.ParentRunId
if parentExecution.GetWorkflowId() != parentWorkflowID ||
parentExecution.GetRunId() != parentRunID {
return nil, consts.ErrWorkflowParent
}
}

if request.GetRequestId() != "" {
mutableState.AddSignalRequested(request.GetRequestId())
}
if _, err := mutableState.AddWorkflowExecutionSignaled(
request.GetSignalName(),
request.GetInput(),
request.GetIdentity(),
request.GetHeader()); err != nil {
return nil, err
}

return &api.UpdateWorkflowAction{
Noop: false,
CreateWorkflowTask: createWorkflowTask,
}, nil
},
nil,
e.shard,
e.workflowConsistencyChecker,
)
req *historyservice.SignalWorkflowExecutionRequest,
) (resp *historyservice.SignalWorkflowExecutionResponse, retError error) {
return signalworkflow.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker)
}

// SignalWithStartWorkflowExecution signals current workflow (if running) or creates & signals a new workflow
Expand Down
20 changes: 10 additions & 10 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4713,7 +4713,7 @@ func (s *engineSuite) TestCancelTimer_RespondWorkflowTaskCompleted_TimerFired()

func (s *engineSuite) TestSignalWorkflowExecution() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := commonpb.WorkflowExecution{
Expand Down Expand Up @@ -4746,14 +4746,14 @@ func (s *engineSuite) TestSignalWorkflowExecution() {
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

// Test signal workflow task by adding request ID
func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := commonpb.WorkflowExecution{
Expand Down Expand Up @@ -4789,14 +4789,14 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() {

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

// Test signal workflow task by dedup request ID & workflow finished
func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := commonpb.WorkflowExecution{
Expand Down Expand Up @@ -4833,13 +4833,13 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() {

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

func (s *engineSuite) TestSignalWorkflowExecution_Failed() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := &commonpb.WorkflowExecution{
Expand Down Expand Up @@ -4871,13 +4871,13 @@ func (s *engineSuite) TestSignalWorkflowExecution_Failed() {

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "workflow execution already completed")
}

func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() {
signalRequest := &historyservice.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "Missing namespace UUID.")

we := commonpb.WorkflowExecution{
Expand Down Expand Up @@ -4935,7 +4935,7 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() {
return tests.UpdateWorkflowExecutionResponse, nil
})

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
_, err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/shard/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type (
RespondActivityTaskFailed(ctx context.Context, request *historyservice.RespondActivityTaskFailedRequest) (*historyservice.RespondActivityTaskFailedResponse, error)
RespondActivityTaskCanceled(ctx context.Context, request *historyservice.RespondActivityTaskCanceledRequest) (*historyservice.RespondActivityTaskCanceledResponse, error)
RecordActivityTaskHeartbeat(ctx context.Context, request *historyservice.RecordActivityTaskHeartbeatRequest) (*historyservice.RecordActivityTaskHeartbeatResponse, error)
RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (resp *historyservice.RequestCancelWorkflowExecutionResponse, retError error)
SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) error
RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (*historyservice.RequestCancelWorkflowExecutionResponse, error)
SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error)
SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error)
RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) error
TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error)
Expand Down
7 changes: 4 additions & 3 deletions service/history/shard/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9f93c44

Please sign in to comment.