Skip to content

Commit

Permalink
make it generic
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Aug 8, 2024
1 parent 80e14fe commit 5537a8f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 39 deletions.
57 changes: 28 additions & 29 deletions logp/ratelimited.go → periodic/ratelimited.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,31 @@
// specific language governing permissions and limitations
// under the License.

package logp
package periodic

import (
"sync"
"sync/atomic"
"time"
)

// RateLimitedLogger is a logger that limits log messages to at most once within
// a specified period.
// It is intended for logging events that occur frequently, providing a summary
// with the number of occurrences within the given period.
// Doer limits an action to be executed at most once within a specified period.
// It is intended for managing events that occur frequently, but instead of an
// action being taken for every event, the action should be executed at most
// once within a given period of time.
//
// RateLimitedLogger takes a logger function, logFn, which is called every time
// the specified period has elapsed to log the summary.
type RateLimitedLogger struct {
// Doer takes a function to execute, doFn, which is called every time
// the specified period has elapsed with the number of events and the period.
type Doer struct {
count atomic.Uint64

period time.Duration

// logFn is called for logging, which receives the count of events and the
// duration since the last call.
logFn func(count uint64, d time.Duration)
lastLog time.Time
done chan struct{}
// doFn is called for executing the action every period if at least one
// event happened. It receives the count of events and the period.
doFn func(count uint64, d time.Duration)
lastDone time.Time
done chan struct{}

// nowFn is used to acquire the current time instead of time.Now so it can
// be mocked for tests.
Expand All @@ -53,35 +53,34 @@ type RateLimitedLogger struct {
ticker *time.Ticker
}

// NewRateLimited returns a new RateLimitedLogger. It takes a logFn, which is
// called with the count of events and the period between each call,
// and a period determining how often the log function should be called.
func NewRateLimited(period time.Duration, logFn func(count uint64, d time.Duration)) *RateLimitedLogger {
return &RateLimitedLogger{
// NewDoer returns a new Doer. It takes a doFn, which is
// called with the current count of events and the period.
func NewDoer(period time.Duration, doFn func(count uint64, d time.Duration)) *Doer {
return &Doer{
period: period,
logFn: logFn,
doFn: doFn,

nowFn: time.Now,
newTickerFn: time.NewTicker,
}
}

func (r *RateLimitedLogger) Add() {
func (r *Doer) Add() {
r.count.Add(1)
}

func (r *RateLimitedLogger) AddN(n uint64) {
func (r *Doer) AddN(n uint64) {
r.count.Add(n)
}

func (r *RateLimitedLogger) Start() {
func (r *Doer) Start() {
if r.started.Load() {
return
}

r.done = make(chan struct{})
r.started.Store(true)
r.lastLog = r.nowFn()
r.lastDone = r.nowFn()
r.ticker = r.newTickerFn(r.period)

r.wg.Add(1)
Expand All @@ -93,16 +92,16 @@ func (r *RateLimitedLogger) Start() {
for {
select {
case <-r.ticker.C:
r.log()
r.do()
case <-r.done:
r.log()
r.do()
return
}
}
}()
}

func (r *RateLimitedLogger) Stop() {
func (r *Doer) Stop() {
if !r.started.Load() {
return
}
Expand All @@ -112,10 +111,10 @@ func (r *RateLimitedLogger) Stop() {
r.started.Store(false)
}

func (r *RateLimitedLogger) log() {
func (r *Doer) do() {
count := r.count.Swap(0)
if count > 0 {
r.lastLog = r.nowFn()
r.logFn(count, r.period)
r.lastDone = r.nowFn()
r.doFn(count, r.period)
}
}
20 changes: 10 additions & 10 deletions logp/ratelimited_test.go → periodic/ratelimited_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package logp
package periodic

import (
"bytes"
Expand Down Expand Up @@ -65,19 +65,19 @@ func TestRateLimitedLogger(t *testing.T) {
now := time.Now()

t.Run("Start", func(t *testing.T) {
r := NewRateLimited(math.MaxInt64, func(count uint64, d time.Duration) {})
r := NewDoer(math.MaxInt64, func(count uint64, d time.Duration) {})
defer r.Stop()
r.nowFn = func() time.Time { return now }

r.Start()

assert.True(t, r.started.Load(),
"Start() was called, thus 'started' should be true")
assert.NotEmpty(t, r.lastLog, "lastLog should have been set")
assert.NotEmpty(t, r.lastDone, "lastDone should have been set")
})

t.Run("Start twice", func(t *testing.T) {
r := NewRateLimited(math.MaxInt64, func(count uint64, d time.Duration) {})
r := NewDoer(math.MaxInt64, func(count uint64, d time.Duration) {})
defer r.Stop()

r.nowFn = func() time.Time { return now }
Expand All @@ -86,7 +86,7 @@ func TestRateLimitedLogger(t *testing.T) {
r.nowFn = func() time.Time { return now.Add(time.Minute) }
r.Start()

assert.Equal(t, now, r.lastLog, "lastLog should have been set a second time")
assert.Equal(t, now, r.lastDone, "lastDone should have been set a second time")
})

t.Run("Stop", func(t *testing.T) {
Expand All @@ -101,7 +101,7 @@ func TestRateLimitedLogger(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
buff, logFn := newLogger()
r := NewRateLimited(42*time.Second, logFn)
r := NewDoer(42*time.Second, logFn)
r.nowFn = func() time.Time { return now }

tch := make(chan time.Time)
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestRateLimitedLogger(t *testing.T) {

t.Run("Add", func(t *testing.T) {
buff, logFn := newLogger()
r := NewRateLimited(42*time.Second, logFn)
r := NewDoer(42*time.Second, logFn)
defer r.Stop()

r.nowFn = func() time.Time { return now }
Expand All @@ -157,14 +157,14 @@ func TestRateLimitedLogger(t *testing.T) {
logs = strings.TrimSpace(string(bs))

return len(strings.Split(logs, "\n")) == 1
}, time.Second, 100*time.Millisecond, "should have found 1 log")
}, time.Second, 100*time.Millisecond, "should have found 1 do")

assert.Contains(t, logs, fmt.Sprintf(pattern, 1, 42*time.Second))
})

t.Run("AddN", func(t *testing.T) {
buff, logFn := newLogger()
r := NewRateLimited(42*time.Second, logFn)
r := NewDoer(42*time.Second, logFn)
defer r.Stop()

r.nowFn = func() time.Time { return now }
Expand All @@ -187,7 +187,7 @@ func TestRateLimitedLogger(t *testing.T) {
logs = strings.TrimSpace(string(bs))

return len(strings.Split(logs, "\n")) == 1
}, time.Second, 100*time.Millisecond, "should have found 1 log")
}, time.Second, 100*time.Millisecond, "should have found 1 do")

assert.Contains(t, logs, fmt.Sprintf(pattern, 42, 42*time.Second))
})
Expand Down

0 comments on commit 5537a8f

Please sign in to comment.