
add basic rate limit to force replication workflow (#2364)
* Add RPS to force replication workflow

* fix burst
yiminc authored Jan 12, 2022
1 parent b3c8586 commit dc208e9
Showing 1 changed file with 21 additions and 8 deletions.
service/worker/migration/replication.go  (+21 −8)
@@ -47,6 +47,7 @@ import (
     "go.temporal.io/server/common/metrics"
     "go.temporal.io/server/common/namespace"
     "go.temporal.io/server/common/persistence"
+    "go.temporal.io/server/common/quotas"
 )

 const (
@@ -64,6 +65,7 @@ type (
         SkipAfterTime           time.Time // skip workflows that are updated after this time
         ConcurrentActivityCount int32
         RemoteCluster           string // remote cluster name
+        RpsPerActivity          int    // RPS per each activity
     }

     NamespaceHandoverParams struct {
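
For orientation, a minimal sketch of how these parameters fit together; the helper and its values are illustrative only, not part of this change, and it assumes in-package access to ForceReplicationParams (plus a "time" import):

// exampleParams is a hypothetical helper showing an illustrative
// ForceReplicationParams configuration; values are examples, not
// recommendations from this commit.
func exampleParams() ForceReplicationParams {
    return ForceReplicationParams{
        RemoteCluster:           "cluster-b",                // replicate to this cluster
        SkipAfterTime:           time.Now().Add(-time.Hour), // skip workflows updated within the last hour
        ConcurrentActivityCount: 4,                          // four shard-range activities in parallel
        RpsPerActivity:          10,                         // each activity capped at 10 ops/sec
    }
}
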
@@ -87,10 +89,11 @@ type (
     }

     genReplicationForShardRange struct {
-        BeginShardID  int32     // inclusive
-        EndShardID    int32     // inclusive
-        NamespaceID   string    // only generate replication tasks for workflows in this namespace
-        SkipAfterTime time.Time // skip workflows whose LastUpdateTime is after this time
+        BeginShardID   int32     // inclusive
+        EndShardID     int32     // inclusive
+        NamespaceID    string    // only generate replication tasks for workflows in this namespace
+        SkipAfterTime  time.Time // skip workflows whose LastUpdateTime is after this time
+        RpsPerActivity int       // RPS per activity
     }

     genReplicationForShard struct {
@@ -99,6 +102,7 @@ type (
         SkipAfterTime time.Time
         PageToken     []byte
         Index         int
+        RPS           int
     }

     heartbeatProgress struct {
@@ -159,6 +163,9 @@ func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParam
     if params.ConcurrentActivityCount <= 0 {
         params.ConcurrentActivityCount = 1
     }
+    if params.RpsPerActivity <= 0 {
+        params.RpsPerActivity = 1
+    }

     retryPolicy := &temporal.RetryPolicy{
         InitialInterval: time.Second,
@@ -201,10 +208,11 @@ func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParam
             endShardID = shardCount
         }
         rangeRequest := genReplicationForShardRange{
-            BeginShardID:  beginShardID,
-            EndShardID:    endShardID,
-            NamespaceID:   metadataResp.NamespaceID,
-            SkipAfterTime: skipAfter,
+            BeginShardID:   beginShardID,
+            EndShardID:     endShardID,
+            NamespaceID:    metadataResp.NamespaceID,
+            SkipAfterTime:  skipAfter,
+            RpsPerActivity: params.RpsPerActivity,
         }
         future := workflow.ExecuteActivity(ctx2, a.GenerateReplicationTasks, rangeRequest)
         futures = append(futures, future)
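
Because each shard-range activity gets its own limiter, the cluster-wide generation rate is bounded by roughly ConcurrentActivityCount × RpsPerActivity (e.g. 4 activities × 10 RPS ≈ 40 tasks/sec). The hunk ends before the futures are consumed; a plausible sketch of the join step, assuming the workflow fails fast on the first failed range (not shown in this diff):

// Sketch (assumed): block on every GenerateReplicationTasks future
// so one failed shard range fails the whole run.
for _, future := range futures {
    if err := future.Get(ctx2, nil); err != nil {
        return err
    }
}
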
@@ -348,6 +356,7 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request genRe
         ShardID:       request.BeginShardID,
         NamespaceID:   request.NamespaceID,
         SkipAfterTime: request.SkipAfterTime,
+        RPS:           request.RpsPerActivity,
     }
     var progress heartbeatProgress
     if activity.HasHeartbeatDetails(ctx) {
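
GenerateReplicationTasks records progress through activity heartbeats so a retried attempt resumes where the last one left off instead of rescanning finished shards. A self-contained sketch of that resume pattern using the Go SDK's activity package; the progress type here is a stand-in, since heartbeatProgress's fields are not visible in this diff:

package sketch

import (
    "context"

    "go.temporal.io/sdk/activity"
)

// progress is a stand-in for heartbeatProgress, whose fields this
// diff does not show.
type progress struct {
    NextShardID int32
}

// resumePoint recovers the last heartbeated position on retry, or
// falls back to the start of the assigned range on a fresh attempt.
func resumePoint(ctx context.Context, begin int32) int32 {
    var p progress
    if activity.HasHeartbeatDetails(ctx) {
        if err := activity.GetHeartbeatDetails(ctx, &p); err == nil {
            return p.NextShardID
        }
    }
    return begin
}
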
@@ -527,6 +536,8 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
 func (a *activities) genReplicationTasks(ctx context.Context, request genReplicationForShard) error {
     pageToken := request.PageToken
     startIndex := request.Index
+    rateLimiter := quotas.NewRateLimiter(float64(request.RPS), request.RPS)
+
     for {
         var listResult *persistence.ListConcreteExecutionsResponse
         op := func(ctx context.Context) error {
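
quotas.NewRateLimiter is not shown in this diff; the call above passes request.RPS as both the refill rate and the burst, which is what the "fix burst" item in the commit message refers to. A runnable sketch of the same semantics using golang.org/x/time/rate, on the assumption that Temporal's quotas wrapper behaves like a standard token bucket: with burst == RPS, at most RPS waits complete immediately, after which tokens refill at RPS per second.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    rps := 10
    // burst == rps: up to 10 calls proceed at once, then ~10/sec.
    limiter := rate.NewLimiter(rate.Limit(rps), rps)

    start := time.Now()
    for i := 0; i < 25; i++ {
        _ = limiter.Wait(context.Background()) // blocks until a token is free
    }
    // 10 immediate + 15 refilled at 10/sec ≈ 1.5s total.
    fmt.Printf("25 waits took %v\n", time.Since(start))
}
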
@@ -539,6 +550,7 @@ func (a *activities) genReplicationTasks(ctx context.Context, request genReplica
             return err
         }

+        rateLimiter.Wait(ctx)
         err := backoff.RetryContext(ctx, op, persistenceRetryPolicy, common.IsPersistenceTransientError)
         if err != nil {
             return err
@@ -560,6 +572,7 @@ func (a *activities) genReplicationTasks(ctx context.Context, request genReplica
                 // skip if not target namespace
                 continue
             }
+            rateLimiter.Wait(ctx)
             err := a.genReplicationTaskForOneWorkflow(ctx, definition.NewWorkflowKey(request.NamespaceID, ms.ExecutionInfo.WorkflowId, ms.ExecutionState.RunId))
             if err != nil {
                 return err
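
Taken together, the two Wait calls charge the limiter once per page scan and once per workflow replicated, so persistence reads and task generation share a single RPS budget within each activity. A generic, self-contained sketch of that loop shape; listPage and handle are stand-ins for the persistence scan and genReplicationTaskForOneWorkflow:

package sketch

import (
    "context"

    "golang.org/x/time/rate"
)

// drain mirrors the loop above: one token per page fetch and one token
// per item processed, so both stay under the same RPS budget. Unlike
// the diff, this sketch also propagates Wait's error.
func drain(ctx context.Context, limiter *rate.Limiter,
    listPage func(ctx context.Context, token []byte) (items []string, next []byte, err error),
    handle func(ctx context.Context, item string) error,
) error {
    var token []byte
    for {
        if err := limiter.Wait(ctx); err != nil { // one token per page
            return err
        }
        items, next, err := listPage(ctx, token)
        if err != nil {
            return err
        }
        for _, it := range items {
            if err := limiter.Wait(ctx); err != nil { // one token per item
                return err
            }
            if err := handle(ctx, it); err != nil {
                return err
            }
        }
        if len(next) == 0 {
            return nil
        }
        token = next
    }
}
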
