Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
some go timer fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed May 12, 2021
1 parent 4407f46 commit f498246
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions batched/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,7 @@ func (s *BatchProvidingSystem) Run() {
defer pauseDetectTimer.Stop()

for {
// Reset timers
if !pauseDetectTimer.Stop() {
<-pauseDetectTimer.C
}
pauseDetectTimer.Reset(maxTimeWaitingForProvides)

if !maxCollectionDurationTimer.Stop() {
<-maxCollectionDurationTimer.C
}
maxCollectionDurationTimer.Reset(maxCollectionDuration)

performedReprovide := false
Expand All @@ -133,22 +125,24 @@ func (s *BatchProvidingSystem) Run() {
select {
case c := <-provCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
resetTimer(pauseDetectTimer, pauseDetectionThreshold)
continue
default:
}

select {
case c := <-provCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
resetTimer(pauseDetectTimer, pauseDetectionThreshold)
case c := <-s.dynamicCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
resetTimer(pauseDetectTimer, pauseDetectionThreshold)
performedReprovide = true
case <-pauseDetectTimer.C:
emptyTimer(maxCollectionDurationTimer)
break loop
case <-maxCollectionDurationTimer.C:
emptyTimer(pauseDetectTimer)
break loop
case <-s.ctx.Done():
return
Expand Down Expand Up @@ -252,6 +246,19 @@ func (s *BatchProvidingSystem) Run() {
}()
}

func emptyTimer(t *time.Timer) {
if !t.Stop() {
<-t.C
}
}

func resetTimer(t *time.Timer, dur time.Duration) {
if !t.Stop() {
<-t.C
}
t.Reset(dur)
}

func storeTime(t time.Time) []byte {
val := []byte(fmt.Sprintf("%d", t.UnixNano()))
return val
Expand Down

0 comments on commit f498246

Please sign in to comment.