Fix fake cluster for empty version (#2376)
yiminc authored Jan 27, 2022
1 parent b042650 commit b87d605
Showing 16 changed files with 240 additions and 73 deletions.
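In brief: NotifyNewTransferTasks and NotifyNewTimerTasks used to take an isGlobalNamespace flag and derive the destination cluster from the first task's failover version. For tasks carrying an empty version (workflows in local namespaces, judging by the new test below), ClusterNameForFailoverVersion resolved to the cluster.FakeClusterForEmptyVersion sentinel and the queue processors returned early, silently dropping the notification. After this change the callers resolve namespaceEntry.ActiveClusterName() up front and pass the cluster name through. A minimal, self-contained sketch of the before/after flow; the constants and lookup below are simplified stand-ins, not the server's actual types:

package main

import "fmt"

// Simplified stand-ins; the real values live in common/cluster. The exact
// semantics of the sentinel are an assumption made for illustration.
const (
	fakeClusterForEmptyVersion = "fake-cluster-for-empty-version" // mirrors cluster.FakeClusterForEmptyVersion
	currentClusterName         = "cluster1"
	emptyVersion               = int64(0)
)

// Old flow (sketch): derive the cluster from a task's failover version.
// An empty version resolves to the sentinel, and the queue processor
// returned early, dropping the notification.
func resolveByVersion(version int64) string {
	if version == emptyVersion {
		return fakeClusterForEmptyVersion
	}
	return currentClusterName // real version-to-cluster lookup elided
}

// New flow (sketch): the shard context already holds the namespace entry,
// so it passes namespaceEntry.ActiveClusterName() straight through and the
// sentinel never enters the notification path.
func resolveByActiveCluster(activeClusterName string) string {
	return activeClusterName
}

func main() {
	fmt.Println(resolveByVersion(emptyVersion))             // sentinel: notification dropped
	fmt.Println(resolveByActiveCluster(currentClusterName)) // cluster1: notification processed
}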
177 changes: 172 additions & 5 deletions host/xdc/integration_failover_test.go
@@ -56,6 +56,7 @@ import (
sdkclient "go.temporal.io/sdk/client"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
sw "go.temporal.io/server/service/worker"
"gopkg.in/yaml.v3"

"go.temporal.io/server/api/historyservice/v1"
@@ -69,6 +70,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/environment"
"go.temporal.io/server/host"
"go.temporal.io/server/service/worker/migration"
)

type (
@@ -1844,7 +1846,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {

func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
namespace := "test-activity-heartbeat-workflow-failover-" + common.GenerateRandomString(5)
s.registerNamespace(namespace)
s.registerNamespace(namespace, true)

taskqueue := "integration-activity-heartbeat-workflow-failover-test-taskqueue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")
Expand Down Expand Up @@ -1949,6 +1951,167 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
s.Equal(2, lastAttemptCount)
}

func (s *integrationClustersTestSuite) printHistory(frontendClient workflowservice.WorkflowServiceClient, namespace, workflowID, runID string) {
events := s.getHistory(frontendClient, namespace, &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
})
history := &historypb.History{Events: events}
common.PrettyPrintHistory(history, s.logger)
}

func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
namespace := "local-ns-to-be-promote-" + common.GenerateRandomString(5)
s.registerNamespace(namespace, false)

taskqueue := "integration-local-ns-to-be-promote-taskqueue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")

testWorkflowFn := func(ctx workflow.Context, sleepInterval time.Duration) error {
err := workflow.Sleep(ctx, sleepInterval)
return err
}
worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

// Start wf1 (in local ns)
workflowID := "local-ns-wf-1"
run1, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Millisecond*10)

s.NoError(err)
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 completes
err = run1.Get(context.Background(), nil)
s.NoError(err)

// Start wf2 (starts in the local ns; the ns is then promoted to global, and wf2 closes in the global ns)
workflowID2 := "local-ns-wf-2"
run2, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID2,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Second*15 /* longer than ns refresh */)
s.NoError(err)
s.NotEmpty(run2.GetRunID())
s.logger.Info("start wf2", tag.WorkflowRunID(run2.GetRunID()))

// promote ns
frontendClient1 := s.cluster1.GetFrontendClient()
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
PromoteNamespace: true,
})
s.NoError(err)
nsResp, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(1, len(nsResp.ReplicationConfig.Clusters))

// wait until wf2 completes
err = run2.Get(context.Background(), nil)
s.NoError(err)

// update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
},
})
s.NoError(err)
// wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err = frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// start wf3 (start in global ns)
workflowID3 := "local-ns-wf-3"
run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID3,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Millisecond*10)
s.NoError(err)
s.NotEmpty(run3.GetRunID())
s.logger.Info("start wf3", tag.WorkflowRunID(run3.GetRunID()))
// wait until wf3 completes
err = run3.Get(context.Background(), nil)
s.NoError(err)

// start force-replicate wf
sysClient, err := sdkclient.NewClient(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: "temporal-system",
})
s.NoError(err)
workflowID4 := "force-replication-wf-4"
run4, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID4,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "force-replication", migration.ForceReplicationParams{
Namespace: namespace,
RpsPerActivity: 10,
})

s.NoError(err)
err = run4.Get(context.Background(), nil)
s.NoError(err)

// start namespace-handover wf
workflowID5 := "namespace-handover-wf-5"
run5, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID5,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "namespace-handover", migration.NamespaceHandoverParams{
Namespace: namespace,
RemoteCluster: clusterName[1],
AllowedLaggingSeconds: 10,
HandoverTimeoutSeconds: 30,
})
s.NoError(err)
err = run5.Get(context.Background(), nil)
s.NoError(err)

// at this point ns migration is done.
// verify namespace is now active in cluster2
nsResp2, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp2.IsGlobalNamespace)
s.Equal(2, len(nsResp2.ReplicationConfig.Clusters))
s.Equal(clusterName[1], nsResp2.ReplicationConfig.ActiveClusterName)

// verify all workflows in the ns are now available in cluster2
client2, err := sdkclient.NewClient(sdkclient.Options{
HostPort: s.cluster2.GetHost().FrontendGRPCAddress(),
Namespace: namespace,
})
s.NoError(err)
verify := func(wfID string, expectedRunID string) {
desc1, err := client2.DescribeWorkflowExecution(host.NewContext(), wfID, "")
s.NoError(err)
s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status)
}
verify(workflowID, run1.GetRunID())
verify(workflowID2, run2.GetRunID())
verify(workflowID3, run3.GetRunID())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
@@ -1996,12 +2159,16 @@ func (s *integrationClustersTestSuite) failover(
time.Sleep(cacheRefreshInterval)
}

func (s *integrationClustersTestSuite) registerNamespace(namespace string) {
func (s *integrationClustersTestSuite) registerNamespace(namespace string, isGlobalNamespace bool) {
clusters := clusterReplicationConfig
if !isGlobalNamespace {
clusters = clusterReplicationConfig[0:1]
}
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: clusterReplicationConfig,
IsGlobalNamespace: isGlobalNamespace,
Clusters: clusters,
ActiveClusterName: clusterName[0],
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
}
@@ -2017,7 +2184,7 @@ func (s *integrationClustersTestSuite) registerNamespace(namespace string) {
s.NoError(err)
s.NotNil(resp)
s.Equal(namespace, resp.NamespaceInfo.Name)
s.Equal(true, resp.IsGlobalNamespace)
s.Equal(isGlobalNamespace, resp.IsGlobalNamespace)
}
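Usage sketch for the updated registerNamespace helper (hypothetical namespace names): false registers a namespace on cluster1 only, while true preserves the old behavior of a global namespace spanning both configured clusters.

// Hypothetical call sites mirroring the tests above:
s.registerNamespace("some-local-ns-"+common.GenerateRandomString(5), false) // local: clusterReplicationConfig[0:1]
s.registerNamespace("some-global-ns-"+common.GenerateRandomString(5), true) // global: both clusters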

func (s *integrationClustersTestSuite) newClientAndWorker(hostport, namespace, taskqueue, identity string) (sdkclient.Client, sdkworker.Worker) {
9 changes: 3 additions & 6 deletions service/history/historyEngine.go
@@ -2650,24 +2650,20 @@ func (e *historyEngineImpl) NotifyNewHistoryEvent(
}

func (e *historyEngineImpl) NotifyNewTransferTasks(
isGlobalNamespace bool,
clusterName string,
tasks []tasks.Task,
) {
if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(isGlobalNamespace, task.GetVersion())
e.txProcessor.NotifyNewTask(clusterName, tasks)
}
}

func (e *historyEngineImpl) NotifyNewTimerTasks(
isGlobalNamespace bool,
clusterName string,
tasks []tasks.Task,
) {

if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(isGlobalNamespace, task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
}
}
@@ -2994,6 +2990,7 @@ func (e *historyEngineImpl) GetReplicationMessages(
return nil, err
}
e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks)))

return replicationMessages, nil
}

5 changes: 1 addition & 4 deletions service/history/nDCTransactionMgr.go
@@ -285,10 +285,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflowEventsReapply(
return 0, workflow.TransactionPolicyActive, err
}
isWorkflowRunning := targetWorkflow.getMutableState().IsWorkflowExecutionRunning()
targetWorkflowActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(
true,
targetWorkflow.getMutableState().GetNamespaceEntry().FailoverVersion(),
)
targetWorkflowActiveCluster := targetWorkflow.getMutableState().GetNamespaceEntry().ActiveClusterName()
currentCluster := r.clusterMetadata.GetCurrentClusterName()
isActiveCluster := targetWorkflowActiveCluster == currentCluster

4 changes: 2 additions & 2 deletions service/history/shard/context_impl.go
@@ -737,8 +737,8 @@ func (s *ContextImpl) addTasksLocked(
if err = s.handleErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
return err
}
s.engine.NotifyNewTransferTasks(namespaceEntry.IsGlobalNamespace(), request.TransferTasks)
s.engine.NotifyNewTimerTasks(namespaceEntry.IsGlobalNamespace(), request.TimerTasks)
s.engine.NotifyNewTransferTasks(namespaceEntry.ActiveClusterName(), request.TransferTasks)
s.engine.NotifyNewTimerTasks(namespaceEntry.ActiveClusterName(), request.TimerTasks)
s.engine.NotifyNewVisibilityTasks(request.VisibilityTasks)
s.engine.NotifyNewReplicationTasks(request.ReplicationTasks)
return nil
4 changes: 2 additions & 2 deletions service/history/shard/engine.go
@@ -83,8 +83,8 @@ type (
GetReplicationStatus(ctx context.Context, request *historyservice.GetReplicationStatusRequest) (*historyservice.ShardReplicationStatus, error)

NotifyNewHistoryEvent(event *events.Notification)
NotifyNewTransferTasks(isGlobalNamespace bool, tasks []tasks.Task)
NotifyNewTimerTasks(isGlobalNamespace bool, tasks []tasks.Task)
NotifyNewTransferTasks(clusterName string, tasks []tasks.Task)
NotifyNewTimerTasks(clusterName string, tasks []tasks.Task)
NotifyNewVisibilityTasks(tasks []tasks.Task)
NotifyNewReplicationTasks(tasks []tasks.Task)
}
16 changes: 8 additions & 8 deletions service/history/shard/engine_mock.go

Some generated files are not rendered by default.

3 changes: 0 additions & 3 deletions service/history/timerQueueProcessor.go
@@ -164,9 +164,6 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(
timerTasks []tasks.Task,
) {

if clusterName == cluster.FakeClusterForEmptyVersion {
return
}
if clusterName == t.currentClusterName {
t.activeTimerProcessor.notifyNewTimers(timerTasks)
return
3 changes: 0 additions & 3 deletions service/history/transferQueueProcessor.go
@@ -170,9 +170,6 @@ func (t *transferQueueProcessorImpl) NotifyNewTask(
transferTasks []tasks.Task,
) {

if clusterName == cluster.FakeClusterForEmptyVersion {
return
}
if clusterName == t.currentClusterName {
// we will ignore the current time passed in, since the active processor process task immediately
if len(transferTasks) != 0 {
4 changes: 3 additions & 1 deletion service/history/workflow/context.go
@@ -425,7 +425,7 @@ func (c *ContextImpl) CreateWorkflowExecution(
if err != nil {
return err
}
NotifyWorkflowSnapshotTasks(engine, newWorkflow, newMutableState.GetNamespaceEntry().IsGlobalNamespace())
NotifyWorkflowSnapshotTasks(engine, newWorkflow, newMutableState.GetNamespaceEntry().ActiveClusterName())
emitStateTransitionCount(c.metricsClient, newMutableState)

return nil
@@ -520,6 +520,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
newWorkflowEventsSeq,
currentWorkflow,
currentWorkflowEventsSeq,
resetMutableState.GetNamespaceEntry().ActiveClusterName(),
); err != nil {
return err
} else {
@@ -685,6 +686,7 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
currentWorkflowEventsSeq,
newWorkflow,
newWorkflowEventsSeq,
c.MutableState.GetNamespaceEntry().ActiveClusterName(),
); err != nil {
return err
} else {
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_impl.go
@@ -4263,6 +4263,9 @@ func (e *MutableStateImpl) closeTransactionWithPolicyCheck(
return nil
}

// Cannot use e.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in
// the passive cluster. For example, if the passive cluster sees a conflict and decides to terminate this workflow,
// the currentVersion on the mutable state is updated to the last write version, which belongs to the current (passive) cluster.
activeCluster := e.clusterMetadata.ClusterNameForFailoverVersion(e.namespaceEntry.IsGlobalNamespace(), e.GetCurrentVersion())
currentCluster := e.clusterMetadata.GetCurrentClusterName()
