Skip to content

Commit

Permalink
Move respond activity task completed to api package (#3333)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Sep 8, 2022
1 parent 57b07b1 commit 4899a21
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 150 deletions.
78 changes: 78 additions & 0 deletions service/history/api/activity_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package api

import (
"context"
"fmt"

"go.temporal.io/api/serviceerror"

tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/service/history/workflow"
)

func SetActivityTaskRunID(
ctx context.Context,
token *tokenspb.Task,
workflowConsistencyChecker WorkflowConsistencyChecker,
) error {
// TODO when the following APIs are deprecated
// remove this function since run ID will always be set
// * RecordActivityTaskHeartbeatById
// * RespondActivityTaskCanceledById
// * RespondActivityTaskFailedById
// * RespondActivityTaskCompletedById

if len(token.RunId) != 0 {
return nil
}

runID, err := workflowConsistencyChecker.GetCurrentRunID(
ctx,
token.NamespaceId,
token.WorkflowId,
)
if err != nil {
return err
}
token.RunId = runID
return nil
}

func GetActivityScheduledEventID(
activityID string,
mutableState workflow.MutableState,
) (int64, error) {

if activityID == "" {
return 0, serviceerror.NewInvalidArgument("activityID cannot be empty")
}
activityInfo, ok := mutableState.GetActivityByActivityID(activityID)
if !ok {
return 0, serviceerror.NewNotFound(fmt.Sprintf("cannot find pending activity with ActivityID %s, check workflow execution history for more details", activityID))
}
return activityInfo.ScheduledEventId, nil
}
129 changes: 129 additions & 0 deletions service/history/api/respondactivitytaskcompleted/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package respondactivitytaskcompleted

import (
"context"
"time"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
)

func Invoke(
ctx context.Context,
req *historyservice.RespondActivityTaskCompletedRequest,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RespondActivityTaskCompletedResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}
namespace := namespaceEntry.Name()

tokenSerializer := common.NewProtoTaskTokenSerializer()
request := req.CompleteRequest
token, err0 := tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
return nil, consts.ErrDeserializingToken
}
if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil {
return nil, err
}

var activityStartedTime time.Time
var taskQueue string
var workflowTypeName string
err = api.GetAndUpdateWorkflowWithNew(
ctx,
token.Clock,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
token.NamespaceId,
token.WorkflowId,
token.RunId,
),
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
mutableState := workflowContext.GetMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
scheduledEventID := token.GetScheduledEventId()
if scheduledEventID == common.EmptyEventID { // client call CompleteActivityById, so get scheduledEventID by activityID
scheduledEventID, err0 = api.GetActivityScheduledEventID(token.GetActivityId(), mutableState)
if err0 != nil {
return nil, err0
}
}
ai, isRunning := mutableState.GetActivityInfo(scheduledEventID)

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !isRunning && scheduledEventID >= mutableState.GetNextEventID() {
shard.GetMetricsClient().IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter)
return nil, consts.ErrStaleState
}

if !isRunning || ai.StartedEventId == common.EmptyEventID ||
(token.GetScheduledEventId() != common.EmptyEventID && token.Attempt != ai.Attempt) {
return nil, consts.ErrActivityTaskNotFound
}

if _, err := mutableState.AddActivityTaskCompletedEvent(scheduledEventID, ai.StartedEventId, request); err != nil {
// Unable to add ActivityTaskCompleted event to history
return nil, err
}
activityStartedTime = *ai.StartedTime
taskQueue = ai.TaskQueue
return &api.UpdateWorkflowAction{
Noop: false,
CreateWorkflowTask: true,
}, nil
},
nil,
shard,
workflowConsistencyChecker,
)

if err == nil && !activityStartedTime.IsZero() {
scope := shard.GetMetricsClient().Scope(metrics.HistoryRespondActivityTaskCompletedScope).
Tagged(
metrics.NamespaceTag(namespace.String()),
metrics.WorkflowTypeTag(workflowTypeName),
metrics.ActivityTypeTag(token.ActivityType),
metrics.TaskQueueTag(taskQueue),
)
scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return &historyservice.RespondActivityTaskCompletedResponse{}, err
}
4 changes: 2 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,12 @@ func (h *Handler) RespondActivityTaskCompleted(ctx context.Context, request *his
return nil, h.convertError(err)
}

err2 := engine.RespondActivityTaskCompleted(ctx, request)
resp, err2 := engine.RespondActivityTaskCompleted(ctx, request)
if err2 != nil {
return nil, h.convertError(err2)
}

return &historyservice.RespondActivityTaskCompletedResponse{}, nil
return resp, nil
}

// RespondActivityTaskFailed - records failure of an activity task
Expand Down
Loading

0 comments on commit 4899a21

Please sign in to comment.