From b87d605126fe9189aeb08c6849bc49f684506b21 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Wed, 26 Jan 2022 19:15:09 -0800 Subject: [PATCH] Fix fake cluster for empty version (#2376) --- host/xdc/integration_failover_test.go | 177 +++++++++++++++++- service/history/historyEngine.go | 9 +- service/history/nDCTransactionMgr.go | 5 +- service/history/shard/context_impl.go | 4 +- service/history/shard/engine.go | 4 +- service/history/shard/engine_mock.go | 16 +- service/history/timerQueueProcessor.go | 3 - service/history/transferQueueProcessor.go | 3 - service/history/workflow/context.go | 4 +- .../history/workflow/mutable_state_impl.go | 3 + service/history/workflow/transaction.go | 3 + service/history/workflow/transaction_impl.go | 42 ++--- service/history/workflow/transaction_mock.go | 24 +-- service/history/workflow/transaction_test.go | 4 +- service/history/workflowResetter.go | 1 + service/history/workflowResetter_test.go | 11 ++ 16 files changed, 240 insertions(+), 73 deletions(-) diff --git a/host/xdc/integration_failover_test.go b/host/xdc/integration_failover_test.go index 9aeca3f2f34..852e43e548c 100644 --- a/host/xdc/integration_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -56,6 +56,7 @@ import ( sdkclient "go.temporal.io/sdk/client" sdkworker "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + sw "go.temporal.io/server/service/worker" "gopkg.in/yaml.v3" "go.temporal.io/server/api/historyservice/v1" @@ -69,6 +70,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/environment" "go.temporal.io/server/host" + "go.temporal.io/server/service/worker/migration" ) type ( @@ -1844,7 +1846,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() { func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() { namespace := "test-activity-heartbeat-workflow-failover-" + common.GenerateRandomString(5) - s.registerNamespace(namespace) + s.registerNamespace(namespace, true) 
taskqueue := "integration-activity-heartbeat-workflow-failover-test-taskqueue" client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1") @@ -1949,6 +1951,167 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() { s.Equal(2, lastAttemptCount) } +func (s *integrationClustersTestSuite) printHistory(frontendClient workflowservice.WorkflowServiceClient, namespace, workflowID, runID string) { + events := s.getHistory(frontendClient, namespace, &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }) + history := &historypb.History{Events: events} + common.PrettyPrintHistory(history, s.logger) +} + +func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { + namespace := "local-ns-to-be-promote-" + common.GenerateRandomString(5) + s.registerNamespace(namespace, false) + + taskqueue := "integration-local-ns-to-be-promote-taskqueue" + client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1") + + testWorkflowFn := func(ctx workflow.Context, sleepInterval time.Duration) error { + err := workflow.Sleep(ctx, sleepInterval) + return err + } + worker1.RegisterWorkflow(testWorkflowFn) + worker1.Start() + + // Start wf1 (in local ns) + workflowID := "local-ns-wf-1" + run1, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + WorkflowRunTimeout: time.Second * 30, + }, testWorkflowFn, time.Millisecond*10) + + s.NoError(err) + s.NotEmpty(run1.GetRunID()) + s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID())) + // wait until wf1 complete + err = run1.Get(context.Background(), nil) + s.NoError(err) + + // Start wf2 (start in local ns, and then promote to global ns, wf2 close in global ns) + workflowID2 := "local-ns-wf-2" + run2, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + ID: workflowID2, + 
TaskQueue: taskqueue, + WorkflowRunTimeout: time.Second * 30, + }, testWorkflowFn, time.Second*15 /* longer than ns refresh */) + s.NoError(err) + s.NotEmpty(run2.GetRunID()) + s.logger.Info("start wf2", tag.WorkflowRunID(run2.GetRunID())) + + // promote ns + frontendClient1 := s.cluster1.GetFrontendClient() + _, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{ + Namespace: namespace, + PromoteNamespace: true, + }) + s.NoError(err) + nsResp, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + }) + s.NoError(err) + s.True(nsResp.IsGlobalNamespace) + s.Equal(1, len(nsResp.ReplicationConfig.Clusters)) + + // wait until wf2 complete + err = run2.Get(context.Background(), nil) + s.NoError(err) + + // update ns to have 2 clusters + _, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{ + Namespace: namespace, + ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ + Clusters: clusterReplicationConfig, + }, + }) + s.NoError(err) + // wait for ns cache to pick up the change + time.Sleep(cacheRefreshInterval) + + nsResp, err = frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + }) + s.NoError(err) + s.True(nsResp.IsGlobalNamespace) + s.Equal(2, len(nsResp.ReplicationConfig.Clusters)) + + // start wf3 (start in global ns) + workflowID3 := "local-ns-wf-3" + run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + ID: workflowID3, + TaskQueue: taskqueue, + WorkflowRunTimeout: time.Second * 30, + }, testWorkflowFn, time.Millisecond*10) + s.NoError(err) + s.NotEmpty(run3.GetRunID()) + s.logger.Info("start wf3", tag.WorkflowRunID(run3.GetRunID())) + // wait until wf3 complete + err = run3.Get(context.Background(), nil) + s.NoError(err) + + // start force-replicate wf + sysClient, err := 
sdkclient.NewClient(sdkclient.Options{ + HostPort: s.cluster1.GetHost().FrontendGRPCAddress(), + Namespace: "temporal-system", + }) + workflowID4 := "force-replication-wf-4" + run4, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + ID: workflowID4, + TaskQueue: sw.DefaultWorkerTaskQueue, + WorkflowRunTimeout: time.Second * 30, + }, "force-replication", migration.ForceReplicationParams{ + Namespace: namespace, + RpsPerActivity: 10, + }) + + s.NoError(err) + err = run4.Get(context.Background(), nil) + s.NoError(err) + + // start namespace-handover wf + workflowID5 := "namespace-handover-wf-5" + run5, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + ID: workflowID5, + TaskQueue: sw.DefaultWorkerTaskQueue, + WorkflowRunTimeout: time.Second * 30, + }, "namespace-handover", migration.NamespaceHandoverParams{ + Namespace: namespace, + RemoteCluster: clusterName[1], + AllowedLaggingSeconds: 10, + HandoverTimeoutSeconds: 30, + }) + s.NoError(err) + err = run5.Get(context.Background(), nil) + s.NoError(err) + + // at this point ns migration is done. 
+ // verify namespace is now active in cluster2 + nsResp2, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + }) + s.NoError(err) + s.True(nsResp2.IsGlobalNamespace) + s.Equal(2, len(nsResp2.ReplicationConfig.Clusters)) + s.Equal(clusterName[1], nsResp2.ReplicationConfig.ActiveClusterName) + + // verify all wf in ns is now available in cluster2 + client2, err := sdkclient.NewClient(sdkclient.Options{ + HostPort: s.cluster2.GetHost().FrontendGRPCAddress(), + Namespace: namespace, + }) + s.NoError(err) + verify := func(wfID string, expectedRunID string) { + desc1, err := client2.DescribeWorkflowExecution(host.NewContext(), wfID, "") + s.NoError(err) + s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId) + s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status) + } + verify(workflowID, run1.GetRunID()) + verify(workflowID2, run2.GetRunID()) + verify(workflowID3, run3.GetRunID()) +} + func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent { historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{ Namespace: namespace, @@ -1996,12 +2159,16 @@ func (s *integrationClustersTestSuite) failover( time.Sleep(cacheRefreshInterval) } -func (s *integrationClustersTestSuite) registerNamespace(namespace string) { +func (s *integrationClustersTestSuite) registerNamespace(namespace string, isGlobalNamespace bool) { + clusters := clusterReplicationConfig + if !isGlobalNamespace { + clusters = clusterReplicationConfig[0:1] + } client1 := s.cluster1.GetFrontendClient() // active regReq := &workflowservice.RegisterNamespaceRequest{ Namespace: namespace, - IsGlobalNamespace: true, - Clusters: clusterReplicationConfig, + IsGlobalNamespace: isGlobalNamespace, + Clusters: clusters, 
ActiveClusterName: clusterName[0], WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24), } @@ -2017,7 +2184,7 @@ func (s *integrationClustersTestSuite) registerNamespace(namespace string) { s.NoError(err) s.NotNil(resp) s.Equal(namespace, resp.NamespaceInfo.Name) - s.Equal(true, resp.IsGlobalNamespace) + s.Equal(isGlobalNamespace, resp.IsGlobalNamespace) } func (s *integrationClustersTestSuite) newClientAndWorker(hostport, namespace, taskqueue, identity string) (sdkclient.Client, sdkworker.Worker) { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 59ff152dc86..35765d6f285 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2650,24 +2650,20 @@ func (e *historyEngineImpl) NotifyNewHistoryEvent( } func (e *historyEngineImpl) NotifyNewTransferTasks( - isGlobalNamespace bool, + clusterName string, tasks []tasks.Task, ) { if len(tasks) > 0 { - task := tasks[0] - clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(isGlobalNamespace, task.GetVersion()) e.txProcessor.NotifyNewTask(clusterName, tasks) } } func (e *historyEngineImpl) NotifyNewTimerTasks( - isGlobalNamespace bool, + clusterName string, tasks []tasks.Task, ) { if len(tasks) > 0 { - task := tasks[0] - clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(isGlobalNamespace, task.GetVersion()) e.timerProcessor.NotifyNewTimers(clusterName, tasks) } } @@ -2994,6 +2990,7 @@ func (e *historyEngineImpl) GetReplicationMessages( return nil, err } e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks))) + return replicationMessages, nil } diff --git a/service/history/nDCTransactionMgr.go b/service/history/nDCTransactionMgr.go index d3726af7994..7bf3afd683d 100644 --- a/service/history/nDCTransactionMgr.go +++ b/service/history/nDCTransactionMgr.go @@ -285,10 +285,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply( return 0, 
workflow.TransactionPolicyActive, err } isWorkflowRunning := targetWorkflow.getMutableState().IsWorkflowExecutionRunning() - targetWorkflowActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion( - true, - targetWorkflow.getMutableState().GetNamespaceEntry().FailoverVersion(), - ) + targetWorkflowActiveCluster := targetWorkflow.getMutableState().GetNamespaceEntry().ActiveClusterName() currentCluster := r.clusterMetadata.GetCurrentClusterName() isActiveCluster := targetWorkflowActiveCluster == currentCluster diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 45143e5a625..157e264388a 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -737,8 +737,8 @@ func (s *ContextImpl) addTasksLocked( if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil { return err } - s.engine.NotifyNewTransferTasks(namespaceEntry.IsGlobalNamespace(), request.TransferTasks) - s.engine.NotifyNewTimerTasks(namespaceEntry.IsGlobalNamespace(), request.TimerTasks) + s.engine.NotifyNewTransferTasks(namespaceEntry.ActiveClusterName(), request.TransferTasks) + s.engine.NotifyNewTimerTasks(namespaceEntry.ActiveClusterName(), request.TimerTasks) s.engine.NotifyNewVisibilityTasks(request.VisibilityTasks) s.engine.NotifyNewReplicationTasks(request.ReplicationTasks) return nil diff --git a/service/history/shard/engine.go b/service/history/shard/engine.go index 4b01c0455f1..7dac52d1232 100644 --- a/service/history/shard/engine.go +++ b/service/history/shard/engine.go @@ -83,8 +83,8 @@ type ( GetReplicationStatus(ctx context.Context, request *historyservice.GetReplicationStatusRequest) (*historyservice.ShardReplicationStatus, error) NotifyNewHistoryEvent(event *events.Notification) - NotifyNewTransferTasks(isGlobalNamespace bool, tasks []tasks.Task) - NotifyNewTimerTasks(isGlobalNamespace bool, tasks []tasks.Task) + NotifyNewTransferTasks(clusterName string, tasks 
[]tasks.Task) + NotifyNewTimerTasks(clusterName string, tasks []tasks.Task) NotifyNewVisibilityTasks(tasks []tasks.Task) NotifyNewReplicationTasks(tasks []tasks.Task) } diff --git a/service/history/shard/engine_mock.go b/service/history/shard/engine_mock.go index 4d7002eb84e..e35b1fd2282 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/shard/engine_mock.go @@ -226,27 +226,27 @@ func (mr *MockEngineMockRecorder) NotifyNewReplicationTasks(tasks interface{}) * } // NotifyNewTimerTasks mocks base method. -func (m *MockEngine) NotifyNewTimerTasks(isGlobalNamespace bool, tasks []tasks.Task) { +func (m *MockEngine) NotifyNewTimerTasks(clusterName string, tasks []tasks.Task) { m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTimerTasks", isGlobalNamespace, tasks) + m.ctrl.Call(m, "NotifyNewTimerTasks", clusterName, tasks) } // NotifyNewTimerTasks indicates an expected call of NotifyNewTimerTasks. -func (mr *MockEngineMockRecorder) NotifyNewTimerTasks(isGlobalNamespace, tasks interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) NotifyNewTimerTasks(clusterName, tasks interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTimerTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTimerTasks), isGlobalNamespace, tasks) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTimerTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTimerTasks), clusterName, tasks) } // NotifyNewTransferTasks mocks base method. -func (m *MockEngine) NotifyNewTransferTasks(isGlobalNamespace bool, tasks []tasks.Task) { +func (m *MockEngine) NotifyNewTransferTasks(clusterName string, tasks []tasks.Task) { m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyNewTransferTasks", isGlobalNamespace, tasks) + m.ctrl.Call(m, "NotifyNewTransferTasks", clusterName, tasks) } // NotifyNewTransferTasks indicates an expected call of NotifyNewTransferTasks. 
-func (mr *MockEngineMockRecorder) NotifyNewTransferTasks(isGlobalNamespace, tasks interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) NotifyNewTransferTasks(clusterName, tasks interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTransferTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTransferTasks), isGlobalNamespace, tasks) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewTransferTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewTransferTasks), clusterName, tasks) } // NotifyNewVisibilityTasks mocks base method. diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 1ecfa2dbefa..3afd82ad76d 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -164,9 +164,6 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers( timerTasks []tasks.Task, ) { - if clusterName == cluster.FakeClusterForEmptyVersion { - return - } if clusterName == t.currentClusterName { t.activeTimerProcessor.notifyNewTimers(timerTasks) return diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index c87b0ef2966..c4a7f61fea4 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -170,9 +170,6 @@ func (t *transferQueueProcessorImpl) NotifyNewTask( transferTasks []tasks.Task, ) { - if clusterName == cluster.FakeClusterForEmptyVersion { - return - } if clusterName == t.currentClusterName { // we will ignore the current time passed in, since the active processor process task immediately if len(transferTasks) != 0 { diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index a0611c4e541..12142d454f9 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -425,7 +425,7 @@ func (c *ContextImpl) CreateWorkflowExecution( if err != nil { return err } - 
NotifyWorkflowSnapshotTasks(engine, newWorkflow, newMutableState.GetNamespaceEntry().IsGlobalNamespace()) + NotifyWorkflowSnapshotTasks(engine, newWorkflow, newMutableState.GetNamespaceEntry().ActiveClusterName()) emitStateTransitionCount(c.metricsClient, newMutableState) return nil @@ -520,6 +520,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution( newWorkflowEventsSeq, currentWorkflow, currentWorkflowEventsSeq, + resetMutableState.GetNamespaceEntry().ActiveClusterName(), ); err != nil { return err } else { @@ -685,6 +686,7 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew( currentWorkflowEventsSeq, newWorkflow, newWorkflowEventsSeq, + c.MutableState.GetNamespaceEntry().ActiveClusterName(), ); err != nil { return err } else { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 81cea4c8ca9..977f1459470 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4263,6 +4263,9 @@ func (e *MutableStateImpl) closeTransactionWithPolicyCheck( return nil } + // Cannot use e.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in + // the passive cluster. For example, if the passive cluster sees a conflict and decides to terminate this workflow, the + // currentVersion on mutable state is updated to point to the last write version, which belongs to the current (passive) cluster. 
activeCluster := e.clusterMetadata.ClusterNameForFailoverVersion(e.namespaceEntry.IsGlobalNamespace(), e.GetCurrentVersion()) currentCluster := e.clusterMetadata.GetCurrentClusterName() diff --git a/service/history/workflow/transaction.go b/service/history/workflow/transaction.go index 7939e543a37..d9ccbf3eaa1 100644 --- a/service/history/workflow/transaction.go +++ b/service/history/workflow/transaction.go @@ -35,6 +35,7 @@ type ( createMode persistence.CreateWorkflowMode, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + clusterName string, ) (int64, error) ConflictResolveWorkflowExecution( @@ -45,6 +46,7 @@ type ( newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, + clusterName string, ) (int64, int64, int64, error) UpdateWorkflowExecution( @@ -53,6 +55,7 @@ type ( currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + clusterName string, ) (int64, int64, error) } ) diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index 029ee90c8c9..5fdf34163c5 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -70,16 +70,13 @@ func (t *TransactionImpl) CreateWorkflowExecution( createMode persistence.CreateWorkflowMode, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + clusterName string, ) (int64, error) { engine, err := t.shard.GetEngine() if err != nil { return 0, err } - nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(newWorkflowSnapshot.ExecutionInfo.NamespaceId)) - if err != nil { - return 0, err - } resp, err := createWorkflowExecutionWithRetry(t.shard, &persistence.CreateWorkflowExecutionRequest{ ShardID: 
t.shard.GetShardID(), @@ -89,7 +86,7 @@ func (t *TransactionImpl) CreateWorkflowExecution( NewWorkflowEvents: newWorkflowEventsSeq, }) if operationPossiblySucceeded(err) { - NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace()) + NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, clusterName) } if err != nil { return 0, err @@ -110,16 +107,13 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution( newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, + clusterName string, ) (int64, int64, int64, error) { engine, err := t.shard.GetEngine() if err != nil { return 0, 0, 0, err } - nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(resetWorkflowSnapshot.ExecutionInfo.NamespaceId)) - if err != nil { - return 0, 0, 0, err - } resp, err := conflictResolveWorkflowExecutionWithRetry(t.shard, &persistence.ConflictResolveWorkflowExecutionRequest{ ShardID: t.shard.GetShardID(), @@ -133,9 +127,9 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution( CurrentWorkflowEvents: currentWorkflowEventsSeq, }) if operationPossiblySucceeded(err) { - NotifyWorkflowSnapshotTasks(engine, resetWorkflowSnapshot, nsEntry.IsGlobalNamespace()) - NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace()) - NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace()) + NotifyWorkflowSnapshotTasks(engine, resetWorkflowSnapshot, clusterName) + NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, clusterName) + NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, clusterName) } if err != nil { return 0, 0, 0, err @@ -168,17 +162,13 @@ func (t *TransactionImpl) UpdateWorkflowExecution( currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + 
clusterName string, ) (int64, int64, error) { engine, err := t.shard.GetEngine() if err != nil { return 0, 0, err } - nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(currentWorkflowMutation.ExecutionInfo.NamespaceId)) - if err != nil { - return 0, 0, err - } - resp, err := updateWorkflowExecutionWithRetry(t.shard, &persistence.UpdateWorkflowExecutionRequest{ ShardID: t.shard.GetShardID(), // RangeID , this is set by shard context @@ -189,8 +179,8 @@ func (t *TransactionImpl) UpdateWorkflowExecution( NewWorkflowEvents: newWorkflowEventsSeq, }) if operationPossiblySucceeded(err) { - NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace()) - NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace()) + NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, clusterName) + NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, clusterName) } if err != nil { return 0, 0, err @@ -522,7 +512,7 @@ func updateWorkflowExecutionWithRetry( func NotifyWorkflowSnapshotTasks( engine shard.Engine, workflowSnapshot *persistence.WorkflowSnapshot, - isGlobalNamespace bool, + clusterName string, ) { if workflowSnapshot == nil { return @@ -533,14 +523,14 @@ func NotifyWorkflowSnapshotTasks( workflowSnapshot.TimerTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.VisibilityTasks, - isGlobalNamespace, + clusterName, ) } func NotifyWorkflowMutationTasks( engine shard.Engine, workflowMutation *persistence.WorkflowMutation, - isGlobalNamespace bool, + clusterName string, ) { if workflowMutation == nil { return @@ -551,7 +541,7 @@ func NotifyWorkflowMutationTasks( workflowMutation.TimerTasks, workflowMutation.ReplicationTasks, workflowMutation.VisibilityTasks, - isGlobalNamespace, + clusterName, ) } @@ -561,10 +551,10 @@ func notifyTasks( timerTasks []tasks.Task, replicationTasks []tasks.Task, visibilityTasks []tasks.Task, - isGlobalNamespace bool, + clusterName string, ) { - 
engine.NotifyNewTransferTasks(isGlobalNamespace, transferTasks) - engine.NotifyNewTimerTasks(isGlobalNamespace, timerTasks) + engine.NotifyNewTransferTasks(clusterName, transferTasks) + engine.NotifyNewTimerTasks(clusterName, timerTasks) engine.NotifyNewVisibilityTasks(visibilityTasks) engine.NotifyNewReplicationTasks(replicationTasks) } diff --git a/service/history/workflow/transaction_mock.go b/service/history/workflow/transaction_mock.go index f221e2e8115..8d2ae102247 100644 --- a/service/history/workflow/transaction_mock.go +++ b/service/history/workflow/transaction_mock.go @@ -59,9 +59,9 @@ func (m *MockTransaction) EXPECT() *MockTransactionMockRecorder { } // ConflictResolveWorkflowExecution mocks base method. -func (m *MockTransaction) ConflictResolveWorkflowExecution(conflictResolveMode persistence.ConflictResolveWorkflowMode, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, int64, error) { +func (m *MockTransaction) ConflictResolveWorkflowExecution(conflictResolveMode persistence.ConflictResolveWorkflowMode, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, clusterName string) (int64, int64, int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConflictResolveWorkflowExecution", conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq) + ret := m.ctrl.Call(m, "ConflictResolveWorkflowExecution", 
conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq, clusterName) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(int64) @@ -70,30 +70,30 @@ func (m *MockTransaction) ConflictResolveWorkflowExecution(conflictResolveMode p } // ConflictResolveWorkflowExecution indicates an expected call of ConflictResolveWorkflowExecution. -func (mr *MockTransactionMockRecorder) ConflictResolveWorkflowExecution(conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) ConflictResolveWorkflowExecution(conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq, clusterName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConflictResolveWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).ConflictResolveWorkflowExecution), conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConflictResolveWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).ConflictResolveWorkflowExecution), conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq, clusterName) } // CreateWorkflowExecution mocks base method. 
-func (m *MockTransaction) CreateWorkflowExecution(createMode persistence.CreateWorkflowMode, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, error) { +func (m *MockTransaction) CreateWorkflowExecution(createMode persistence.CreateWorkflowMode, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, clusterName string) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkflowExecution", createMode, newWorkflowSnapshot, newWorkflowEventsSeq) + ret := m.ctrl.Call(m, "CreateWorkflowExecution", createMode, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. -func (mr *MockTransactionMockRecorder) CreateWorkflowExecution(createMode, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) CreateWorkflowExecution(createMode, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).CreateWorkflowExecution), createMode, newWorkflowSnapshot, newWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).CreateWorkflowExecution), createMode, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName) } // UpdateWorkflowExecution mocks base method. 
-func (m *MockTransaction) UpdateWorkflowExecution(updateMode persistence.UpdateWorkflowMode, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, error) { +func (m *MockTransaction) UpdateWorkflowExecution(updateMode persistence.UpdateWorkflowMode, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, clusterName string) (int64, int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateWorkflowExecution", updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq) + ret := m.ctrl.Call(m, "UpdateWorkflowExecution", updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(error) @@ -101,7 +101,7 @@ func (m *MockTransaction) UpdateWorkflowExecution(updateMode persistence.UpdateW } // UpdateWorkflowExecution indicates an expected call of UpdateWorkflowExecution. 
-func (mr *MockTransactionMockRecorder) UpdateWorkflowExecution(updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) UpdateWorkflowExecution(updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).UpdateWorkflowExecution), updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).UpdateWorkflowExecution), updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, clusterName) } diff --git a/service/history/workflow/transaction_test.go b/service/history/workflow/transaction_test.go index 92cad246794..74d017c7c35 100644 --- a/service/history/workflow/transaction_test.go +++ b/service/history/workflow/transaction_test.go @@ -26,7 +26,6 @@ package workflow import ( "errors" - "testing" "github.com/golang/mock/gomock" @@ -127,6 +126,7 @@ func (s *transactionSuite) TestCreateWorkflowExecution_NotifyTaskWhenFailed() { }, }, []*persistence.WorkflowEvents{}, + "active-cluster-name", ) s.Equal(timeoutErr, err) } @@ -153,6 +153,7 @@ func (s *transactionSuite) TestUpdateWorkflowExecution_NotifyTaskWhenFailed() { []*persistence.WorkflowEvents{}, &persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{}, + "active-cluster-name", ) s.Equal(timeoutErr, err) } @@ -182,6 +183,7 @@ func (s *transactionSuite) TestConflictResolveWorkflowExecution_NotifyTaskWhenFa []*persistence.WorkflowEvents{}, &persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{}, + "active-cluster-name", ) s.Equal(timeoutErr, err) } 
diff --git a/service/history/workflowResetter.go b/service/history/workflowResetter.go index 7b8a37f8552..d3b0615fc03 100644 --- a/service/history/workflowResetter.go +++ b/service/history/workflowResetter.go @@ -365,6 +365,7 @@ func (r *workflowResetterImpl) persistToDB( currentWorkflowEventsSeq, resetWorkflowSnapshot, resetWorkflowEventsSeq, + resetWorkflow.getMutableState().GetNamespaceEntry().ActiveClusterName(), ); err != nil { return err } else { diff --git a/service/history/workflowResetter_test.go b/service/history/workflowResetter_test.go index 9b337e83b53..36bb6d20ac6 100644 --- a/service/history/workflowResetter_test.go +++ b/service/history/workflowResetter_test.go @@ -184,6 +184,16 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() { gomock.Any(), workflow.TransactionPolicyActive, ).Return(resetSnapshot, resetEventsSeq, nil) + resetMutableState.EXPECT().GetNamespaceEntry().Return(namespace.FromPersistentState(&persistence.GetNamespaceResponse{ + Namespace: &persistencespb.NamespaceDetail{ + Info: &persistencespb.NamespaceInfo{}, + Config: &persistencespb.NamespaceConfig{}, + ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ + ActiveClusterName: "active-cluster-name", + }, + }, + IsGlobalNamespace: false, + })) resetContext.EXPECT().GetHistorySize().Return(resetEventsSize).AnyTimes() resetContext.EXPECT().SetHistorySize(resetEventsSize + resetNewEventsSize) @@ -193,6 +203,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() { currentEventsSeq, resetSnapshot, resetEventsSeq, + "active-cluster-name", ).Return(currentNewEventsSize, resetNewEventsSize, nil) err := s.workflowResetter.persistToDB(currentWorkflow, currentMutation, currentEventsSeq, resetWorkflow)