fix: use exponential backoff for task retry policy #1404

Merged: 1 commit on May 24, 2024
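The change below keeps, for every retry task key, how many times the task has been tried and the earliest time it may be retried again. When a retry fails, the next execution time is pushed out by (2 << triedTimes) * retryIntervalSecond; with the 1-second base interval in this file, the wait after consecutive failures grows as 4s, 8s, 16s, and so on. A minimal, standalone sketch of that schedule follows; the nextBackoff helper is illustrative only and is not part of the PR.

package main

import (
	"fmt"
	"time"
)

// retryIntervalSecond mirrors the 1-second base interval defined in task_retry_scheduler.go.
const retryIntervalSecond = 1 * time.Second

// nextBackoff reproduces the delay applied after a failed retry in this PR:
// (2 << triedTimes) * retryIntervalSecond.
func nextBackoff(triedTimes int64) time.Duration {
	return time.Duration(int64(2)<<triedTimes) * retryIntervalSecond
}

func main() {
	// TriedTimes is incremented before the backoff is computed, so the first
	// failed attempt waits 4s, the second 8s, the third 16s, and so on.
	for tried := int64(1); tried <= 5; tried++ {
		fmt.Printf("after %d failed attempt(s): wait %v\n", tried, nextBackoff(tried))
	}
}

In other words, the delay doubles on every failure, starting from 4 seconds; no explicit upper bound on it appears in the diff shown here.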
101 changes: 100 additions & 1 deletion modular/manager/task_retry_scheduler.go
@@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/prysmaticlabs/prysm/crypto/bls"

"github.com/bnb-chain/greenfield-common/go/hash"
"github.com/bnb-chain/greenfield-storage-provider/base/gfspapp"
"github.com/bnb-chain/greenfield-storage-provider/base/gfsptqueue"
@@ -17,7 +19,6 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
"github.com/bnb-chain/greenfield-storage-provider/util"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/prysmaticlabs/prysm/crypto/bls"
)

const (
@@ -33,6 +34,10 @@ const (
// retryIntervalSecond is used to rate-limit retries and avoid putting too much pressure
// on other modules, such as the db, chain, or other key workflows.
retryIntervalSecond = 1 * time.Second

// maxLengthOfNextExecutionBackOffMap defines the maximum number of entries in each backoff map.
// When the length is exceeded, the map is cleared to avoid leaking memory.
maxLengthOfNextExecutionBackOffMap = 100000
)

type RetryTaskType int32
@@ -43,10 +48,19 @@ const (
retryRejectUnseal RetryTaskType = 2
)

type TaskRetryInfo struct {
TaskKey string
TriedTimes int64
NextExecutionTime time.Time
}

// TaskRetryScheduler is used to schedule background task retry.
type TaskRetryScheduler struct {
manager *ManageModular
rejectUnsealThresholdSecond int64
replicateTaskBackOffMap map[string]*TaskRetryInfo
sealTaskBackOffMap map[string]*TaskRetryInfo
rejectUnsealTaskBackOffMap map[string]*TaskRetryInfo
}

// NewTaskRetryScheduler returns a task retry scheduler instance.
@@ -58,6 +72,9 @@ func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler {
return &TaskRetryScheduler{
manager: m,
rejectUnsealThresholdSecond: rejectUnsealThresholdSecond,
replicateTaskBackOffMap: make(map[string]*TaskRetryInfo),
sealTaskBackOffMap: make(map[string]*TaskRetryInfo),
rejectUnsealTaskBackOffMap: make(map[string]*TaskRetryInfo),
}
}

@@ -69,6 +86,22 @@ func (s *TaskRetryScheduler) Start() {
log.Info("task retry scheduler startup")
}

func (s *TaskRetryScheduler) resetReplicateTaskBackoffMap() {
if len(s.replicateTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.replicateTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}
func (s *TaskRetryScheduler) resetSealTaskBackoffMap() {
if len(s.sealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.sealTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}
func (s *TaskRetryScheduler) resetRejectUnsealTaskBackoffMap() {
if len(s.rejectUnsealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
s.rejectUnsealTaskBackOffMap = make(map[string]*TaskRetryInfo)
}
}

func (s *TaskRetryScheduler) startReplicateTaskRetry() {
var (
iter *TaskIterator
@@ -80,14 +113,36 @@ func (s *TaskRetryScheduler) startReplicateTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetReplicateTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryReplicate, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry replicate", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retryReplicate, iter.Value().ObjectID)
if _, existed := s.replicateTaskBackOffMap[taskKey]; !existed {
s.replicateTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.replicateTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
log.Infow("skip for retry replicate task, as the nextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; this task can be retried in the next loop.
err = s.retryReplicateTask(iter.Value())
s.replicateTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.replicateTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.replicateTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry replicate task",
"task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
} else {
delete(s.replicateTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry replicate task",
@@ -113,14 +168,36 @@ func (s *TaskRetryScheduler) startSealTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetSealTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retrySeal, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry seal", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retrySeal, iter.Value().ObjectID)
if _, existed := s.sealTaskBackOffMap[taskKey]; !existed {
s.sealTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.sealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
log.Infow("skip for retry seal task, as the nextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; this task can be retried in the next loop.
err = s.retrySealTask(iter.Value())
s.sealTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.sealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.sealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry seal task",
"task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
} else {
delete(s.sealTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry seal task",
@@ -146,14 +223,36 @@ func (s *TaskRetryScheduler) startRejectUnsealTaskRetry() {

for {
time.Sleep(retryIntervalSecond * 100)
s.resetRejectUnsealTaskBackoffMap()
iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryRejectUnseal, s.rejectUnsealThresholdSecond)
log.Infow("start a new loop to retry reject unseal task", "iterator", iter,
"loop_number", loopNumber, "total_retry_number", totalRetryNumber)

for iter.Valid() {
time.Sleep(retryIntervalSecond)
taskKey := fmt.Sprintf("%d_%d", retryRejectUnseal, iter.Value().ObjectID)
if _, existed := s.rejectUnsealTaskBackOffMap[taskKey]; !existed {
s.rejectUnsealTaskBackOffMap[taskKey] = &TaskRetryInfo{
TaskKey: taskKey,
TriedTimes: 0,
NextExecutionTime: time.Now(),
}
}
if s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
log.Infow("skip for retry reject unseal task, as the nextExecutionTime is in the future",
"task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
continue
}
// ignore the retry error; this task can be retried in the next loop.
err = s.retryRejectUnsealTask(iter.Value())
s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes += 1
if err != nil {
s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
log.Infow("reset backoff execution time for retry reject unseal task",
"task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
} else {
delete(s.rejectUnsealTaskBackOffMap, taskKey)
}
currentLoopRetryNumber++
totalRetryNumber++
log.Infow("retry reject unseal task",
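The replicate, seal, and reject-unseal loops in the diff repeat the same per-task bookkeeping, so it may help to see it condensed in one place. The sketch below is illustrative only: backOffMap, shouldRunNow, and recordResult are hypothetical names that do not exist in the PR, while TaskRetryInfo, the task-key format, and the constants mirror the diff.

package main

import (
	"fmt"
	"time"
)

const (
	retryIntervalSecond                = 1 * time.Second
	maxLengthOfNextExecutionBackOffMap = 100000
)

// TaskRetryInfo mirrors the struct added in the diff.
type TaskRetryInfo struct {
	TaskKey           string
	TriedTimes        int64
	NextExecutionTime time.Time
}

// backOffMap stands in for the per-task maps on TaskRetryScheduler.
type backOffMap map[string]*TaskRetryInfo

// shouldRunNow registers the task on first sight and reports whether its
// backoff window has elapsed. (Illustrative helper, not in the PR.)
func (m backOffMap) shouldRunNow(taskKey string, now time.Time) bool {
	info, ok := m[taskKey]
	if !ok {
		info = &TaskRetryInfo{TaskKey: taskKey, NextExecutionTime: now}
		m[taskKey] = info
	}
	return !info.NextExecutionTime.After(now)
}

// recordResult updates the backoff state after a retry attempt: push the next
// execution out exponentially on failure, forget the task on success.
func (m backOffMap) recordResult(taskKey string, err error, now time.Time) {
	info := m[taskKey]
	info.TriedTimes++
	if err != nil {
		info.NextExecutionTime = now.Add(time.Duration(int64(2)<<info.TriedTimes) * retryIntervalSecond)
		return
	}
	delete(m, taskKey)
}

func main() {
	m := make(backOffMap)
	now := time.Now()
	key := fmt.Sprintf("%d_%d", 0, 123) // retry type + object ID, matching the key format in the diff

	fmt.Println(m.shouldRunNow(key, now))                    // true: a newly seen task runs immediately
	m.recordResult(key, fmt.Errorf("replicate failed"), now) // failure: back off 4s
	fmt.Println(m.shouldRunNow(key, now.Add(2*time.Second))) // false: still inside the backoff window
	fmt.Println(m.shouldRunNow(key, now.Add(5*time.Second))) // true: window elapsed

	// The diff also clears each map wholesale once it grows past
	// maxLengthOfNextExecutionBackOffMap to keep memory bounded.
	if len(m) > maxLengthOfNextExecutionBackOffMap {
		m = make(backOffMap)
	}
	_ = m
}

One consequence of the size cap worth noting: clearing a map once it exceeds 100,000 entries also drops the backoff state of tasks that are still failing, so those tasks momentarily return to immediate retry. That is the trade-off the change makes to keep memory bounded.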