Skip to content

Commit

Permalink
Implement rate limiting for storage writers
Browse files Browse the repository at this point in the history
Signed-off-by: Isaac Hier <ihier@uber.com>
  • Loading branch information
Isaac Hier committed Dec 5, 2018
1 parent 6237e2f commit 891eef8
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 4 deletions.
10 changes: 7 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import:
- package: github.com/pkg/errors
- package: go.uber.org/zap
version: ^1
- package: go.uber.org/ratelimit
- package: github.com/uber/jaeger-client-go
version: ^2.15.0
subpackages:
Expand Down
9 changes: 8 additions & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/ratelimit"
)

const (
Expand Down Expand Up @@ -101,7 +102,13 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger)

if f.Options.writesPerSecond == 0 {
return writer, nil
}

return ratelimit.NewRateLimitedWriter(writer, f.Options.writesPerSecond)
}

// CreateDependencyReader implements storage.Factory
Expand Down
21 changes: 21 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore/ratelimit"
)

var _ storage.Factory = new(Factory)
Expand Down Expand Up @@ -87,3 +88,23 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}

func TestCassandraFactoryWriterRateLimit(t *testing.T) {
logger, _ := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
"--cassandra.writes-per-second=10",
})
f.InitFromViper(v)

// Use mock to avoid errors in unit test. See details in previous test case.
f.primaryConfig = &mockSessionBuilder{}
f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, logger))

writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
dummyRateLimitedWriter, _ := ratelimit.NewRateLimitedWriter(nil, 1)
assert.IsType(t, dummyRateLimitedWriter, writer)
}
6 changes: 6 additions & 0 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixWritesPerSecond = ".writes-per-second"
)

// Options contains various type of Cassandra configs and provides the ability
Expand All @@ -57,6 +58,7 @@ type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
writesPerSecond int
}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand Down Expand Up @@ -111,6 +113,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.Int(opt.primary.namespace+suffixWritesPerSecond,
opt.writesPerSecond,
"Optional upper limit on the number of span writes per second")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -201,6 +206,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
cfg.initFromViper(v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.writesPerSecond = v.GetInt(opt.primary.namespace + suffixWritesPerSecond)
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.consistency=ONE",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
"--cas.writes-per-second=10",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
Expand All @@ -69,6 +70,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, "ONE", primary.Consistency)
assert.Equal(t, 10, opts.writesPerSecond)

aux := opts.Get("cas-aux")
require.NotNil(t, aux)
Expand Down
51 changes: 51 additions & 0 deletions storage/spanstore/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"errors"

rlImpl "go.uber.org/ratelimit"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var errInvalidRate = errors.New("rate must be a positive integer")

type rateLimitedWriter struct {
writer spanstore.Writer
limiter rlImpl.Limiter
}

// NewRateLimitedWriter decorates a Writer with a rate limiter in order to limit
// the number of writes per second.
func NewRateLimitedWriter(writer spanstore.Writer, rate int) (spanstore.Writer, error) {
if rate <= 0 {
return nil, errInvalidRate
}

return &rateLimitedWriter{
writer: writer,
limiter: rlImpl.New(rate),
}, nil
}

// WriteSpan wraps the write span method of the inner writer, but invokes the
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
r.limiter.Take()
return r.writer.WriteSpan(s)
}
76 changes: 76 additions & 0 deletions storage/spanstore/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)

type mockWriter struct {
expectedError error
}

func (m *mockWriter) WriteSpan(s *model.Span) error {
return m.expectedError
}

type mockRateLimiter struct {
calls int
}

func (m *mockRateLimiter) Take() time.Time {
m.calls++
return time.Time{}
}

func TestRateLimitedWriter(t *testing.T) {
writer := &mockWriter{}
decoratedWriter, err := NewRateLimitedWriter(writer, 10)
require.NoError(t, err)
var rateLimiter mockRateLimiter
decoratedWriter.(*rateLimitedWriter).limiter = &rateLimiter
require.NotEqual(t, writer, decoratedWriter)
err = decoratedWriter.WriteSpan(&model.Span{})
require.NoError(t, err)
require.Equal(t, 1, rateLimiter.calls)
}

func TestRateLimitedWriterInvalidWritesPerSecond(t *testing.T) {
writer := &mockWriter{}
decoratedWriter, err := NewRateLimitedWriter(writer, 0)
require.Error(t, err)
require.Nil(t, decoratedWriter)
}

func TestRateLimitedWriterWithWriteError(t *testing.T) {
var fakeError = errors.New("test")
writer := &mockWriter{
expectedError: fakeError,
}
decoratedWriter, err := NewRateLimitedWriter(writer, 5)
require.NoError(t, err)
err = decoratedWriter.WriteSpan(nil)
require.Error(t, err)
require.Equal(t, fakeError, err)
writer.expectedError = nil
err = decoratedWriter.WriteSpan(nil)
require.NoError(t, err)
}

0 comments on commit 891eef8

Please sign in to comment.