From f498246e5b922c2c8a04e6c0d282b12293daa917 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 May 2021 15:54:07 -0400 Subject: [PATCH] some go timer fixes --- batched/system.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/batched/system.go b/batched/system.go index 751cfe2..0968fb3 100644 --- a/batched/system.go +++ b/batched/system.go @@ -116,15 +116,7 @@ func (s *BatchProvidingSystem) Run() { defer pauseDetectTimer.Stop() for { - // Reset timers - if !pauseDetectTimer.Stop() { - <-pauseDetectTimer.C - } pauseDetectTimer.Reset(maxTimeWaitingForProvides) - - if !maxCollectionDurationTimer.Stop() { - <-maxCollectionDurationTimer.C - } maxCollectionDurationTimer.Reset(maxCollectionDuration) performedReprovide := false @@ -133,7 +125,7 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) continue default: } @@ -141,14 +133,16 @@ func (s *BatchProvidingSystem) Run() { select { case c := <-provCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) case c := <-s.dynamicCh: m[c] = struct{}{} - pauseDetectTimer.Reset(pauseDetectionThreshold) + resetTimer(pauseDetectTimer, pauseDetectionThreshold) performedReprovide = true case <-pauseDetectTimer.C: + emptyTimer(maxCollectionDurationTimer) break loop case <-maxCollectionDurationTimer.C: + emptyTimer(pauseDetectTimer) break loop case <-s.ctx.Done(): return @@ -252,6 +246,19 @@ func (s *BatchProvidingSystem) Run() { }() } +func emptyTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func resetTimer(t *time.Timer, dur time.Duration) { + if !t.Stop() { + <-t.C + } + t.Reset(dur) +} + func storeTime(t time.Time) []byte { val := []byte(fmt.Sprintf("%d", t.UnixNano())) return val