Skip to content

Commit

Permalink
Add rate limiter option to Cassandra writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Isaac Hier committed Oct 26, 2018
1 parent 4a07b78 commit 8dcca83
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 1 deletion.
98 changes: 98 additions & 0 deletions internal/utils/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"errors"
"sync"
"time"
)

var (
ErrInvalidCreditsPerSecond = errors.New("invalid credits per second, must be greater than zero")
ErrInvalidMaxBalance = errors.New("invalid max balance, must be greater than zero")
)

// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
type RateLimiter interface {
CheckCredit(itemCost float64) (bool, time.Duration)
}

type rateLimiter struct {
sync.Mutex

creditsPerSecond float64
balance float64
maxBalance float64
lastTick time.Time

timeNow func() time.Time
}

// NewRateLimiter creates a new rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
// to the time elapsed since the last tick, up to max of creditsPerSecond. A call to CheckCredit() takes a cost
// of an item we want to pay with the balance. If the balance exceeds the cost of the item, the item is "purchased"
// and the balance reduced, indicated by returned value of true with a wait time of zero. Otherwise the balance is
// unchanged and return false with the time until the next credit accrues.
//
// This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
// to determine if the message is within the rate limit.
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
func NewRateLimiter(creditsPerSecond, maxBalance float64) (RateLimiter, error) {
if creditsPerSecond < 0 {
return nil, ErrInvalidCreditsPerSecond
}
if maxBalance < 0 {
return nil, ErrInvalidMaxBalance
}
return &rateLimiter{
creditsPerSecond: creditsPerSecond,
balance: maxBalance,
maxBalance: maxBalance,
lastTick: time.Now(),
timeNow: time.Now,
}, nil
}

func (b *rateLimiter) CheckCredit(itemCost float64) (bool, time.Duration) {
b.Lock()
defer b.Unlock()
b.updateBalance()
// if we have enough credits to pay for current item, then reduce balance and allow
if b.balance >= itemCost {
b.balance -= itemCost
return true, 0
}
creditsRemaining := itemCost - b.balance
waitTime := time.Nanosecond * time.Duration(creditsRemaining*float64(time.Second.Nanoseconds())/b.creditsPerSecond)
return false, waitTime
}

// N.B. Must be called while holding the lock.
func (b *rateLimiter) updateBalance() {
// calculate how much time passed since the last tick, and update current tick
currentTime := b.timeNow()
elapsedTime := currentTime.Sub(b.lastTick)
b.lastTick = currentTime
// calculate how much credit have we accumulated since the last tick
b.balance += elapsedTime.Seconds() * b.creditsPerSecond
if b.balance > b.maxBalance {
b.balance = b.maxBalance
}
}
98 changes: 98 additions & 0 deletions internal/utils/rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestRateLimiter(t *testing.T) {
limiter := NewRateLimiter(2.0, 2.0)
// stop time
ts := time.Now()
limiter.(*rateLimiter).lastTick = ts
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts
}
ok, waitTime := limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)
ok, waitTime = limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)
ok, waitTime = limiter.CheckCredit(1.0)
assert.False(t, ok)
assert.Equal(t, time.Second/2, waitTime)
// move time 250ms forward, not enough credits to pay for 1.0 item
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts.Add(time.Second / 4)
}
ok, waitTime = limiter.CheckCredit(1.0)
assert.False(t, ok)
assert.Equal(t, time.Second/4, waitTime)
// move time 500ms forward, now enough credits to pay for 1.0 item
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts.Add(time.Second/4 + time.Second/2)
}
ok, waitTime = limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)
ok, waitTime = limiter.CheckCredit(1.0)
assert.False(t, ok)
assert.Equal(t, time.Second/4, waitTime)
// move time 5s forward, enough to accumulate credits for 10 messages, but it should still be capped at 2
limiter.(*rateLimiter).lastTick = ts
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts.Add(5 * time.Second)
}
for i := 0; i < 2; i++ {
ok, waitTime = limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)
}
for i := 0; i < 3; i++ {
ok, waitTime = limiter.CheckCredit(1.0)
assert.False(t, ok)
assert.Equal(t, time.Second/2, waitTime)
}
}

