diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 1d3362afc0e..c11e29b61f3 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -303,7 +303,6 @@ var Keys = map[Key]string{ MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability", MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability", MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore", - ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster", StandbyTaskReReplicationContextTimeout: "history.standbyTaskReReplicationContextTimeout", EnableDropStuckTaskByNamespaceID: "history.DropStuckTaskByNamespace", SkipReapplicationByNamespaceID: "history.SkipReapplicationByNamespaceID", @@ -950,9 +949,6 @@ const ( // MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded MutableStateChecksumInvalidateBefore - // ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster - ReplicationEventsFromCurrentCluster - // StandbyTaskReReplicationContextTimeout is the context timeout for standby task re-replication StandbyTaskReReplicationContextTimeout diff --git a/service/history/configs/config.go b/service/history/configs/config.go index d349da6770a..2fe13688aa4 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -215,8 +215,7 @@ type Config struct { MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithNamespaceFilter MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn - // Crocess DC Replication configuration - ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithNamespaceFilter + // NDC Replication configuration StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithNamespaceIDFilter SkipReapplicationByNamespaceID dynamicconfig.BoolPropertyFnWithNamespaceIDFilter @@ -416,7 +415,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis MutableStateChecksumVerifyProbability: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateChecksumVerifyProbability, 0), MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0), - ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false), StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByNamespaceID(dynamicconfig.StandbyTaskReReplicationContextTimeout, 3*time.Minute), SkipReapplicationByNamespaceID: dc.GetBoolPropertyFnWithNamespaceIDFilter(dynamicconfig.SkipReapplicationByNamespaceID, false), diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go index 5ed30ccce53..338cab1dda9 100644 --- a/service/history/nDCHistoryReplicator.go +++ b/service/history/nDCHistoryReplicator.go @@ -250,20 +250,7 @@ func (r *nDCHistoryReplicatorImpl) applyEvents( default: // apply events, other than simple start workflow execution // the continue as new + start workflow execution combination will also be processed here - var mutableState workflow.MutableState - var err error - namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(context.GetNamespaceID()) - if err != nil { - return err - } - - if r.shard.GetConfig().ReplicationEventsFromCurrentCluster(namespaceEntry.Name().String()) { - // this branch is used when replicating events (generated from current cluster)from remote cluster to current cluster. - // this could happen when the events are lost in current cluster and plan to recover them from remote cluster. - mutableState, err = context.LoadWorkflowExecutionForReplication(task.getVersion()) - } else { - mutableState, err = context.LoadWorkflowExecution() - } + mutableState, err := context.LoadWorkflowExecution() switch err.(type) { case nil: // Sanity check to make only 3DC mutable state here diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 12142d454f9..85c28f34f44 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -71,7 +71,6 @@ type ( GetExecution() *commonpb.WorkflowExecution LoadWorkflowExecution() (MutableState, error) - LoadWorkflowExecutionForReplication(incomingVersion int64) (MutableState, error) LoadExecutionStats() (*persistencespb.ExecutionStats, error) Clear() @@ -250,77 +249,6 @@ func (c *ContextImpl) LoadExecutionStats() (*persistencespb.ExecutionStats, erro return c.stats, nil } -func (c *ContextImpl) LoadWorkflowExecutionForReplication( - incomingVersion int64, -) (MutableState, error) { - - namespaceEntry, err := c.shard.GetNamespaceRegistry().GetNamespaceByID(c.namespaceID) - if err != nil { - return nil, err - } - - if c.MutableState == nil { - response, err := getWorkflowExecutionWithRetry(c.shard, &persistence.GetWorkflowExecutionRequest{ - ShardID: c.shard.GetShardID(), - NamespaceID: c.namespaceID.String(), - WorkflowID: c.workflowExecution.GetWorkflowId(), - RunID: c.workflowExecution.GetRunId(), - }) - if err != nil { - return nil, err - } - - c.MutableState, err = newMutableStateBuilderFromDB( - c.shard, - c.shard.GetEventsCache(), - c.logger, - namespaceEntry, - response.State, - response.DBRecordVersion, - ) - if err != nil { - return nil, err - } - - c.stats = response.State.ExecutionInfo.ExecutionStats - } - - lastWriteVersion, err := c.MutableState.GetLastWriteVersion() - if err != nil { - return nil, err - } - - if lastWriteVersion == incomingVersion { - err = c.MutableState.StartTransactionSkipWorkflowTaskFail(namespaceEntry) - if err != nil { - return nil, err - } - } else { - flushBeforeReady, err := c.MutableState.StartTransaction(namespaceEntry) - if err != nil { - return nil, err - } - if !flushBeforeReady { - return c.MutableState, nil - } - - if err = c.UpdateWorkflowExecutionAsActive( - c.shard.GetTimeSource().Now(), - ); err != nil { - return nil, err - } - - flushBeforeReady, err = c.MutableState.StartTransaction(namespaceEntry) - if err != nil { - return nil, err - } - if flushBeforeReady { - return nil, serviceerror.NewInternal("Context counter flushBeforeReady status after loading mutable state from DB") - } - } - return c.MutableState, nil -} - func (c *ContextImpl) LoadWorkflowExecution() (MutableState, error) { namespaceEntry, err := c.shard.GetNamespaceRegistry().GetNamespaceByID(c.namespaceID) diff --git a/service/history/workflow/context_mock.go b/service/history/workflow/context_mock.go index c52d4477c41..a38de550f2b 100644 --- a/service/history/workflow/context_mock.go +++ b/service/history/workflow/context_mock.go @@ -189,21 +189,6 @@ func (mr *MockContextMockRecorder) LoadWorkflowExecution() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadWorkflowExecution", reflect.TypeOf((*MockContext)(nil).LoadWorkflowExecution)) } -// LoadWorkflowExecutionForReplication mocks base method. -func (m *MockContext) LoadWorkflowExecutionForReplication(incomingVersion int64) (MutableState, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LoadWorkflowExecutionForReplication", incomingVersion) - ret0, _ := ret[0].(MutableState) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// LoadWorkflowExecutionForReplication indicates an expected call of LoadWorkflowExecutionForReplication. -func (mr *MockContextMockRecorder) LoadWorkflowExecutionForReplication(incomingVersion interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadWorkflowExecutionForReplication", reflect.TypeOf((*MockContext)(nil).LoadWorkflowExecutionForReplication), incomingVersion) -} - // Lock mocks base method. func (m *MockContext) Lock(ctx context.Context, caller CallerType) error { m.ctrl.T.Helper()