diff --git a/internal/utils/rate_limiter.go b/internal/utils/rate_limiter.go
new file mode 100644
index 00000000000..9fd13950bdc
--- /dev/null
+++ b/internal/utils/rate_limiter.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+	"errors"
+	"math"
+	"sync"
+	"time"
+)
+
+var (
+	// ErrInvalidCreditsPerSecond is returned by NewRateLimiter when creditsPerSecond is not a positive, finite number.
+	ErrInvalidCreditsPerSecond = errors.New("invalid credits per second, must be greater than zero")
+
+	// ErrInvalidMaxBalance is returned by NewRateLimiter when maxBalance is not a positive, finite number.
+	ErrInvalidMaxBalance = errors.New("invalid max balance, must be greater than zero")
+)
+
+// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
+type RateLimiter interface {
+	// CheckCredit tries to pay itemCost out of the current balance. It returns true and a zero wait time when
+	// the item was paid for; otherwise it returns false and the time after which enough credits will have
+	// accrued to pay for the item.
+	CheckCredit(itemCost float64) (bool, time.Duration)
+}
+
+type rateLimiter struct {
+	sync.Mutex
+
+	creditsPerSecond float64
+	balance          float64
+	maxBalance       float64
+	lastTick         time.Time
+
+	timeNow func() time.Time
+}
+
+// NewRateLimiter creates a new rate limiter based on leaky bucket algorithm, formulated in terms of a
+// credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
+// to the time elapsed since the last tick, up to a maximum of maxBalance. A call to CheckCredit() takes a cost
+// of an item we want to pay with the balance. If the balance is at least the cost of the item, the item is
+// "purchased" and the balance reduced, indicated by returned value of true with a wait time of zero. Otherwise
+// the balance is unchanged and CheckCredit() returns false with the time until enough credits have accrued to
+// pay for the item.
+//
+// This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
+// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
+// to determine if the message is within the rate limit.
+//
+// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
+// as bytes/second, and calling CheckCredit() with the actual message size.
+//
+// Both creditsPerSecond and maxBalance must be positive, finite numbers; otherwise ErrInvalidCreditsPerSecond
+// or ErrInvalidMaxBalance is returned. The limiter starts with a full balance of maxBalance.
+func NewRateLimiter(creditsPerSecond, maxBalance float64) (RateLimiter, error) {
+	// Zero is rejected as well as negative values: a zero rate would make the wait time computed by
+	// CheckCredit infinite, and a zero balance cap could never pay for any item.
+	if !isPositiveFinite(creditsPerSecond) {
+		return nil, ErrInvalidCreditsPerSecond
+	}
+	if !isPositiveFinite(maxBalance) {
+		return nil, ErrInvalidMaxBalance
+	}
+	return &rateLimiter{
+		creditsPerSecond: creditsPerSecond,
+		balance:          maxBalance,
+		maxBalance:       maxBalance,
+		lastTick:         time.Now(),
+		timeNow:          time.Now,
+	}, nil
+}
+
+// isPositiveFinite reports whether v is greater than zero and neither NaN nor +Inf.
+func isPositiveFinite(v float64) bool {
+	return v > 0 && !math.IsInf(v, 1)
+}
+
+// CheckCredit implements RateLimiter. The balance is guarded by the embedded mutex for the whole call.
+func (b *rateLimiter) CheckCredit(itemCost float64) (bool, time.Duration) {
+	b.Lock()
+	defer b.Unlock()
+	b.updateBalance()
+	// if we have enough credits to pay for current item, then reduce balance and allow
+	if b.balance >= itemCost {
+		b.balance -= itemCost
+		return true, 0
+	}
+	creditsRemaining := itemCost - b.balance
+	waitNanos := creditsRemaining * float64(time.Second) / b.creditsPerSecond
+	// converting an out-of-range float to an integer is implementation-defined, so saturate instead
+	if waitNanos >= float64(math.MaxInt64) {
+		return false, time.Duration(math.MaxInt64)
+	}
+	return false, time.Duration(waitNanos)
+}
+
+// N.B. Must be called while holding the lock.
+func (b *rateLimiter) updateBalance() {
+	// calculate how much time passed since the last tick, and update current tick
+	currentTime := b.timeNow()
+	elapsedTime := currentTime.Sub(b.lastTick)
+	b.lastTick = currentTime
+	// calculate how much credit have we accumulated since the last tick; a clock that
+	// moved backwards must not take already accrued credits away
+	if elapsedTime > 0 {
+		b.balance += elapsedTime.Seconds() * b.creditsPerSecond
+	}
+	if b.balance > b.maxBalance {
+		b.balance = b.maxBalance
+	}
+}
diff --git a/internal/utils/rate_limiter_test.go b/internal/utils/rate_limiter_test.go
new file mode 100644
index 00000000000..d17021bdaae
--- /dev/null
+++ b/internal/utils/rate_limiter_test.go
@@ -0,0 +1,119 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewRateLimiterInvalidArgs(t *testing.T) {
+	tests := []struct {
+		creditsPerSecond float64
+		maxBalance       float64
+		expectedErr      error
+	}{
+		{creditsPerSecond: -1, maxBalance: 1, expectedErr: ErrInvalidCreditsPerSecond},
+		{creditsPerSecond: 0, maxBalance: 1, expectedErr: ErrInvalidCreditsPerSecond},
+		{creditsPerSecond: 1, maxBalance: -1, expectedErr: ErrInvalidMaxBalance},
+		{creditsPerSecond: 1, maxBalance: 0, expectedErr: ErrInvalidMaxBalance},
+	}
+	for _, test := range tests {
+		limiter, err := NewRateLimiter(test.creditsPerSecond, test.maxBalance)
+		assert.Nil(t, limiter)
+		assert.Equal(t, test.expectedErr, err)
+	}
+}
+
+func TestRateLimiter(t *testing.T) {
+	limiter, err := NewRateLimiter(2.0, 2.0)
+	require.NoError(t, err)
+	// stop time
+	ts := time.Now()
+	limiter.(*rateLimiter).lastTick = ts
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts
+	}
+	ok, waitTime := limiter.CheckCredit(1.0)
+	assert.True(t, ok)
+	assert.Equal(t, time.Duration(0), waitTime)
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.True(t, ok)
+	assert.Equal(t, time.Duration(0), waitTime)
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.False(t, ok)
+	assert.Equal(t, time.Second/2, waitTime)
+	// move time 250ms forward, not enough credits to pay for 1.0 item
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts.Add(time.Second / 4)
+	}
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.False(t, ok)
+	assert.Equal(t, time.Second/4, waitTime)
+	// move time 500ms forward, now enough credits to pay for 1.0 item
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts.Add(time.Second/4 + time.Second/2)
+	}
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.True(t, ok)
+	assert.Equal(t, time.Duration(0), waitTime)
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.False(t, ok)
+	assert.Equal(t, time.Second/4, waitTime)
+	// move time 5s forward, enough to accumulate credits for 10 messages, but it should still be capped at 2
+	limiter.(*rateLimiter).lastTick = ts
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts.Add(5 * time.Second)
+	}
+	for i := 0; i < 2; i++ {
+		ok, waitTime = limiter.CheckCredit(1.0)
+		assert.True(t, ok)
+		assert.Equal(t, time.Duration(0), waitTime)
+	}
+	for i := 0; i < 3; i++ {
+		ok, waitTime = limiter.CheckCredit(1.0)
+		assert.False(t, ok)
+		assert.Equal(t, time.Second/2, waitTime)
+	}
+}
+
+func TestMaxBalance(t *testing.T) {
+	limiter, err := NewRateLimiter(0.1, 1.0)
+	require.NoError(t, err)
+	// stop time
+	ts := time.Now()
+	limiter.(*rateLimiter).lastTick = ts
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts
+	}
+	// on initialization, should have enough credits for 1 message
+	ok, waitTime := limiter.CheckCredit(1.0)
+	assert.True(t, ok)
+	assert.Equal(t, time.Duration(0), waitTime)
+
+	// move time 20s forward, enough to accumulate credits for 2 messages, but it should still be capped at 1
+	limiter.(*rateLimiter).timeNow = func() time.Time {
+		return ts.Add(time.Second * 20)
+	}
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.True(t, ok)
+	assert.Equal(t, time.Duration(0), waitTime)
+	ok, waitTime = limiter.CheckCredit(1.0)
+	assert.False(t, ok)
+	assert.Equal(t, 10*time.Second, waitTime)
+}
diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go
index 91566e88561..75926fe0c86 100644
--- a/plugin/storage/cassandra/factory.go
+++ b/plugin/storage/cassandra/factory.go
@@ -16,11 +16,16 @@ package cassandra
 
 import (
+	"errors"
 	"flag"
+	"io"
+	"time"
 
 	"github.com/spf13/viper"
 	"github.com/uber/jaeger-lib/metrics"
 	"go.uber.org/zap"
 
+	"github.com/jaegertracing/jaeger/internal/utils"
+	"github.com/jaegertracing/jaeger/model"
 	"github.com/jaegertracing/jaeger/pkg/cassandra"
 	"github.com/jaegertracing/jaeger/pkg/cassandra/config"
 	cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
@@ -101,7 +106,62 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
 
 // CreateSpanWriter implements storage.Factory
 func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
-	return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
+	writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger)
+	limits := f.Options.writerRateLimit
+	// leaving both settings at zero disables rate limiting
+	if limits.operationsPerSecond == 0 && limits.maxBurst == 0 {
+		return writer, nil
+	}
+	// every write costs spanWriteCost credits, so a smaller burst could never admit a single span
+	if limits.maxBurst < spanWriteCost {
+		return nil, errMaxBurstTooSmall
+	}
+
+	rateLimiter, err := utils.NewRateLimiter(limits.operationsPerSecond, limits.maxBurst)
+	if err != nil {
+		return nil, err
+	}
+	return newRateLimitedSpanWriter(writer, rateLimiter), nil
+}
+
+// spanWriteCost is the number of rate limiter credits charged for writing one span.
+const spanWriteCost = 1.0
+
+// errMaxBurstTooSmall is returned by CreateSpanWriter when the configured burst cannot pay for one span.
+var errMaxBurstTooSmall = errors.New("invalid writer rate limit max burst, must be at least one")
+
+// newRateLimitedSpanWriter wraps writer so that every write first has to be admitted by rateLimiter.
+func newRateLimitedSpanWriter(writer spanstore.Writer, rateLimiter utils.RateLimiter) spanstore.Writer {
+	return &rateLimitedSpanWriter{
+		writer:      writer,
+		rateLimiter: rateLimiter,
+	}
+}
+
+// rateLimitedSpanWriter is a spanstore.Writer that delays writes to the wrapped writer according to a rate limiter.
+type rateLimitedSpanWriter struct {
+	writer      spanstore.Writer
+	rateLimiter utils.RateLimiter
+}
+
+// WriteSpan blocks the calling goroutine until the rate limiter grants a credit for the span, and then
+// passes the span on to the wrapped writer.
+func (w *rateLimitedSpanWriter) WriteSpan(span *model.Span) error {
+	ok, waitTime := w.rateLimiter.CheckCredit(spanWriteCost)
+	for !ok {
+		time.Sleep(waitTime)
+		ok, waitTime = w.rateLimiter.CheckCredit(spanWriteCost)
+	}
+	return w.writer.WriteSpan(span)
+}
+
+// Close implements io.Closer by closing the wrapped writer when that writer is an io.Closer itself.
+// Embedding an unset io.Closer instead would make any call to Close panic on a nil interface.
+func (w *rateLimitedSpanWriter) Close() error {
+	if closer, ok := w.writer.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
 }
 
 // CreateDependencyReader implements storage.Factory
diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go
index 3409fe6a7c7..a30157405f0 100644
--- a/plugin/storage/cassandra/options.go
+++ b/plugin/storage/cassandra/options.go
@@ -48,6 +48,8 @@ const (
 
 	// common storage settings
 	suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
+	suffixWritesPerSecond        = ".writer-rate-limit.operations-per-second"
+	suffixMaxBurstWrites         = ".writer-rate-limit.max-burst"
 )
 
 // Options contains various type of Cassandra configs and provides the ability
@@ -57,6 +59,7 @@ type Options struct {
 	primary                *namespaceConfig
 	others                 map[string]*namespaceConfig
 	SpanStoreWriteCacheTTL time.Duration
+	writerRateLimit        rateLimitConfig
 }
 
 // the Servers field in config.Configuration is a list, which we cannot represent with flags.
@@ -70,6 +73,12 @@ type namespaceConfig struct {
 	Enabled bool
 }
 
+// rateLimitConfig defines common arguments to a rate limiter.
+type rateLimitConfig struct {
+	operationsPerSecond float64
+	maxBurst            float64
+}
+
 // NewOptions creates a new Options struct.
 func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
 	// TODO all default values should be defined via cobra flags
@@ -111,6 +120,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
 	flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
 		opt.SpanStoreWriteCacheTTL,
 		"The duration to wait before rewriting an existing service or operation name")
