Merge pull request #1404 from bnb-chain/retry-backoff-policy

fix: use exponential backoff for task retry policy
ruojunm committed May 24, 2024
2 parents fda8b27 + 6a61c48 commit 6128fdb
Showing 1 changed file with 100 additions and 1 deletion.
101 changes: 100 additions & 1 deletion modular/manager/task_retry_scheduler.go
@@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/prysmaticlabs/prysm/crypto/bls"

"github.com/bnb-chain/greenfield-common/go/hash"
"github.com/bnb-chain/greenfield-storage-provider/base/gfspapp"
"github.com/bnb-chain/greenfield-storage-provider/base/gfsptqueue"
@@ -17,7 +19,6 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
"github.com/bnb-chain/greenfield-storage-provider/util"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/prysmaticlabs/prysm/crypto/bls"
)

const (
@@ -33,6 +34,10 @@ const (
// retryIntervalSecond is used to rate-limit retries, avoiding too much pressure
// on other modules such as the db, the chain, or other key workflows.
retryIntervalSecond = 1 * time.Second

// maxLengthOfNextExecutionBackOffMap defines the max length of each back-off map.
// When the length is exceeded, the map is cleared so that its memory is not leaked.
maxLengthOfNextExecutionBackOffMap = 100000
)

type RetryTaskType int32
@@ -43,10 +48,19 @@ const (
retryRejectUnseal RetryTaskType = 2
)

type TaskRetryInfo struct {
TaskKey string
TriedTimes int64
NextExecutionTime time.Time
}
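
The three retry loops below keep one of these records per task and repeat the same back-off bookkeeping around it. A minimal sketch of that shared pattern, assuming this file's declarations (step, backOffMap, and attempt are illustrative names, not part of the commit):

// step runs one scheduling round for a single task, mirroring the
// bookkeeping that each retry loop below performs inline.
func step(backOffMap map[string]*TaskRetryInfo, taskKey string, attempt func() error) {
	info, ok := backOffMap[taskKey]
	if !ok {
		info = &TaskRetryInfo{TaskKey: taskKey, NextExecutionTime: time.Now()}
		backOffMap[taskKey] = info
	}
	if info.NextExecutionTime.After(time.Now()) {
		return // still backing off; skip this round
	}
	err := attempt() // e.g. retryReplicateTask / retrySealTask / retryRejectUnsealTask
	info.TriedTimes++
	if err != nil {
		// exponential back-off: the delay doubles after every failed attempt
		info.NextExecutionTime = time.Now().Add((2 << info.TriedTimes) * retryIntervalSecond)
		return
	}
	delete(backOffMap, taskKey) // success: drop the back-off record
}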

// TaskRetryScheduler is used to schedule background task retry.
type TaskRetryScheduler struct {
manager *ManageModular
rejectUnsealThresholdSecond int64
replicateTaskBackOffMap map[string]*TaskRetryInfo
sealTaskBackOffMap map[string]*TaskRetryInfo
rejectUnsealTaskBackOffMap map[string]*TaskRetryInfo
}

// NewTaskRetryScheduler returns a task retry scheduler instance.
@@ -58,6 +72,9 @@ func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler {
return &TaskRetryScheduler{
manager: m,
rejectUnsealThresholdSecond: rejectUnsealThresholdSecond,
replicateTaskBackOffMap: make(map[string]*TaskRetryInfo),
sealTaskBackOffMap: make(map[string]*TaskRetryInfo),
rejectUnsealTaskBackOffMap: make(map[string]*TaskRetryInfo),
}
}

@@ -69,6 +86,22 @@ func (s *TaskRetryScheduler) Start() {
log.Info("task retry scheduler startup")
}

func (s *TaskRetryScheduler) resetReplicateTaskBackoffMap() {
if len(s.replicateTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.replicateTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}

func (s *TaskRetryScheduler) resetSealTaskBackoffMap() {
if len(s.sealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.sealTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}

func (s *TaskRetryScheduler) resetRejectUnsealTaskBackoffMap() {
if len(s.rejectUnsealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.rejectUnsealTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}
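
Note that these reset helpers clear a back-off map wholesale once it grows past maxLengthOfNextExecutionBackOffMap entries. Clearing also discards TriedTimes for tasks that are still failing, so their delays restart from the shortest interval on the next loop; the commit accepts that occasional burst of early retries in exchange for a simple, bounded-memory guard.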

func (s *TaskRetryScheduler) startReplicateTaskRetry() {
var (
iter *TaskIterator
@@ -80,14 +113,36 @@ func (s *TaskRetryScheduler) startReplicateTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetReplicateTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryReplicate, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry replicate", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retryReplicate, iter.Value().ObjectID)
if _, existed := s.replicateTaskBackOffMap[taskKey]; !existed {
s.replicateTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.replicateTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task; it can only run once the NextExecutionTime stored in the map has passed
log.Infow("skip retrying replicate task, as its NextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; the task can be retried in the next loop.
err = s.retryReplicateTask(iter.Value())
s.replicateTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.replicateTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.replicateTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry replicate task",
"task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
} else {
delete(s.replicateTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry replicate task",
@@ -113,14 +168,36 @@ func (s *TaskRetryScheduler) startSealTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetSealTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retrySeal, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry seal", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retrySeal, iter.Value().ObjectID)
if _, existed := s.sealTaskBackOffMap[taskKey]; !existed {
s.sealTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.sealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task; it can only run once the NextExecutionTime stored in the map has passed
log.Infow("skip retrying seal task, as its NextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; the task can be retried in the next loop.
err = s.retrySealTask(iter.Value())
s.sealTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.sealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.sealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry seal task",
"task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
} else {
delete(s.sealTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry seal task",
@@ -146,14 +223,36 @@ func (s *TaskRetryScheduler) startRejectUnsealTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetRejectUnsealTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryRejectUnseal, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry reject unseal task", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retryRejectUnseal, iter.Value().ObjectID)
if _, existed := s.rejectUnsealTaskBackOffMap[taskKey]; !existed {
s.rejectUnsealTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task; it can only run once the NextExecutionTime stored in the map has passed
log.Infow("skip retrying reject unseal task, as its NextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; the task can be retried in the next loop.
err = s.retryRejectUnsealTask(iter.Value())
s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry reject unseal task",
"task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
} else {
delete(s.rejectUnsealTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry reject unseal task",
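For reference, since TriedTimes is incremented before the back-off is computed, the delay scheduled after a task's n-th consecutive failure is (2 << n) * retryIntervalSecond, i.e. 2^(n+1) seconds: 4s, 8s, 16s, and so on. A small standalone sketch (not part of the commit) that prints the resulting schedule:

package main

import (
	"fmt"
	"time"
)

func main() {
	const retryInterval = 1 * time.Second // mirrors retryIntervalSecond
	// Delay applied after the n-th consecutive failure, mirroring
	// (2 << TriedTimes) * retryIntervalSecond in the scheduler.
	for tried := int64(1); tried <= 6; tried++ {
		fmt.Printf("failure %d -> next attempt in %v\n", tried, (2<<tried)*retryInterval)
	}
	// Output:
	// failure 1 -> next attempt in 4s
	// failure 2 -> next attempt in 8s
	// failure 3 -> next attempt in 16s
	// failure 4 -> next attempt in 32s
	// failure 5 -> next attempt in 1m4s
	// failure 6 -> next attempt in 2m8s
}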
