Skip to content

Commit

Permalink
Remove hacky force flush buffer check (#2439)
Browse files Browse the repository at this point in the history
* Remove hacky force-flush buffer bypass for XDC when the incoming version maps to the current cluster
NOTE: this bypass was previously added to allow a workflow to be resent back to its original
cluster after local DB data loss. It is being removed because it breaks basic NDC
assumptions.
  • Loading branch information
wxing1292 authored Jan 31, 2022
1 parent 31dc2f8 commit 8c30d1d
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 108 deletions.
4 changes: 0 additions & 4 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ var Keys = map[Key]string{
MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability",
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",
StandbyTaskReReplicationContextTimeout: "history.standbyTaskReReplicationContextTimeout",
EnableDropStuckTaskByNamespaceID: "history.DropStuckTaskByNamespace",
SkipReapplicationByNamespaceID: "history.SkipReapplicationByNamespaceID",
Expand Down Expand Up @@ -950,9 +949,6 @@ const (
// MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded
MutableStateChecksumInvalidateBefore

// ReplicationEventsFromCurrentCluster is a feature flag that allows cross-DC replication of events that were generated by the current cluster
ReplicationEventsFromCurrentCluster

// StandbyTaskReReplicationContextTimeout is the context timeout for standby task re-replication
StandbyTaskReReplicationContextTimeout

Expand Down
4 changes: 1 addition & 3 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ type Config struct {
MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithNamespaceFilter
MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn

// Cross DC Replication configuration
ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithNamespaceFilter
// NDC Replication configuration
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithNamespaceIDFilter

SkipReapplicationByNamespaceID dynamicconfig.BoolPropertyFnWithNamespaceIDFilter
Expand Down Expand Up @@ -416,7 +415,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
MutableStateChecksumVerifyProbability: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateChecksumVerifyProbability, 0),
MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0),

ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByNamespaceID(dynamicconfig.StandbyTaskReReplicationContextTimeout, 3*time.Minute),

SkipReapplicationByNamespaceID: dc.GetBoolPropertyFnWithNamespaceIDFilter(dynamicconfig.SkipReapplicationByNamespaceID, false),
Expand Down
15 changes: 1 addition & 14 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,7 @@ func (r *nDCHistoryReplicatorImpl) applyEvents(
default:
// apply events, other than simple start workflow execution
// the continue as new + start workflow execution combination will also be processed here
var mutableState workflow.MutableState
var err error
namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(context.GetNamespaceID())
if err != nil {
return err
}

if r.shard.GetConfig().ReplicationEventsFromCurrentCluster(namespaceEntry.Name().String()) {
// this branch is used when replicating events (generated by the current cluster) from a remote cluster back to the current cluster.
// this can happen when the events were lost in the current cluster and are being recovered from a remote cluster.
mutableState, err = context.LoadWorkflowExecutionForReplication(task.getVersion())
} else {
mutableState, err = context.LoadWorkflowExecution()
}
mutableState, err := context.LoadWorkflowExecution()
switch err.(type) {
case nil:
// Sanity check to make only 3DC mutable state here
Expand Down
72 changes: 0 additions & 72 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type (
GetExecution() *commonpb.WorkflowExecution

LoadWorkflowExecution() (MutableState, error)
LoadWorkflowExecutionForReplication(incomingVersion int64) (MutableState, error)
LoadExecutionStats() (*persistencespb.ExecutionStats, error)
Clear()

Expand Down Expand Up @@ -250,77 +249,6 @@ func (c *ContextImpl) LoadExecutionStats() (*persistencespb.ExecutionStats, erro
return c.stats, nil
}

// LoadWorkflowExecutionForReplication returns the mutable state held by this
// context, loading it from persistence first when it is not already cached,
// and starts a transaction on it before returning.
//
// incomingVersion is compared against the mutable state's last write version
// to choose how the transaction is started: when the two are equal the
// transaction is started via StartTransactionSkipWorkflowTaskFail; otherwise
// the regular StartTransaction path is used, including one flush-and-retry
// round when StartTransaction reports flushBeforeReady.
//
// Any error from the namespace lookup, the persistence read, mutable state
// construction, or transaction handling is returned with a nil MutableState.
func (c *ContextImpl) LoadWorkflowExecutionForReplication(
incomingVersion int64,
) (MutableState, error) {

namespaceEntry, err := c.shard.GetNamespaceRegistry().GetNamespaceByID(c.namespaceID)
if err != nil {
return nil, err
}

// Only hit persistence when no mutable state is cached on the context.
if c.MutableState == nil {
response, err := getWorkflowExecutionWithRetry(c.shard, &persistence.GetWorkflowExecutionRequest{
ShardID: c.shard.GetShardID(),
NamespaceID: c.namespaceID.String(),
WorkflowID: c.workflowExecution.GetWorkflowId(),
RunID: c.workflowExecution.GetRunId(),
})
if err != nil {
return nil, err
}

// Rebuild the in-memory mutable state from the persisted record.
c.MutableState, err = newMutableStateBuilderFromDB(
c.shard,
c.shard.GetEventsCache(),
c.logger,
namespaceEntry,
response.State,
response.DBRecordVersion,
)
if err != nil {
return nil, err
}

// Cache the execution stats that came back with the persisted state.
c.stats = response.State.ExecutionInfo.ExecutionStats
}

lastWriteVersion, err := c.MutableState.GetLastWriteVersion()
if err != nil {
return nil, err
}

if lastWriteVersion == incomingVersion {
// Incoming events carry the same version as the last write: start the
// transaction through the variant that skips the workflow task fail step.
err = c.MutableState.StartTransactionSkipWorkflowTaskFail(namespaceEntry)
if err != nil {
return nil, err
}
} else {
// NOTE(review): flushBeforeReady presumably signals that pending changes
// must be persisted before the mutable state can be used - confirm
// against MutableState.StartTransaction.
flushBeforeReady, err := c.MutableState.StartTransaction(namespaceEntry)
if err != nil {
return nil, err
}
if !flushBeforeReady {
return c.MutableState, nil
}

// A flush was requested: persist the execution as active using the
// shard's current time, then start the transaction again.
if err = c.UpdateWorkflowExecutionAsActive(
c.shard.GetTimeSource().Now(),
); err != nil {
return nil, err
}

flushBeforeReady, err = c.MutableState.StartTransaction(namespaceEntry)
if err != nil {
return nil, err
}
// A second flush request right after the update is treated as an
// internal error rather than retried again.
if flushBeforeReady {
return nil, serviceerror.NewInternal("Context counter flushBeforeReady status after loading mutable state from DB")
}
}
return c.MutableState, nil
}

func (c *ContextImpl) LoadWorkflowExecution() (MutableState, error) {

namespaceEntry, err := c.shard.GetNamespaceRegistry().GetNamespaceByID(c.namespaceID)
Expand Down
15 changes: 0 additions & 15 deletions service/history/workflow/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8c30d1d

Please sign in to comment.