diff --git a/glide.lock b/glide.lock index 274f1ac51db..b48f1f6a2ec 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: eb96c7fd56c8c6523990c19ba5bcf0d826922569a16e1019a88de57d46675775 -updated: 2018-10-12T23:27:30.163799+09:00 +hash: 67360688a603b6dfdfe15e7750bcdb833f9a20ea9617c71066d4fd2f90f459c7 +updated: 2018-10-29T10:53:47.126953-04:00 imports: - name: github.com/apache/thrift version: 53dd39833a08ce33582e5ff31fa18bb4735d6731 @@ -282,6 +282,10 @@ imports: version: 1ea20fb1cbb1cc08cbd0d913a96dead89aa18289 - name: go.uber.org/multierr version: 3c4937480c32f4c13a875a1829af76c98ca3d40a +- name: go.uber.org/ratelimit + version: c15da02342779cb6dc027fc95ee2277787698f36 + subpackages: + - internal/clock - name: go.uber.org/zap version: ff33455a0e382e8a81d14dd7c922020b6b5e7982 subpackages: @@ -293,7 +297,7 @@ imports: - zapcore - zaptest - name: golang.org/x/net - version: 49bb7cea24b1df9410e1712aa6433dae904ff66a + version: c44066c5c816ec500d459a2a324a753f78531ae0 subpackages: - context - context/ctxhttp diff --git a/glide.yaml b/glide.yaml index 655375133f9..fd8321f62e2 100644 --- a/glide.yaml +++ b/glide.yaml @@ -12,6 +12,7 @@ import: - package: github.com/pkg/errors - package: go.uber.org/zap version: ^1 +- package: go.uber.org/ratelimit - package: github.com/uber/jaeger-client-go version: ^2.15.0 subpackages: diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 91566e88561..10edff9cb76 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -28,6 +28,7 @@ import ( "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage/spanstore/ratelimit" ) const ( @@ -101,7 +102,13 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil + writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger) + + if f.Options.writesPerSecond == 0 { + return writer, nil + } + + return ratelimit.NewRateLimitedWriter(writer, f.Options.writesPerSecond) } // CreateDependencyReader implements storage.Factory diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 078c008cf71..c45ea54ee48 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -27,6 +27,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/spanstore/ratelimit" ) var _ storage.Factory = new(Factory) @@ -87,3 +88,23 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() assert.NoError(t, err) } + +func TestCassandraFactoryWriterRateLimit(t *testing.T) { + logger, _ := testutils.NewLogger() + f := NewFactory() + v, command := config.Viperize(f.AddFlags) + command.ParseFlags([]string{ + "--cassandra.writes-per-second=10", + }) + f.InitFromViper(v) + + // Use mock to avoid errors in unit test. See details in previous test case. + f.primaryConfig = &mockSessionBuilder{} + f.archiveConfig = nil + assert.NoError(t, f.Initialize(metrics.NullFactory, logger)) + + writer, err := f.CreateSpanWriter() + assert.NoError(t, err) + dummyRateLimitedWriter, _ := ratelimit.NewRateLimitedWriter(nil, 1) + assert.IsType(t, dummyRateLimitedWriter, writer) +} diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 5427cdec836..e7e033e286f 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -48,6 +48,7 @@ const ( // common storage settings suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl" + suffixWritesPerSecond = ".writes-per-second" ) // Options contains various type of Cassandra configs and provides the ability @@ -57,6 +58,7 @@ type Options struct { primary *namespaceConfig others map[string]*namespaceConfig SpanStoreWriteCacheTTL time.Duration + writesPerSecond int } // the Servers field in config.Configuration is a list, which we cannot represent with flags. @@ -111,6 +113,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") + flagSet.Int(opt.primary.namespace+suffixWritesPerSecond, + opt.writesPerSecond, + "Optional upper limit on the number of span writes per second") } func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { @@ -201,6 +206,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { cfg.initFromViper(v) } opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL) + opt.writesPerSecond = v.GetInt(opt.primary.namespace + suffixWritesPerSecond) } func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 262172b4edc..852a55d109c 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -58,6 +58,7 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.consistency=ONE", "--cas.proto-version=3", "--cas.socket-keep-alive=42s", + "--cas.writes-per-second=10", // enable aux with a couple overrides "--cas-aux.enabled=true", "--cas-aux.keyspace=jaeger-archive", @@ -69,6 +70,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "jaeger", primary.Keyspace) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, "ONE", primary.Consistency) + assert.Equal(t, 10, opts.writesPerSecond) aux := opts.Get("cas-aux") require.NotNil(t, aux) diff --git a/storage/spanstore/ratelimit/rate_limit.go b/storage/spanstore/ratelimit/rate_limit.go new file mode 100644 index 00000000000..c8abde4f2be --- /dev/null +++ b/storage/spanstore/ratelimit/rate_limit.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "errors" + + rlImpl "go.uber.org/ratelimit" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var errInvalidRate = errors.New("rate must be a positive integer") + +type rateLimitedWriter struct { + writer spanstore.Writer + limiter rlImpl.Limiter +} + +// NewRateLimitedWriter decorates a Writer with a rate limiter in order to limit +// the number of writes per second. +func NewRateLimitedWriter(writer spanstore.Writer, rate int) (spanstore.Writer, error) { + if rate <= 0 { + return nil, errInvalidRate + } + + return &rateLimitedWriter{ + writer: writer, + limiter: rlImpl.New(rate), + }, nil +} + +// WriteSpan wraps the write span method of the inner writer, but invokes the +// rate limiter before each write. +func (r *rateLimitedWriter) WriteSpan(s *model.Span) error { + r.limiter.Take() + return r.writer.WriteSpan(s) +} diff --git a/storage/spanstore/ratelimit/rate_limit_test.go b/storage/spanstore/ratelimit/rate_limit_test.go new file mode 100644 index 00000000000..aec8b37db0e --- /dev/null +++ b/storage/spanstore/ratelimit/rate_limit_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" +) + +type mockWriter struct { + expectedError error +} + +func (m *mockWriter) WriteSpan(s *model.Span) error { + return m.expectedError +} + +type mockRateLimiter struct { + calls int +} + +func (m *mockRateLimiter) Take() time.Time { + m.calls++ + return time.Time{} +} + +func TestRateLimitedWriter(t *testing.T) { + writer := &mockWriter{} + decoratedWriter, err := NewRateLimitedWriter(writer, 10) + require.NoError(t, err) + var rateLimiter mockRateLimiter + decoratedWriter.(*rateLimitedWriter).limiter = &rateLimiter + require.NotEqual(t, writer, decoratedWriter) + err = decoratedWriter.WriteSpan(&model.Span{}) + require.NoError(t, err) + require.Equal(t, 1, rateLimiter.calls) +} + +func TestRateLimitedWriterInvalidWritesPerSecond(t *testing.T) { + writer := &mockWriter{} + decoratedWriter, err := NewRateLimitedWriter(writer, 0) + require.Error(t, err) + require.Nil(t, decoratedWriter) +} + +func TestRateLimitedWriterWithWriteError(t *testing.T) { + var fakeError = errors.New("test") + writer := &mockWriter{ + expectedError: fakeError, + } + decoratedWriter, err := NewRateLimitedWriter(writer, 5) + require.NoError(t, err) + err = decoratedWriter.WriteSpan(nil) + require.Error(t, err) + require.Equal(t, fakeError, err) + writer.expectedError = nil + err = decoratedWriter.WriteSpan(nil) + require.NoError(t, err) +}