+	flagSet.Float64(opt.primary.namespace+suffixWritesPerSecond,
+		opt.writerRateLimit.operationsPerSecond,
+		"The number of writes per second using rate limiter")
+	flagSet.Float64(opt.primary.namespace+suffixMaxBurstWrites,
+		opt.writerRateLimit.maxBurst,
+		"The maximum number of writes in a single burst using rate limiter")
 }
 
 func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
@@ -201,6 +216,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
 		cfg.initFromViper(v)
 	}
 	opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
+	opt.writerRateLimit.operationsPerSecond = v.GetFloat64(opt.primary.namespace + suffixWritesPerSecond)
+	opt.writerRateLimit.maxBurst = v.GetFloat64(opt.primary.namespace + suffixMaxBurstWrites)
 }
 
 func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go
index 262172b4edc..eba835e848e 100644
--- a/plugin/storage/cassandra/options_test.go
+++ b/plugin/storage/cassandra/options_test.go
@@ -58,6 +58,8 @@ func TestOptionsWithFlags(t *testing.T) {
 		"--cas.consistency=ONE",
 		"--cas.proto-version=3",
 		"--cas.socket-keep-alive=42s",
+		"--cas.writer-rate-limit.operations-per-second=10",
+		"--cas.writer-rate-limit.max-burst=1",
 		// enable aux with a couple overrides
 		"--cas-aux.enabled=true",
 		"--cas-aux.keyspace=jaeger-archive",
@@ -82,4 +84,7 @@ func TestOptionsWithFlags(t *testing.T) {
 	assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
 	assert.Equal(t, 3, aux.ProtoVersion)
 	assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)
+
+	assert.Equal(t, 10.0, opts.writerRateLimit.operationsPerSecond)
+	assert.Equal(t, 1.0, opts.writerRateLimit.maxBurst)
 }