From 08f714d98d12ac090ef49a6adf736b694175a1c0 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 8 Jul 2022 23:17:46 +0000 Subject: [PATCH] introduce an API to set a mock clock --- .travis.yml | 2 +- go.mod | 4 ++- go.sum | 2 ++ meter.go | 13 ++++++---- mockclocktest/mock_clock_test.go | 43 ++++++++++++++++++++++++++++++++ sweeper.go | 23 ++++++++++++++--- 6 files changed, 77 insertions(+), 10 deletions(-) create mode 100644 go.sum create mode 100644 mockclocktest/mock_clock_test.go diff --git a/.travis.yml b/.travis.yml index a156d3e..67e608f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ os: language: go go: - - 1.13.x + - 1.17.x env: global: diff --git a/go.mod b/go.mod index e9b50d4..0c94703 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/libp2p/go-flow-metrics -go 1.12 +go 1.17 + +require github.com/benbjohnson/clock v1.3.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c284fd6 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/meter.go b/meter.go index b70593e..9ca79fb 100644 --- a/meter.go +++ b/meter.go @@ -42,11 +42,14 @@ type Meter struct { // Mark updates the total. func (m *Meter) Mark(count uint64) { - if count > 0 && atomic.AddUint64(&m.accumulator, count) == count { - // The accumulator is 0 so we probably need to register. We may - // already _be_ registered however, if we are, the registration - // loop will notice that `m.registered` is set and ignore us. - globalSweeper.Register(m) + if count > 0 { + n := atomic.AddUint64(&m.accumulator, count) + if n == count { + // The accumulator is 0 so we probably need to register. We may + // already _be_ registered however, if we are, the registration + // loop will notice that `m.registered` is set and ignore us. + globalSweeper.Register(m) + } } } diff --git a/mockclocktest/mock_clock_test.go b/mockclocktest/mock_clock_test.go new file mode 100644 index 0000000..4bf0899 --- /dev/null +++ b/mockclocktest/mock_clock_test.go @@ -0,0 +1,43 @@ +package mockclocktest + +import ( + "testing" + "time" + + "github.com/libp2p/go-flow-metrics" + + "github.com/benbjohnson/clock" +) + +var cl = clock.NewMock() + +func init() { + flow.SetClock(cl) +} + +func TestBasic(t *testing.T) { + m := new(flow.Meter) + for i := 0; i < 300; i++ { + m.Mark(1000) + cl.Add(40 * time.Millisecond) + } + if rate := m.Snapshot().Rate; rate != 25000 { + t.Errorf("expected rate 25000, got %f", rate) + } + + for i := 0; i < 200; i++ { + m.Mark(200) + cl.Add(40 * time.Millisecond) + } + + // Adjusts + if rate := m.Snapshot().Rate; rate != 5017.776503840969 { + t.Errorf("expected rate 5017.776503840969, got %f", rate) + } + + // Let it settle. + cl.Add(2 * time.Second) + if total := m.Snapshot().Total; total != 340000 { + t.Errorf("expected total 3400000, got %f", total) + } +} diff --git a/sweeper.go b/sweeper.go index ceed13f..7acb4b8 100644 --- a/sweeper.go +++ b/sweeper.go @@ -5,6 +5,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/benbjohnson/clock" ) // IdleRate the rate at which we declare a meter idle (and stop tracking it @@ -20,6 +22,14 @@ var alpha = 1 - math.Exp(-1.0) // The global sweeper. var globalSweeper sweeper +var globalClock clock.Clock + +// SetClock sets a clock to use in the sweeper. +// This will probably only ever be useful for testing purposes. +func SetClock(cl clock.Clock) { + globalClock = cl +} + type sweeper struct { sweepOnce sync.Once @@ -29,10 +39,17 @@ type sweeper struct { lastUpdateTime time.Time registerChannel chan *Meter + + clock clock.Clock } func (sw *sweeper) start() { sw.registerChannel = make(chan *Meter, 16) + if globalClock == nil { + sw.clock = clock.New() + } else { + sw.clock = globalClock + } go sw.run() } @@ -53,10 +70,10 @@ func (sw *sweeper) register(m *Meter) { } func (sw *sweeper) runActive() { - ticker := time.NewTicker(time.Second) + ticker := sw.clock.Ticker(time.Second) defer ticker.Stop() - sw.lastUpdateTime = time.Now() + sw.lastUpdateTime = sw.clock.Now() for len(sw.meters) > 0 { // Scale back allocation. if len(sw.meters)*2 < cap(sw.meters) { @@ -80,7 +97,7 @@ func (sw *sweeper) update() { sw.snapshotMu.Lock() defer sw.snapshotMu.Unlock() - now := time.Now() + now := sw.clock.Now() tdiff := now.Sub(sw.lastUpdateTime) if tdiff <= 0 { return