Skip to content

Commit

Permalink
Handle buffered events during namespace migration / promotion (#2440)
Browse files Browse the repository at this point in the history
* Add new namespace conversion util handling namespace promotion & buffered events case
* Remove unused StartTransactionSkipWorkflowTaskFail
  • Loading branch information
wxing1292 authored Feb 1, 2022
1 parent 4600713 commit 426621d
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 39 deletions.
7 changes: 7 additions & 0 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (m *metadataImpl) ClusterNameForFailoverVersion(isGlobalNamespace bool, fai
return m.currentClusterName
}

if !isGlobalNamespace {
panic(fmt.Sprintf(
"ClusterMetadata encountered local namesapce with failover version %v",
failoverVersion,
))
}

initialFailoverVersion := failoverVersion % m.failoverVersionIncrement
// Failover version starts with 1. Zero is an invalid value for failover version
if initialFailoverVersion == common.EmptyVersion {
Expand Down
1 change: 1 addition & 0 deletions common/namespace/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

namespacepb "go.temporal.io/api/namespace/v1"

"go.temporal.io/server/common/persistence"
)

Expand Down
58 changes: 58 additions & 0 deletions common/namespace/namespace_migration_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package namespace

import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/persistence"
)

type (
PretendAsLocalNamespace struct {
localClusterName string
}
)

var _ Mutation = PretendAsLocalNamespace{}

// NewPretendAsLocalNamespace create a Mutation which update namespace replication
// config as if this namespace is local
func NewPretendAsLocalNamespace(
localClusterName string,
) PretendAsLocalNamespace {
return PretendAsLocalNamespace{
localClusterName: localClusterName,
}
}

func (c PretendAsLocalNamespace) apply(response *persistence.GetNamespaceResponse) {
response.IsGlobalNamespace = false
response.Namespace.ReplicationConfig = &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: c.localClusterName,
Clusters: persistence.GetOrUseDefaultClusters(c.localClusterName, nil),
}
response.Namespace.FailoverVersion = common.EmptyVersion
}
5 changes: 5 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,11 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() {

msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry,
log.NewTestLogger(), runID)
addWorkflowExecutionStartedEvent(msBuilder, commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
}, "wType", "testTaskQueue", payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, identity)
_ = addWorkflowTaskScheduledEvent(msBuilder)
ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
gceResponse := &persistence.GetCurrentExecutionResponse{RunID: runID}
Expand Down
5 changes: 5 additions & 0 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() {

msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry,
log.NewTestLogger(), runID)
addWorkflowExecutionStartedEvent(msBuilder, commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
}, "wType", "testTaskQueue", payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, identity)
_ = addWorkflowTaskScheduledEvent(msBuilder)
ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
Expand Down
1 change: 0 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ type (
GetUpdateCondition() (int64, int64)

StartTransaction(entry *namespace.Namespace) (bool, error)
StartTransactionSkipWorkflowTaskFail(entry *namespace.Namespace) error
CloseTransactionAsMutation(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
CloseTransactionAsSnapshot(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
GenerateLastHistoryReplicationTasks(now time.Time) (*tasks.HistoryReplicationTask, error)
Expand Down
54 changes: 31 additions & 23 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3660,13 +3660,16 @@ func (e *MutableStateImpl) UpdateWorkflowStateStatus(
func (e *MutableStateImpl) StartTransaction(
namespaceEntry *namespace.Namespace,
) (bool, error) {

namespaceEntry, err := e.startTransactionHandleNamespaceMigration(namespaceEntry)
if err != nil {
return false, err
}
e.namespaceEntry = namespaceEntry
if err := e.UpdateCurrentVersion(namespaceEntry.FailoverVersion(), false); err != nil {
return false, err
}

flushBeforeReady, err := e.startTransactionHandleWorkflowTaskFailover(false)
flushBeforeReady, err := e.startTransactionHandleWorkflowTaskFailover()
if err != nil {
return false, err
}
Expand All @@ -3676,19 +3679,6 @@ func (e *MutableStateImpl) StartTransaction(
return flushBeforeReady, nil
}

func (e *MutableStateImpl) StartTransactionSkipWorkflowTaskFail(
namespaceEntry *namespace.Namespace,
) error {

e.namespaceEntry = namespaceEntry
if err := e.UpdateCurrentVersion(namespaceEntry.FailoverVersion(), false); err != nil {
return err
}

_, err := e.startTransactionHandleWorkflowTaskFailover(true)
return err
}

func (e *MutableStateImpl) CloseTransactionAsMutation(
now time.Time,
transactionPolicy TransactionPolicy,
Expand Down Expand Up @@ -4163,9 +4153,31 @@ func (e *MutableStateImpl) startTransactionHandleWorkflowTaskTTL() {
}
}

func (e *MutableStateImpl) startTransactionHandleWorkflowTaskFailover(
skipWorkflowTaskFailed bool,
) (bool, error) {
func (e *MutableStateImpl) startTransactionHandleNamespaceMigration(
namespaceEntry *namespace.Namespace,
) (*namespace.Namespace, error) {
// NOTE:
// the main idea here is to guarantee that buffered events & namespace migration works
// e.g. handle buffered events during version 0 => version > 0 by postponing namespace migration
// * flush buffered events as if namespace is still local
// * use updated namespace for actual call

lastWriteVersion, err := e.GetLastWriteVersion()
if err != nil {
return nil, err
}

// local namespace -> global namespace && with buffered events
if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && e.HasBufferedEvents() {
localNamespaceMutation := namespace.NewPretendAsLocalNamespace(
e.clusterMetadata.GetCurrentClusterName(),
)
return namespaceEntry.Clone(localNamespaceMutation), nil
}
return namespaceEntry, nil
}

func (e *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool, error) {

if !e.IsWorkflowExecutionRunning() ||
!e.canReplicateEvents() {
Expand Down Expand Up @@ -4222,7 +4234,7 @@ func (e *MutableStateImpl) startTransactionHandleWorkflowTaskFailover(
if lastWriteSourceCluster != currentCluster && currentVersionCluster == currentCluster {
// do a sanity check on buffered events
if e.HasBufferedEvents() {
return false, serviceerror.NewInternal("MutableStateImpl encounter previous passive workflow with buffered events")
return false, serviceerror.NewInternal("MutableStateImpl encountered previous passive workflow with buffered events")
}
flushBufferVersion = currentVersion
}
Expand All @@ -4234,10 +4246,6 @@ func (e *MutableStateImpl) startTransactionHandleWorkflowTaskFailover(
return false, err
}

if skipWorkflowTaskFailed {
return false, nil
}

// we have a workflow task with buffered events on the fly with a lower version, fail it
if err := failWorkflowTask(
e,
Expand Down
14 changes: 0 additions & 14 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion service/history/workflow/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func TestGlobalMutableState(
func TestCloneToProto(
mutableState MutableState,
) *persistencespb.WorkflowMutableState {
_, _, _ = mutableState.CloseTransactionAsSnapshot(time.Now().UTC(), TransactionPolicyActive)
if mutableState.HasBufferedEvents() {
_, _, _ = mutableState.CloseTransactionAsMutation(time.Now().UTC(), TransactionPolicyActive)
} else {
_, _, _ = mutableState.CloseTransactionAsSnapshot(time.Now().UTC(), TransactionPolicyActive)
}
return mutableState.CloneToProto()
}

0 comments on commit 426621d

Please sign in to comment.