Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limiter option to Cassandra writer (WIP) #1147

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import:
- package: github.com/pkg/errors
- package: go.uber.org/zap
version: ^1
- package: go.uber.org/ratelimit
- package: github.com/uber/jaeger-client-go
version: ^2.15.0
subpackages:
Expand Down Expand Up @@ -64,7 +65,6 @@ import:
- package: github.com/gogo/googleapis
version: b23578765ee54ff6bceff57f397d833bf4ca6869
- package: github.com/gogo/protobuf
version: fd9a4790f3963525fb889cc00e0a8f828e0b3a29
- package: github.com/grpc-ecosystem/grpc-gateway
version: 58f78b988bc393694cef62b92c5cde77e4742ff5
- package: google.golang.org/grpc
Expand Down
9 changes: 8 additions & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/ratelimit"
)

const (
Expand Down Expand Up @@ -101,7 +102,13 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger)

if f.Options.writesPerSecond == 0 {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return writer, nil
}

return ratelimit.NewRateLimitedWriter(writer, f.Options.writesPerSecond)
}

// CreateDependencyReader implements storage.Factory
Expand Down
21 changes: 21 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore/ratelimit"
)

var _ storage.Factory = new(Factory)
Expand Down Expand Up @@ -87,3 +88,23 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}

func TestCassandraFactoryWriterRateLimit(t *testing.T) {
logger, _ := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
"--cassandra.writes-per-second=10",
})
f.InitFromViper(v)

// Use mock to avoid errors in unit test. See details in previous test case.
f.primaryConfig = &mockSessionBuilder{}
f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, logger))

writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
dummyRateLimitedWriter, _ := ratelimit.NewRateLimitedWriter(nil, 1)
assert.IsType(t, dummyRateLimitedWriter, writer)
}
6 changes: 6 additions & 0 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixWritesPerSecond = ".writes-per-second"
)

// Options contains various type of Cassandra configs and provides the ability
Expand All @@ -57,6 +58,7 @@ type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
writesPerSecond int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be public? And qualified with SpanStore prefix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK why SpanStoreWriteCacheTTL is public either. There is no serialization here. I think it might be a workaround to pass options to another package that need to read SpanStoreWriteCacheTTL, but doesn't make much sense given that passing the int is just as effective.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we using this internally? SpanStoreWriteCacheTTL is externally settable option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of these are initialized by command-line flags, so neither should be public. I did a code search and could not find a reference to SpanStoreWriteCacheTTL (not even in our internal Uber repo). Seems to be vestigial from some point where we needed access from our internal repo into this part of the OSS.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use cli flags internally. If we need to set these two fields how would we do it without them being public? Did you try your branch without our internal main?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$ git grep -l SpanStoreWriteCacheTTL
plugin/storage/cassandra/factory.go
plugin/storage/cassandra/options.go

}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand Down Expand Up @@ -111,6 +113,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.Int(opt.primary.namespace+suffixWritesPerSecond,
opt.writesPerSecond,
"Optional upper limit on the number of span writes per second")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -201,6 +206,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
cfg.initFromViper(v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.writesPerSecond = v.GetInt(opt.primary.namespace + suffixWritesPerSecond)
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.consistency=ONE",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
"--cas.writes-per-second=10",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
Expand All @@ -69,6 +70,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, "ONE", primary.Consistency)
assert.Equal(t, 10, opts.writesPerSecond)

aux := opts.Get("cas-aux")
require.NotNil(t, aux)
Expand Down
51 changes: 51 additions & 0 deletions storage/spanstore/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"errors"

rlImpl "go.uber.org/ratelimit"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var errInvalidRate = errors.New("rate must be a positive integer")

type rateLimitedWriter struct {
writer spanstore.Writer
limiter rlImpl.Limiter
}

// NewRateLimitedWriter decorates a Writer with a rate limiter in order to limit
// the number of writes per second.
func NewRateLimitedWriter(writer spanstore.Writer, rate int) (spanstore.Writer, error) {
if rate <= 0 {
return nil, errInvalidRate
}

return &rateLimitedWriter{
writer: writer,
limiter: rlImpl.New(rate),
}, nil
}

// WriteSpan wraps the write span method of the inner writer, but invokes the
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
r.limiter.Take()
return r.writer.WriteSpan(s)
}
76 changes: 76 additions & 0 deletions storage/spanstore/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)

type mockWriter struct {
expectedError error
}

func (m *mockWriter) WriteSpan(s *model.Span) error {
return m.expectedError
}

type mockRateLimiter struct {
calls int
}

func (m *mockRateLimiter) Take() time.Time {
m.calls++
return time.Time{}
}

func TestRateLimitedWriter(t *testing.T) {
writer := &mockWriter{}
decoratedWriter, err := NewRateLimitedWriter(writer, 10)
require.NoError(t, err)
var rateLimiter mockRateLimiter
decoratedWriter.(*rateLimitedWriter).limiter = &rateLimiter
require.NotEqual(t, writer, decoratedWriter)
err = decoratedWriter.WriteSpan(&model.Span{})
require.NoError(t, err)
require.Equal(t, 1, rateLimiter.calls)
}

func TestRateLimitedWriterInvalidWritesPerSecond(t *testing.T) {
writer := &mockWriter{}
decoratedWriter, err := NewRateLimitedWriter(writer, 0)
require.Error(t, err)
require.Nil(t, decoratedWriter)
}

func TestRateLimitedWriterWithWriteError(t *testing.T) {
var fakeError = errors.New("test")
writer := &mockWriter{
expectedError: fakeError,
}
decoratedWriter, err := NewRateLimitedWriter(writer, 5)
require.NoError(t, err)
err = decoratedWriter.WriteSpan(nil)
require.Error(t, err)
require.Equal(t, fakeError, err)
writer.expectedError = nil
err = decoratedWriter.WriteSpan(nil)
require.NoError(t, err)
}