Add rate limiter option to Cassandra writer (WIP) #1147

Closed
wants to merge 3 commits into from

Conversation

isaachier
Contributor

Which problem is this PR solving?

Short description of the changes

  • Add rate limiter to Cassandra writer so we do not overload Cassandra with write requests.

@vprithvi
Contributor

vprithvi commented Oct 26, 2018

Few questions:

  • What are the advantages of scoping this to Cassandra as opposed to creating a Writer decorator that can be used for all writers?
  • What are the disadvantages of using https://github.com/uber-go/ratelimit in lieu of writing our own implementation?

@isaachier
Contributor Author

  • What are the advantages of scoping this to Cassandra as opposed to creating a Writer decorator that can be used for all writers?

I had considered that as well. I think it makes sense. The only issue is that config tends to be database specific, so the main change will probably still be in the Cassandra config (unless you'd like me to add to all database implementations immediately).

I'd love to use uber-go/ratelimit, but they only support integer rates, not floating point rates (not sure why). If I can get a PR through to them, that would be the best solution. Then again, it is probably a breaking change for their API.

@vprithvi
Contributor

I'd love to use uber-go/ratelimit, but they only support integer rates, not floating point rates

What makes floating point rates indispensable for us?

@yurishkuro
Member

I don't envision us having rate per ingester of less than one msg per second, so integer rates seem fine

@isaachier
Contributor Author

OK cool. I can remove the rate limiter too.

@isaachier
Contributor Author

Actually, ratelimit is broken. Here is the error:

vendor/github.com/uber-go/ratelimit/ratelimit.go:27:2: use of internal package github.com/jaegertracing/jaeger/vendor/go.uber.org/ratelimit/internal/clock not allowed

Looks like the package is not really maintained based on the history and lack of versions.

@codecov

codecov bot commented Oct 28, 2018

Codecov Report

Merging #1147 into master will not change coverage.
The diff coverage is 100%.


@@          Coverage Diff           @@
##           master   #1147   +/-   ##
======================================
  Coverage     100%    100%           
======================================
  Files         159     160    +1     
  Lines        7145    7162   +17     
======================================
+ Hits         7145    7162   +17
Impacted Files Coverage Δ
storage/spanstore/ratelimit/rate_limit.go 100% <100%> (ø)
plugin/storage/cassandra/factory.go 100% <100%> (ø) ⬆️
plugin/storage/cassandra/options.go 100% <100%> (ø) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7b18bd9...2f11922.

@isaachier
Contributor Author

isaachier commented Oct 29, 2018

Can someone please rerun the tests? The ES unit test failed.

@black-adder
Contributor

ratelimit is broken

I find that hard to believe given that it's being used by multiple other teams at Uber. Looks like the issue is more a vendoring issue.

@isaachier
Contributor Author

ratelimit is broken

I find that hard to believe given that it's being used by multiple other teams at Uber. Looks like the issue is more a vendoring issue.

Interesting thought. I'm surprised it works. Seems to use internal package types in public interface. I'll try again and let you know.
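The error quoted earlier is consistent with the vendoring explanation. The failing file sits under `vendor/github.com/uber-go/ratelimit`, while the package it imports lives under `vendor/go.uber.org/ratelimit/internal/clock`. Go only allows an `internal` package to be imported from inside the tree rooted at the parent of the `internal` directory. The sketch below is a simplified model of that rule (the real check in the `go` command works on directories and has more cases); `internalImportAllowed` is a hypothetical helper written for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// internalImportAllowed is a simplified model of Go's internal-package rule:
// a path containing an "internal" element may only be imported by code inside
// the tree rooted at the parent of that "internal" directory.
func internalImportAllowed(importer, imported string) bool {
	var idx int
	switch {
	case strings.HasSuffix(imported, "/internal"):
		idx = len(imported) - len("/internal")
	case strings.Contains(imported, "/internal/"):
		idx = strings.LastIndex(imported, "/internal/")
	default:
		return true // no internal element: the rule does not apply
	}
	parent := imported[:idx]
	return importer == parent || strings.HasPrefix(importer, parent+"/")
}

func main() {
	const clock = "github.com/jaegertracing/jaeger/vendor/go.uber.org/ratelimit/internal/clock"
	// Importer vendored under the GitHub path, as in the error above.
	fmt.Println(internalImportAllowed("github.com/jaegertracing/jaeger/vendor/github.com/uber-go/ratelimit", clock))
	// Importer vendored under the go.uber.org path.
	fmt.Println(internalImportAllowed("github.com/jaegertracing/jaeger/vendor/go.uber.org/ratelimit", clock))
}
```

Under this model the first import is rejected and the second is accepted, which suggests vendoring the library under its `go.uber.org/ratelimit` import path rather than the GitHub path.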

// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
const cost = 1
for ok, waitTime := r.l.CheckCredit(cost); !ok; ok, waitTime = r.l.CheckCredit(cost) {
Contributor

Won't

for ok, waitTime := r.l.CheckCredit(cost); !ok {
   time.Sleep(waitTime)

work? Not sure why the second call is needed

Contributor Author

Well, let's imagine Java, C++, or whatever other language:

for (bool ok = r.l.CheckCredit(cost); !ok; ok = r.l.CheckCredit(cost)) {
    // ...
}

The first statement is a declaration/initialization statement and runs once. The last statement runs at the end of each iteration, before the condition is checked again. Not 100% sure, but I imagine Go works the same way.

Contributor Author

Ya, this explains it: https://tour.golang.org/flowcontrol/1.
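To make the loop semantics concrete, here is a small sketch of the three-part `for` with a two-value `CheckCredit`. `fakeLimiter` and `waitForCredit` are hypothetical stand-ins written for illustration, not the limiter used in this PR; the fake refuses credit a fixed number of times so the number of calls can be counted.

```go
package main

import (
	"fmt"
	"time"
)

// fakeLimiter is a hypothetical stand-in for the limiter in the PR: it
// refuses credit a fixed number of times and then grants it.
type fakeLimiter struct {
	refusals int
	calls    int
}

// CheckCredit reports whether credit is available and, if not, how long to wait.
func (f *fakeLimiter) CheckCredit(cost float64) (bool, time.Duration) {
	f.calls++
	if f.refusals > 0 {
		f.refusals--
		return false, time.Millisecond
	}
	return true, 0
}

// waitForCredit blocks until the limiter grants credit and returns the
// number of times it had to sleep.
func waitForCredit(l *fakeLimiter, cost float64) int {
	sleeps := 0
	// The init statement runs once; the post statement re-checks credit
	// after every sleep, which is why CheckCredit appears twice.
	for ok, wait := l.CheckCredit(cost); !ok; ok, wait = l.CheckCredit(cost) {
		time.Sleep(wait)
		sleeps++
	}
	return sleeps
}

func main() {
	l := &fakeLimiter{refusals: 2}
	fmt.Println(waitForCredit(l, 1), l.calls)
}
```

With two refusals the loop sleeps twice and calls `CheckCredit` three times: once in the init statement and once per post statement.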

@@ -48,6 +48,8 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixWritesPerSecond = ".writer-rate-limit.operations-per-second"
Contributor

I'm ok with this for now, but I think these configs should be storage-type agnostic (i.e., ES can also use the rate limiter, but we shouldn't have to pollute ES storage configs with .writer-rate-limit.operations-per-second and .writer-rate-limit.max-burst).

Contributor Author

I wish the configs generally were less storage specific. I'm not sure if we share any config at the moment.

// rateLimitConfig defines common arguments to a rate limiter.
type rateLimitConfig struct {
operationsPerSecond float64
maxBurst float64
Contributor

@vprithvi vprithvi Oct 29, 2018

Is this config required?

Contributor Author

Meant to delete this. Thanks for the reminder.

@isaachier isaachier changed the title Add rate limiter option to Cassandra writer (WIP) Add rate limiter option to Cassandra writer Oct 29, 2018
require.Nil(t, decoratedWriter)
}

func TestRateLimitedWriterUnlimitedWritesPerSecond(t *testing.T) {
Contributor

I feel that this test doesn't test anything additional over TestRateLimitedWriter. Could this be made better?
(If not, it may make sense to implement a no-op Limiter inside our decorator so that we can test it better)

Contributor Author

Ya. I didn't want to rely on time alone, seeing how flaky that can be. Will try mocking soon.

Contributor Author

Not a huge fan of type testing, but tried it out in latest revision.
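One way to test the decorator without relying on wall-clock timing is to inject a limiter that only counts calls. The sketch below assumes simplified, hypothetical `Writer` and `Limiter` interfaces (spans are plain strings here); it is not the test that was added in this revision.

```go
package main

import (
	"fmt"
	"time"
)

// Writer and Limiter are simplified, hypothetical stand-ins for
// spanstore.Writer and ratelimit.Limiter.
type Writer interface{ WriteSpan(span string) error }
type Limiter interface{ Take() time.Time }

// countingLimiter never sleeps; it only records how often Take was called.
type countingLimiter struct{ takes int }

func (c *countingLimiter) Take() time.Time { c.takes++; return time.Now() }

// recordingWriter stores the spans it receives.
type recordingWriter struct{ spans []string }

func (r *recordingWriter) WriteSpan(span string) error {
	r.spans = append(r.spans, span)
	return nil
}

// rateLimitedWriter consults the limiter before delegating each write.
type rateLimitedWriter struct {
	writer  Writer
	limiter Limiter
}

func (r *rateLimitedWriter) WriteSpan(span string) error {
	r.limiter.Take()
	return r.writer.WriteSpan(span)
}

func main() {
	limiter, inner := &countingLimiter{}, &recordingWriter{}
	w := &rateLimitedWriter{writer: inner, limiter: limiter}
	for _, s := range []string{"a", "b", "c"} {
		_ = w.WriteSpan(s)
	}
	fmt.Println(limiter.takes, len(inner.spans))
}
```

The assertion of interest is that the limiter is consulted exactly once per write and that every span still reaches the inner writer.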

@isaachier isaachier changed the title Add rate limiter option to Cassandra writer Add rate limiter option to Cassandra writer (WIP) Oct 30, 2018
@@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/decorator"
Member

why the "decorator" package? Metrics is also a decorator, it's in its own package. I suggest spanstore/ratelimited instead


return decorator.NewRateLimitedWriter(
writer,
f.Options.writesPerSecond,
Member

if f.Options.writesPerSecond == 0, do we need to wrap?

Contributor Author

@isaachier isaachier Oct 30, 2018

Lol see what @vprithvi suggested earlier. We are wrapping with noop rate limiter in that case.

Member

if we have an if statement, I'd rather have it here and not add two more layers of indirection. Then in your rate limited writer, return an error if rate < 1.

Contributor Author

Oh man. I want to wait on @vprithvi's response because this is literally what I was wondering above.

Good idea. I wasn't sure if a passthrough would hurt performance due to the indirection.

#1147 (comment)

Contributor

I have a preference towards having this logic be in the rate limiter because it means that it doesn't need to be in every storage implementation. However, it's not a strong preference.

Member

That's a separate concern though, the config and application of rate limiter is currently in the cassandra factory, but we could easily pull it up to the higher level (meta) factory (I just don't think we need to do it right now). This keeps the rate limiter simpler - you can only instantiate it with a positive rate.

Contributor Author

I'm wondering when we care about performance of indirection, etc. But I'll go back to the original if statement here.
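The outcome of this thread can be sketched as follows: the factory wraps the writer only when a rate is configured, and the constructor rejects non-positive rates. Everything here is a simplified assumption for illustration: `Writer` is a stand-in for `spanstore.Writer`, `wrapIfLimited` plays the role of the factory, and the interval-based limiter replaces the real one and is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Writer is a simplified, hypothetical stand-in for spanstore.Writer.
type Writer interface{ WriteSpan(span string) error }

type nopWriter struct{}

func (nopWriter) WriteSpan(string) error { return nil }

var errInvalidRate = errors.New("rate must be a positive integer")

// rateLimitedWriter spaces writes at least interval apart. It is not safe
// for concurrent use; a real limiter would need synchronization.
type rateLimitedWriter struct {
	writer   Writer
	interval time.Duration
	next     time.Time
}

// NewRateLimitedWriter only accepts positive rates.
func NewRateLimitedWriter(w Writer, writesPerSecond int) (Writer, error) {
	if writesPerSecond <= 0 {
		return nil, errInvalidRate
	}
	return &rateLimitedWriter{writer: w, interval: time.Second / time.Duration(writesPerSecond)}, nil
}

func (r *rateLimitedWriter) WriteSpan(span string) error {
	if wait := time.Until(r.next); wait > 0 {
		time.Sleep(wait)
	}
	r.next = time.Now().Add(r.interval)
	return r.writer.WriteSpan(span)
}

// wrapIfLimited plays the role of the factory: zero means "no limit", so the
// writer is returned unwrapped and no extra indirection is added.
func wrapIfLimited(w Writer, writesPerSecond int) (Writer, error) {
	if writesPerSecond == 0 {
		return w, nil
	}
	return NewRateLimitedWriter(w, writesPerSecond)
}

func main() {
	w, err := wrapIfLimited(nopWriter{}, 0)
	fmt.Printf("%T %v\n", w, err)
	_, err = NewRateLimitedWriter(nopWriter{}, 0)
	fmt.Println(err)
}
```

Keeping the `if` in the factory means the decorator never needs a no-op mode, at the cost of repeating the check in each storage factory until it is pulled up to a shared level.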


type rateLimitedWriter struct {
w spanstore.Writer
l ratelimit.Limiter
Member

Please use full field names. One letter names are ok for local variables when there is context.

Contributor Author

Got it

@isaachier
Contributor Author

ES integration test again!

@jpkrohling
Contributor

Job restarted

plugin/storage/cassandra/factory_test.go
@@ -111,6 +113,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.Int(opt.primary.namespace+suffixWritesPerSecond,
opt.writesPerSecond,
"The number of writes per second using rate limiter")
Member

Optional upper limit on the number of span writes per second.

Contributor Author

That's better.
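A minimal sketch of the flag with the suggested help text, using only the standard `flag` package. The `cassandra` namespace prefix and the `parseWritesPerSecond` helper are assumptions made for this example, and reading the value straight from the returned pointer is a simplification of how the options are actually initialized.

```go
package main

import (
	"flag"
	"fmt"
)

const suffixWritesPerSecond = ".writer-rate-limit.operations-per-second"

// parseWritesPerSecond registers the option on a fresh FlagSet and parses
// args. The "cassandra" namespace is assumed for this sketch.
func parseWritesPerSecond(args []string) (int, error) {
	flagSet := flag.NewFlagSet("sketch", flag.ContinueOnError)
	writesPerSecond := flagSet.Int(
		"cassandra"+suffixWritesPerSecond,
		0, // default: no limit
		"Optional upper limit on the number of span writes per second.")
	if err := flagSet.Parse(args); err != nil {
		return 0, err
	}
	return *writesPerSecond, nil
}

func main() {
	n, err := parseWritesPerSecond([]string{"--cassandra.writer-rate-limit.operations-per-second=10"})
	fmt.Println(n, err)
}
```

Leaving the flag unset yields zero, which the factory can treat as "no rate limiting".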

@@ -57,6 +58,7 @@ type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
writesPerSecond int
Member

Shouldn't this be public? And qualified with SpanStore prefix.

Contributor Author

IDK why SpanStoreWriteCacheTTL is public either. There is no serialization here. I think it might be a workaround to pass options to another package that needs to read SpanStoreWriteCacheTTL, but that doesn't make much sense given that passing the int is just as effective.

Member

How are we using this internally? SpanStoreWriteCacheTTL is an externally settable option.

Contributor Author

Both of these are initialized by command-line flags, so neither should be public. I did a code search and could not find a reference to SpanStoreWriteCacheTTL (not even in our internal Uber repo). Seems to be vestigial from some point where we needed access from our internal repo into this part of the OSS.

Member

We don't use cli flags internally. If we need to set these two fields how would we do it without them being public? Did you try your branch without our internal main?

Contributor Author

$ git grep -l SpanStoreWriteCacheTTL
plugin/storage/cassandra/factory.go
plugin/storage/cassandra/options.go

@@ -82,4 +83,6 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
assert.Equal(t, 3, aux.ProtoVersion)
assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)

assert.Equal(t, 10, opts.writesPerSecond)
Member

move to L73

"github.com/jaegertracing/jaeger/storage/spanstore"
)

var errInvalidRate = errors.New("rate must be a non-zero positive integer")
Member

positive already implies non-zero

Member

@yurishkuro yurishkuro left a comment

Couldn't we do the metrics in a separate PR? This is scope creep.

if rate <= 0 {
return nil, errInvalidRate
}

m := metricsFactory.Namespace("rate-limited-writer", nil)
Member

is using dashes in metric names consistent with the rest of the code?

Contributor Author

I copied the format from the storage/spanstore/metrics/writer_metrics.go file, but I cannot guarantee that file is correct.

// WriteSpan wraps the write span method of the inner writer, but invokes the
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
r.limiter.Take()
const threshold = 100 * time.Microsecond
Member

this looks like a complete hack / arbitrary threshold. We should already have a write latency metric from the main writer. You can add another timer here that would measure full latency (wait + actual write) and emit that.

As for the counter, unless uber-go limiter returns a value indicating whether there was any sleep, I don't think we should be trying to reverse-engineer it with some arbitrary thresholds.

Contributor Author

I happen to agree wholeheartedly. Team suggested I add a metric with number of delayed spans. The current rate limiter API makes this essentially impossible. IMO the real issue is that the rate limiter API has limited use in its current form. I believe it would have made more sense to return the duration slept as opposed to the timestamp of the wake time.

@isaachier
Contributor Author

I just added metrics here while testing. I understand the concern regarding scope creep, and will move it to another PR soon.

Isaac Hier added 3 commits December 7, 2018 08:18
Signed-off-by: Isaac Hier <ihier@uber.com>
Signed-off-by: Isaac Hier <ihier@uber.com>
Signed-off-by: Isaac Hier <ihier@uber.com>
@yurishkuro
Member

I think this is solved differently by #1353

@yurishkuro yurishkuro closed this Apr 6, 2019
5 participants