Skip to content

Commit

Permalink
pkg/timeutil: add Ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jun 11, 2024
1 parent 2cd3e10 commit 5921ad3
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 30 deletions.
39 changes: 39 additions & 0 deletions pkg/services/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package services

import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// DefaultJitter is +/-10%
const DefaultJitter timeutil.JitterPct = 0.1

// NewTicker returns a new timeutil.Ticker configured to:
// - fire the first tick immediately
// - apply DefaultJitter to each period
func NewTicker(period time.Duration) *timeutil.Ticker {
return TickerConfig{JitterPct: DefaultJitter}.NewTicker(period)
}

type TickerConfig struct {
// Initial delay before the first tick.
Initial time.Duration
// JitterPct to apply to each period.
JitterPct timeutil.JitterPct
}

func (c TickerConfig) NewTicker(period time.Duration) *timeutil.Ticker {
first := true
return timeutil.NewTicker(func() time.Duration {
if first {
first = false
return c.Initial
}
p := period
if c.JitterPct != 0.0 {
p = c.JitterPct.Apply(p)
}
return p
})
}
22 changes: 22 additions & 0 deletions pkg/timeutil/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package timeutil

import (
mrand "math/rand"
"time"
)

// JitterPct is a percent by which to scale a duration up or down.
// For example, 0.1 will result in +/- 10%.
type JitterPct float64

func (p JitterPct) Apply(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
ub := max(1, int(float64(d.Abs())*float64(p)))
// #nosec - non critical randomness
jitter := mrand.Intn(2*ub) - ub
return time.Duration(int(d) + jitter)
}
29 changes: 29 additions & 0 deletions pkg/timeutil/jitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package timeutil

import (
"testing"
"time"
)

func TestJitterPct(t *testing.T) {
for _, tt := range []struct {
pct JitterPct
dur time.Duration
from, to time.Duration
}{
{0.1, 0, 0, 0},
{0.1, time.Second, 900 * time.Millisecond, 1100 * time.Millisecond},
{0.1, time.Minute, 54 * time.Second, 66 * time.Second},
{0.1, 24 * time.Hour, 21*time.Hour + 36*time.Minute, 26*time.Hour + 24*time.Minute},
} {
t.Run(tt.dur.String(), func(t *testing.T) {
for i := 0; i < 100; i++ {
got := tt.pct.Apply(tt.dur)
t.Logf("%d: %s", i, got)
if got < tt.from || got > tt.to {
t.Errorf("expected duration %s with jitter to be between (%s, %s) but got: %s", tt.dur, tt.from, tt.to, got)
}
}
})
}
}
58 changes: 58 additions & 0 deletions pkg/timeutil/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package timeutil

import (
"time"
)

// Ticker is like time.Ticker, but with a variable period.
type Ticker struct {
C <-chan time.Time
stop chan struct{}
reset chan struct{}
}

// NewTicker returns a started Ticker which calls nextDur for each period.
// Ticker.Stop should be called to prevent goroutine leaks.
func NewTicker(nextDur func() time.Duration) *Ticker {
c := make(chan time.Time) // unbuffered so we block and delay if not being handled
t := Ticker{C: c, stop: make(chan struct{}), reset: make(chan struct{})}
go t.run(c, nextDur)
return &t
}

// Stop permanently stops the Ticker. It cannot be Reset.
func (t *Ticker) Stop() { close(t.stop) }

func (t *Ticker) run(c chan<- time.Time, nextDur func() time.Duration) {
for {
timer := time.NewTimer(nextDur())
select {
case <-t.stop:
timer.Stop()
return

case <-t.reset:
timer.Stop()

case <-timer.C:
timer.Stop()
select {
case <-t.stop:
return
case c <- time.Now():
case <-t.reset:
}
}
}
}

// Reset starts a new period.
func (t *Ticker) Reset() {
select {
case <-t.stop:
case t.reset <- struct{}{}:
default:
// unnecessary
return
}
}
24 changes: 9 additions & 15 deletions pkg/utils/mailbox/mailbox_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
)

var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand All @@ -30,33 +29,26 @@ type Monitor struct {
lggr logger.Logger

mailboxes sync.Map
stop func()
stopCh services.StopChan
done chan struct{}
}

func NewMonitor(appID string, lggr logger.Logger) *Monitor {
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor")}
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor"), stopCh: make(services.StopChan), done: make(chan struct{})}
}

func (m *Monitor) Name() string { return m.lggr.Name() }

func (m *Monitor) Start(context.Context) error {
return m.StartOnce("Monitor", func() error {
t := time.NewTicker(utils.WithJitter(mailboxPromInterval))
ctx, cancel := context.WithCancel(context.Background())
m.stop = func() {
t.Stop()
cancel()
}
m.done = make(chan struct{})
go m.monitorLoop(ctx, t.C)
go m.monitorLoop()
return nil
})
}

func (m *Monitor) Close() error {
return m.StopOnce("Monitor", func() error {
m.stop()
close(m.stopCh)
<-m.done
return nil
})
Expand All @@ -66,13 +58,15 @@ func (m *Monitor) HealthReport() map[string]error {
return map[string]error{m.Name(): m.Healthy()}
}

func (m *Monitor) monitorLoop(ctx context.Context, c <-chan time.Time) {
func (m *Monitor) monitorLoop() {
defer close(m.done)
t := services.NewTicker(mailboxPromInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
case <-m.stopCh:
return
case <-c:
case <-t.C:
m.mailboxes.Range(func(k, v any) bool {
name, mb := k.(string), v.(mailbox)
c, p := mb.load()
Expand Down
19 changes: 4 additions & 15 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,16 @@ package utils
import (
"context"
"fmt"
"math"
mrand "math/rand"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// WithJitter adds +/- 10% to a duration
func WithJitter(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
max := math.Max(float64(d.Abs())/5.0, 1.)
// #nosec - non critical randomness
jitter := mrand.Intn(int(max))
jitter = jitter - (jitter / 2)
return time.Duration(int(d) + jitter)
}
// WithJitter adds +/- 10% to a duration.
// Deprecated: use timeutil.WithJitter
func WithJitter(d time.Duration) time.Duration { return timeutil.JitterPct(0.1).Apply(d) }

// ContextFromChan creates a context that finishes when the provided channel
// receives or is closed.
Expand Down

0 comments on commit 5921ad3

Please sign in to comment.