func TestMaxBalance(t *testing.T) {
limiter := NewRateLimiter(0.1, 1.0)
// stop time
ts := time.Now()
limiter.(*rateLimiter).lastTick = ts
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts
}
// on initialization, should have enough credits for 1 message
ok, waitTime := limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)

// move time 20s forward, enough to accumulate credits for 2 messages, but it should still be capped at 1
limiter.(*rateLimiter).timeNow = func() time.Time {
return ts.Add(time.Second * 20)
}
ok, waitTime = limiter.CheckCredit(1.0)
assert.True(t, ok)
assert.Equal(t, time.Duration(0), waitTime)
ok, waitTime = limiter.CheckCredit(1.0)
assert.False(t, ok)
assert.Equal(t, 10*time.Second, waitTime)
}
41 changes: 40 additions & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ package cassandra

import (
"flag"
"io"
"time"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/utils"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
Expand Down Expand Up @@ -101,7 +105,42 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger)
if f.Options.writerRateLimit.operationsPerSecond == 0 && f.Options.writerRateLimit.maxBurst == 0 {
return writer, nil
}

rateLimiter, err := utils.NewRateLimiter(
f.Options.writerRateLimit.operationsPerSecond,
f.Options.writerRateLimit.maxBurst,
)
if err != nil {
return nil, err
}
return newRateLimitedSpanWriter(writer, rateLimiter), nil
}

func newRateLimitedSpanWriter(writer spanstore.Writer, rateLimiter utils.RateLimiter) spanstore.Writer {
return &rateLimitedSpanWriter{
writer: writer,
rateLimiter: rateLimiter,
}
}

type rateLimitedSpanWriter struct {
writer spanstore.Writer
rateLimiter utils.RateLimiter
io.Closer
}

func (w *rateLimitedSpanWriter) WriteSpan(span *model.Span) error {
const cost = 1.0
ok, waitTime := w.rateLimiter.CheckCredit(cost)
for !ok {
time.Sleep(waitTime)
ok, waitTime = w.rateLimiter.CheckCredit(cost)
}
return w.writer.WriteSpan(span)
}

// CreateDependencyReader implements storage.Factory
Expand Down
17 changes: 17 additions & 0 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixWritesPerSecond = ".writer-rate-limit.operations-per-second"
suffixMaxBurstWrites = ".writer-rate-limit.max-burst"
)

// Options contains various type of Cassandra configs and provides the ability
Expand All @@ -57,6 +59,7 @@ type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
writerRateLimit rateLimitConfig
}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand All @@ -70,6 +73,12 @@ type namespaceConfig struct {
Enabled bool
}

// rateLimitConfig defines common arguments to a rate limiter.
type rateLimitConfig struct {
operationsPerSecond float64
maxBurst float64
}

// NewOptions creates a new Options struct.
func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
// TODO all default values should be defined via cobra flags
Expand Down Expand Up @@ -111,6 +120,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.Float64(opt.primary.namespace+suffixWritesPerSecond,
opt.writerRateLimit.operationsPerSecond,
"The number of writes per second using rate limiter")
flagSet.Float64(opt.primary.namespace+suffixMaxBurstWrites,
opt.writerRateLimit.maxBurst,
"The maximum number of writes in a single burst using rate limiter")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -201,6 +216,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
cfg.initFromViper(v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.writerRateLimit.operationsPerSecond = v.GetFloat64(opt.primary.namespace + suffixWritesPerSecond)
opt.writerRateLimit.maxBurst = v.GetFloat64(opt.primary.namespace + suffixMaxBurstWrites)
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.consistency=ONE",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
"--cas.writer-rate-limit.operations-per-second=10",
"--cas.writer-rate-limit.max-burst=1",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
Expand All @@ -82,4 +84,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
assert.Equal(t, 3, aux.ProtoVersion)
assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)

assert.Equal(t, 10.0, opts.writerRateLimit.operationsPerSecond)
assert.Equal(t, 1.0, opts.writerRateLimit.maxBurst)
}

0 comments on commit 8dcca83

Please sign in to comment.