From 6a61c484f02429665275c6b1942c632afbed6375 Mon Sep 17 00:00:00 2001
From: Ruojun Meng
Date: Wed, 22 May 2024 17:26:55 +0800
Subject: [PATCH] fix: use exponential backoff for task retry policy

---
 modular/manager/task_retry_scheduler.go | 101 +++++++++++++++++++++++-
 1 file changed, 100 insertions(+), 1 deletion(-)

diff --git a/modular/manager/task_retry_scheduler.go b/modular/manager/task_retry_scheduler.go
index b1215551e..b9bc3bfaa 100644
--- a/modular/manager/task_retry_scheduler.go
+++ b/modular/manager/task_retry_scheduler.go
@@ -7,6 +7,8 @@ import (
     "strings"
     "time"
 
+    "github.com/prysmaticlabs/prysm/crypto/bls"
+
     "github.com/bnb-chain/greenfield-common/go/hash"
     "github.com/bnb-chain/greenfield-storage-provider/base/gfspapp"
     "github.com/bnb-chain/greenfield-storage-provider/base/gfsptqueue"
@@ -17,7 +19,6 @@ import (
     "github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
     "github.com/bnb-chain/greenfield-storage-provider/util"
     storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
-    "github.com/prysmaticlabs/prysm/crypto/bls"
 )
 
 const (
@@ -33,6 +34,10 @@ const (
     // retryIntervalSecond is used to control rate limit which avoid too big pressure
     // on other modules, such as db, chain, or other keypoint workflows.
     retryIntervalSecond = 1 * time.Second
+
+    // maxLengthOfNextExecutionBackOffMap is used to define the max length of NextExecutionBackOffMap.
+    // When the length is exceeded, the map should be cleared in case the memory is leaked.
+    maxLengthOfNextExecutionBackOffMap = 100000
 )
 
 type RetryTaskType int32
@@ -43,10 +48,19 @@ const (
     retryRejectUnseal RetryTaskType = 2
 )
 
+type TaskRetryInfo struct {
+    TaskKey           string
+    TriedTimes        int64
+    NextExecutionTime time.Time
+}
+
 // TaskRetryScheduler is used to schedule background task retry.
 type TaskRetryScheduler struct {
     manager                     *ManageModular
     rejectUnsealThresholdSecond int64
+    replicateTaskBackOffMap     map[string]*TaskRetryInfo
+    sealTaskBackOffMap          map[string]*TaskRetryInfo
+    rejectUnsealTaskBackOffMap  map[string]*TaskRetryInfo
 }
 
 // NewTaskRetryScheduler returns a task retry scheduler instance.
@@ -58,6 +72,9 @@ func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler {
     return &TaskRetryScheduler{
         manager:                     m,
         rejectUnsealThresholdSecond: rejectUnsealThresholdSecond,
+        replicateTaskBackOffMap:     make(map[string]*TaskRetryInfo),
+        sealTaskBackOffMap:          make(map[string]*TaskRetryInfo),
+        rejectUnsealTaskBackOffMap:  make(map[string]*TaskRetryInfo),
     }
 }
 
@@ -69,6 +86,22 @@ func (s *TaskRetryScheduler) Start() {
     log.Info("task retry scheduler startup")
 }
 
+func (s *TaskRetryScheduler) resetReplicateTaskBackoffMap() {
+    if len(s.replicateTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
+        s.replicateTaskBackOffMap = make(map[string]*TaskRetryInfo)
+    }
+}
+func (s *TaskRetryScheduler) resetSealTaskBackoffMap() {
+    if len(s.sealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
+        s.sealTaskBackOffMap = make(map[string]*TaskRetryInfo)
+    }
+}
+func (s *TaskRetryScheduler) resetRejectUnsealTaskBackoffMap() {
+    if len(s.rejectUnsealTaskBackOffMap) > maxLengthOfNextExecutionBackOffMap {
+        s.rejectUnsealTaskBackOffMap = make(map[string]*TaskRetryInfo)
+    }
+}
+
 func (s *TaskRetryScheduler) startReplicateTaskRetry() {
     var (
         iter *TaskIterator
@@ -80,14 +113,36 @@ func (s *TaskRetryScheduler) startReplicateTaskRetry() {
     for {
         time.Sleep(retryIntervalSecond * 100)
+        s.resetReplicateTaskBackoffMap()
         iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryReplicate, s.rejectUnsealThresholdSecond)
         log.Infow("start a new loop to retry replicate", "iterator", iter,
             "loop_number", loopNumber, "total_retry_number", totalRetryNumber)
 
         for iter.Valid() {
             time.Sleep(retryIntervalSecond)
+            taskKey := fmt.Sprintf("%d_%d", retryReplicate, iter.Value().ObjectID)
+            if _, existed := s.replicateTaskBackOffMap[taskKey]; !existed {
+                s.replicateTaskBackOffMap[taskKey] = &TaskRetryInfo{
+                    TaskKey:           taskKey,
+                    TriedTimes:        0,
+                    NextExecutionTime: time.Now(),
+                }
+            }
+            if s.replicateTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
+                log.Infow("skip for retry replicate task, as the nextExecutionTime is in the future",
+                    "task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
+                continue
+            }
 
             // ignore retry error, this task can be retried in next loop.
             err = s.retryReplicateTask(iter.Value())
+            s.replicateTaskBackOffMap[taskKey].TriedTimes += 1
+            if err != nil {
+                s.replicateTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.replicateTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
+                log.Infow("reset backoff execution time for retry replicate task",
+                    "task", iter.Value(), "TaskRetryInfo", s.replicateTaskBackOffMap[taskKey])
+            } else {
+                delete(s.replicateTaskBackOffMap, taskKey)
+            }
             currentLoopRetryNumber++
             totalRetryNumber++
             log.Infow("retry replicate task",
@@ -113,14 +168,36 @@ func (s *TaskRetryScheduler) startSealTaskRetry() {
     for {
         time.Sleep(retryIntervalSecond * 100)
+        s.resetSealTaskBackoffMap()
         iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retrySeal, s.rejectUnsealThresholdSecond)
         log.Infow("start a new loop to retry seal", "iterator", iter,
             "loop_number", loopNumber, "total_retry_number", totalRetryNumber)
 
         for iter.Valid() {
             time.Sleep(retryIntervalSecond)
+            taskKey := fmt.Sprintf("%d_%d", retrySeal, iter.Value().ObjectID)
+            if _, existed := s.sealTaskBackOffMap[taskKey]; !existed {
+                s.sealTaskBackOffMap[taskKey] = &TaskRetryInfo{
+                    TaskKey:           taskKey,
+                    TriedTimes:        0,
+                    NextExecutionTime: time.Now(),
+                }
+            }
+            if s.sealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
+                log.Infow("skip for retry seal task, as the nextExecutionTime is in the future",
+                    "task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
+                continue
+            }
 
             // ignore retry error, this task can be retried in next loop.
             err = s.retrySealTask(iter.Value())
+            s.sealTaskBackOffMap[taskKey].TriedTimes += 1
+            if err != nil {
+                s.sealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.sealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
+                log.Infow("reset backoff execution time for retry seal task",
+                    "task", iter.Value(), "TaskRetryInfo", s.sealTaskBackOffMap[taskKey])
+            } else {
+                delete(s.sealTaskBackOffMap, taskKey)
+            }
             currentLoopRetryNumber++
             totalRetryNumber++
             log.Infow("retry seal task",
@@ -146,14 +223,36 @@ func (s *TaskRetryScheduler) startRejectUnsealTaskRetry() {
     for {
         time.Sleep(retryIntervalSecond * 100)
+        s.resetRejectUnsealTaskBackoffMap()
         iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryRejectUnseal, s.rejectUnsealThresholdSecond)
         log.Infow("start a new loop to retry reject unseal task", "iterator", iter,
             "loop_number", loopNumber, "total_retry_number", totalRetryNumber)
 
         for iter.Valid() {
             time.Sleep(retryIntervalSecond)
+            taskKey := fmt.Sprintf("%d_%d", retryRejectUnseal, iter.Value().ObjectID)
+            if _, existed := s.rejectUnsealTaskBackOffMap[taskKey]; !existed {
+                s.rejectUnsealTaskBackOffMap[taskKey] = &TaskRetryInfo{
+                    TaskKey:           taskKey,
+                    TriedTimes:        0,
+                    NextExecutionTime: time.Now(),
+                }
+            }
+            if s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime.After(time.Now()) { // skip this task as it can only be executed after the "nextExecutionTime" stored in the map
+                log.Infow("skip for retry reject unseal task, as the nextExecutionTime is in the future",
+                    "task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
+                continue
+            }
 
             // ignore retry error, this task can be retried in next loop.
             err = s.retryRejectUnsealTask(iter.Value())
+            s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes += 1
+            if err != nil {
+                s.rejectUnsealTaskBackOffMap[taskKey].NextExecutionTime = time.Now().Add((2 << s.rejectUnsealTaskBackOffMap[taskKey].TriedTimes) * retryIntervalSecond)
+                log.Infow("reset backoff execution time for retry reject unseal task",
+                    "task", iter.Value(), "TaskRetryInfo", s.rejectUnsealTaskBackOffMap[taskKey])
+            } else {
+                delete(s.rejectUnsealTaskBackOffMap, taskKey)
+            }
             currentLoopRetryNumber++
             totalRetryNumber++
             log.Infow("retry reject unseal task",
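
The back-off arithmetic shared by the three retry loops in this patch works as follows: TriedTimes is incremented on every attempt, and on failure the next execution time is pushed out by (2 << TriedTimes) * retryIntervalSecond, so the wait doubles per failed attempt (4s, 8s, 16s, ...). The standalone sketch below is illustrative only and not part of the change; it simply mirrors that formula so the growth is easy to verify. It also notes that the product overflows int64 once TriedTimes reaches 33, which the patch does not cap.

// Illustrative sketch only, not part of the patch: mirrors the formula
// (2 << TriedTimes) * retryIntervalSecond used by the three retry loops.
package main

import (
	"fmt"
	"time"
)

const retryIntervalSecond = 1 * time.Second // same value as in the patched file

// nextDelay reproduces the patch's arithmetic; triedTimes is the value of
// TriedTimes after it has been incremented for the failed attempt.
func nextDelay(triedTimes int64) time.Duration {
	// retryIntervalSecond is 1s, so the product below works out to
	// exactly (2 << triedTimes) seconds.
	return (2 << triedTimes) * retryIntervalSecond
}

func main() {
	for tried := int64(1); tried <= 5; tried++ {
		fmt.Printf("failed attempts: %d -> next retry in %s\n", tried, nextDelay(tried))
	}
	// Prints 4s, 8s, 16s, 32s, 64s. Note the product overflows int64 once
	// triedTimes reaches 33; the patch itself does not cap the shift.
}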
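Since the skip check and the post-attempt update are repeated verbatim for the replicate, seal, and reject-unseal maps, one possible follow-up is to factor them into shared helpers, which also makes it easy to cap the shift so the computed duration cannot overflow. The sketch below is hypothetical and not part of this change: TaskRetryInfo and the two constants are copied from the patched file so it compiles on its own, while shouldRunNow, recordAttempt, and maxBackOffShift are invented names.

// Hypothetical refactoring sketch, not part of the patch.
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	retryIntervalSecond                = 1 * time.Second
	maxLengthOfNextExecutionBackOffMap = 100000
	maxBackOffShift                    = 10 // hypothetical cap: at most (2<<10)s, about 34min, between retries
)

type TaskRetryInfo struct {
	TaskKey           string
	TriedTimes        int64
	NextExecutionTime time.Time
}

// shouldRunNow lazily registers the task and reports whether its back-off
// window has elapsed. It mirrors the "skip if NextExecutionTime is in the
// future" check that is duplicated in the three retry loops.
func shouldRunNow(backOff map[string]*TaskRetryInfo, taskKey string) bool {
	if len(backOff) > maxLengthOfNextExecutionBackOffMap {
		// Same size guard as the reset*BackoffMap methods; entries are deleted
		// in place here because the map is passed in rather than reassigned.
		for k := range backOff {
			delete(backOff, k)
		}
	}
	info, ok := backOff[taskKey]
	if !ok {
		info = &TaskRetryInfo{TaskKey: taskKey, NextExecutionTime: time.Now()}
		backOff[taskKey] = info
	}
	return !info.NextExecutionTime.After(time.Now())
}

// recordAttempt mirrors the post-retry bookkeeping: bump TriedTimes, push the
// next execution time out exponentially on failure, forget the task on success.
func recordAttempt(backOff map[string]*TaskRetryInfo, taskKey string, retryErr error) {
	info := backOff[taskKey]
	info.TriedTimes++
	if retryErr == nil {
		delete(backOff, taskKey)
		return
	}
	shift := info.TriedTimes
	if shift > maxBackOffShift {
		shift = maxBackOffShift // keep (2 << shift) * retryIntervalSecond far from int64 overflow
	}
	info.NextExecutionTime = time.Now().Add((2 << shift) * retryIntervalSecond)
}

func main() {
	backOff := make(map[string]*TaskRetryInfo)
	taskKey := fmt.Sprintf("%d_%d", 1, 42) // same "<taskType>_<objectID>" key shape as the patch

	if shouldRunNow(backOff, taskKey) {
		recordAttempt(backOff, taskKey, errors.New("simulated retry failure"))
	}
	fmt.Printf("next attempt in ~%s\n",
		time.Until(backOff[taskKey].NextExecutionTime).Round(time.Second))
}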