Skip to content

Commit

Permalink
Remove metrics, minor improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Isaac Hier <ihier@uber.com>
  • Loading branch information
Isaac Hier committed Nov 22, 2018
1 parent 1690ace commit 24aea8b
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 39 deletions.
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return writer, nil
}

return ratelimit.NewRateLimitedWriter(writer, f.Options.writesPerSecond, f.primaryMetricsFactory)
return ratelimit.NewRateLimitedWriter(writer, f.Options.writesPerSecond)
}

// CreateDependencyReader implements storage.Factory
Expand Down
5 changes: 4 additions & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore/ratelimit"
)

var _ storage.Factory = new(Factory)
Expand Down Expand Up @@ -102,6 +103,8 @@ func TestCassandraFactoryWriterRateLimit(t *testing.T) {
f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, logger))

_, err := f.CreateSpanWriter()
writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
dummyRateLimitedWriter, _ := ratelimit.NewRateLimitedWriter(nil, 1)
assert.IsType(t, dummyRateLimitedWriter, writer)
}
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
"The duration to wait before rewriting an existing service or operation name")
flagSet.Int(opt.primary.namespace+suffixWritesPerSecond,
opt.writesPerSecond,
"The number of writes per second using rate limiter")
"Optional upper limit on the number of span writes per second")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down
3 changes: 1 addition & 2 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, "ONE", primary.Consistency)
assert.Equal(t, 10, opts.writesPerSecond)

aux := opts.Get("cas-aux")
require.NotNil(t, aux)
Expand All @@ -83,6 +84,4 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
assert.Equal(t, 3, aux.ProtoVersion)
assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)

assert.Equal(t, 10, opts.writesPerSecond)
}
30 changes: 6 additions & 24 deletions storage/spanstore/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package ratelimit

import (
"errors"
"time"

"github.com/uber/jaeger-lib/metrics"
rlImpl "go.uber.org/ratelimit"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -28,42 +26,26 @@ import (
var errInvalidRate = errors.New("rate must be a positive integer")

type rateLimitedWriter struct {
writer spanstore.Writer
limiter rlImpl.Limiter
sleepTime metrics.Timer
delayedWrites metrics.Counter
writer spanstore.Writer
limiter rlImpl.Limiter
}

// NewRateLimitedWriter decorates a Writer with a rate limiter in order to limit
// the number of writes per second.
func NewRateLimitedWriter(
writer spanstore.Writer,
rate int,
metricsFactory metrics.Factory,
) (spanstore.Writer, error) {
func NewRateLimitedWriter(writer spanstore.Writer, rate int) (spanstore.Writer, error) {
if rate <= 0 {
return nil, errInvalidRate
}

m := metricsFactory.Namespace("rate-limited-writer", nil)
return &rateLimitedWriter{
writer: writer,
limiter: rlImpl.New(rate),
sleepTime: m.Timer("sleep-time", nil),
delayedWrites: m.Counter("delayed-writes", nil),
writer: writer,
limiter: rlImpl.New(rate),
}, nil
}

// WriteSpan wraps the write span method of the inner writer, but invokes the
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
const threshold = 100 * time.Microsecond
startTime := time.Now()
endTime := r.limiter.Take()
duration := endTime.Sub(startTime)
if duration >= threshold {
r.delayedWrites.Inc(1)
r.sleepTime.Record(duration)
}
r.limiter.Take()
return r.writer.WriteSpan(s)
}
13 changes: 3 additions & 10 deletions storage/spanstore/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/model"
)
Expand All @@ -44,8 +43,7 @@ func (m *mockRateLimiter) Take() time.Time {

func TestRateLimitedWriter(t *testing.T) {
writer := &mockWriter{}
metricsFactory := metrics.NewLocalFactory(0)
decoratedWriter, err := NewRateLimitedWriter(writer, 10, metricsFactory)
decoratedWriter, err := NewRateLimitedWriter(writer, 10)
require.NoError(t, err)
var rateLimiter mockRateLimiter
decoratedWriter.(*rateLimitedWriter).limiter = &rateLimiter
Expand All @@ -57,8 +55,7 @@ func TestRateLimitedWriter(t *testing.T) {

func TestRateLimitedWriterInvalidWritesPerSecond(t *testing.T) {
writer := &mockWriter{}
metricsFactory := metrics.NewLocalFactory(0)
decoratedWriter, err := NewRateLimitedWriter(writer, 0, metricsFactory)
decoratedWriter, err := NewRateLimitedWriter(writer, 0)
require.Error(t, err)
require.Nil(t, decoratedWriter)
}
Expand All @@ -68,16 +65,12 @@ func TestRateLimitedWriterWithWriteError(t *testing.T) {
writer := &mockWriter{
expectedError: fakeError,
}
metricsFactory := metrics.NewLocalFactory(0)
decoratedWriter, err := NewRateLimitedWriter(writer, 5, metricsFactory)
decoratedWriter, err := NewRateLimitedWriter(writer, 5)
require.NoError(t, err)
err = decoratedWriter.WriteSpan(nil)
require.Error(t, err)
require.Equal(t, fakeError, err)
writer.expectedError = nil
err = decoratedWriter.WriteSpan(nil)
require.NoError(t, err)
counters, _ := metricsFactory.Snapshot()
require.Contains(t, counters, "rate-limited-writer.delayed-writes")
require.Equal(t, int64(1), counters["rate-limited-writer.delayed-writes"])
}

0 comments on commit 24aea8b

Please sign in to comment.