Skip to content

Commit

Permalink
introduce an API to set a mock clock
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jul 8, 2022
1 parent e6db3c8 commit 08f714d
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ os:
language: go

go:
- 1.13.x
- 1.17.x

env:
global:
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/libp2p/go-flow-metrics

go 1.12
go 1.17

require github.com/benbjohnson/clock v1.3.0
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
13 changes: 8 additions & 5 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ type Meter struct {

// Mark updates the total.
func (m *Meter) Mark(count uint64) {
if count > 0 && atomic.AddUint64(&m.accumulator, count) == count {
// The accumulator is 0 so we probably need to register. We may
// already _be_ registered however, if we are, the registration
// loop will notice that `m.registered` is set and ignore us.
globalSweeper.Register(m)
if count > 0 {
n := atomic.AddUint64(&m.accumulator, count)
if n == count {
// The accumulator is 0 so we probably need to register. We may
// already _be_ registered however, if we are, the registration
// loop will notice that `m.registered` is set and ignore us.
globalSweeper.Register(m)
}
}
}

Expand Down
43 changes: 43 additions & 0 deletions mockclocktest/mock_clock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package mockclocktest

import (
"testing"
"time"

"github.com/libp2p/go-flow-metrics"

"github.com/benbjohnson/clock"
)

var cl = clock.NewMock()

func init() {
flow.SetClock(cl)
}

func TestBasic(t *testing.T) {
m := new(flow.Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
cl.Add(40 * time.Millisecond)
}
if rate := m.Snapshot().Rate; rate != 25000 {
t.Errorf("expected rate 25000, got %f", rate)
}

for i := 0; i < 200; i++ {
m.Mark(200)
cl.Add(40 * time.Millisecond)
}

// Adjusts
if rate := m.Snapshot().Rate; rate != 5017.776503840969 {
t.Errorf("expected rate 5017.776503840969, got %f", rate)
}

// Let it settle.
cl.Add(2 * time.Second)
if total := m.Snapshot().Total; total != 340000 {
t.Errorf("expected total 3400000, got %f", total)
}
}
23 changes: 20 additions & 3 deletions sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
Expand All @@ -20,6 +22,14 @@ var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper

var globalClock clock.Clock

// SetClock sets a clock to use in the sweeper.
// This will probably only ever be useful for testing purposes.
func SetClock(cl clock.Clock) {
globalClock = cl
}

type sweeper struct {
sweepOnce sync.Once

Expand All @@ -29,10 +39,17 @@ type sweeper struct {

lastUpdateTime time.Time
registerChannel chan *Meter

clock clock.Clock
}

func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
if globalClock == nil {
sw.clock = clock.New()
} else {
sw.clock = globalClock
}
go sw.run()
}

Expand All @@ -53,10 +70,10 @@ func (sw *sweeper) register(m *Meter) {
}

func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
ticker := sw.clock.Ticker(time.Second)
defer ticker.Stop()

sw.lastUpdateTime = time.Now()
sw.lastUpdateTime = sw.clock.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
Expand All @@ -80,7 +97,7 @@ func (sw *sweeper) update() {
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()

now := time.Now()
now := sw.clock.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
Expand Down

0 comments on commit 08f714d

Please sign in to comment.