promtail: ratelimiting by label #7597

Merged · 2 commits · Dec 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@

* [7462](https://github.com/grafana/loki/pull/7462) **MarNicGit**: Allow excluding event message from Windows Event Log entries.
* [7602](https://github.com/grafana/loki/pull/7602) **vmax**: Add decolorize stage to Promtail to easily parse colored logs.
* [7597](https://github.com/grafana/loki/pull/7597) **redbaron**: Allow ratelimiting by label.

##### Fixes
* [7771](https://github.com/grafana/loki/pull/7771) **GeorgeTsilias**: Handle nil error on target Details() call.
83 changes: 69 additions & 14 deletions clients/pkg/logentry/stages/limit.go
@@ -2,24 +2,34 @@ package stages

import (
"context"
"fmt"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/util"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const (
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
ErrLimitStageByLabelMustDrop = "When ratelimiting by label, drop must be true"
MinReasonableMaxDistinctLabels = 10000 // 80bytes per rate.Limiter ~ 1MiB memory
)

var ratelimitDropReason = "ratelimit_drop_stage"

type LimitConfig struct {
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
ByLabelName string `mapstructure:"by_label_name"`
MaxDistinctLabels int `mapstructure:"max_distinct_labels"`
}

func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
@@ -34,36 +34,61 @@ func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.
return nil, err
}

logger = log.With(logger, "component", "stage", "type", "limit")
if cfg.ByLabelName != "" && cfg.MaxDistinctLabels < MinReasonableMaxDistinctLabels {
level.Warn(logger).Log(
"msg",
fmt.Sprintf("max_distinct_labels was adjusted up to the minimal reasonable value of %d", MinReasonableMaxDistinctLabels),
Contributor: Is there any rationale for not allowing values below MinReasonableMaxDistinctLabels?

Contributor Author: max_distinct_labels is a safeguard to prevent OOM in case of misconfiguration. There is no preallocation happening, so setting it lower doesn't give you any benefit. The current limit "costs" ~1MiB by my non-scientific calculation (80 bytes per rate.Limiter across 10,000 limiters) once the limit is hit, and the user needs to increase it should they need more.

Contributor Author: Just to clarify that last bit ("the user needs to increase it should they need more"): the user needs to increase the limit if the number of actively hit ratelimiters exceeds it, otherwise ratelimiting becomes inaccurate (more entries go through than should have). If some label values are transient and NOT in active use (say, pod_name), then it is perfectly fine for the system to reach the max limit and have them GCed; there is no need to increase the limit to accommodate them.

)
cfg.MaxDistinctLabels = MinReasonableMaxDistinctLabels
}

r := &limitStage{
logger: log.With(logger, "component", "stage", "type", "limit"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst),
logger: logger,
cfg: cfg,
dropCount: getDropCountMetric(registerer),
}

if cfg.ByLabelName != "" {
r.dropCountByLabel = getDropCountByLabelMetric(registerer)
newRateLimiter := func() *rate.Limiter { return rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst) }
gcCb := func() { r.dropCountByLabel.Reset() }
r.rateLimiterByLabel = util.NewGenMap[model.LabelValue, *rate.Limiter](cfg.MaxDistinctLabels, newRateLimiter, gcCb)
} else {
r.rateLimiter = rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst)
}

return r, nil
}

func validateLimitConfig(cfg *LimitConfig) error {
if cfg.Rate <= 0 || cfg.Burst <= 0 {
return errors.Errorf(ErrLimitStageInvalidRateOrBurst)
}

if cfg.ByLabelName != "" && !cfg.Drop {
return errors.Errorf(ErrLimitStageByLabelMustDrop)
}
return nil
}

// limitStage applies Label matchers to determine if the include stages should be run
type limitStage struct {
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
dropCount *prometheus.CounterVec
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
rateLimiterByLabel util.GenerationalMap[model.LabelValue, *rate.Limiter]
dropCount *prometheus.CounterVec
dropCountByLabel *prometheus.CounterVec
byLabelName model.LabelName
}

func (m *limitStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if !m.shouldThrottle() {
if !m.shouldThrottle(e.Labels) {
out <- e
continue
}
@@ -72,7 +107,21 @@ func (m *limitStage) Run(in chan Entry) chan Entry {
return out
}

func (m *limitStage) shouldThrottle() bool {
func (m *limitStage) shouldThrottle(labels model.LabelSet) bool {
if m.cfg.ByLabelName != "" {
labelValue, ok := labels[model.LabelName(m.cfg.ByLabelName)]
if !ok {
return false // if no label is found, don't ratelimit
}
rl := m.rateLimiterByLabel.GetOrCreate(labelValue)
if rl.Allow() {
return false
}
m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
m.dropCountByLabel.WithLabelValues(m.cfg.ByLabelName, string(labelValue)).Inc()
return true
}

if m.cfg.Drop {
if m.rateLimiter.Allow() {
return false
@@ -88,3 +137,9 @@
func (m *limitStage) Name() string {
return StageTypeLimit
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
[]string{"label_name", "label_value"})
}
77 changes: 75 additions & 2 deletions clients/pkg/logentry/stages/limit_test.go
@@ -38,10 +38,31 @@
drop: true
`

var testLimitByLabelYaml = `
pipeline_stages:
- json:
    expressions:
      app:
      msg:
- limit:
    rate: 1
    burst: 1
    drop: true
    by_label_name: app
`

var testNonAppLogLine = `
{
"time":"2012-11-01T22:08:41+00:00",
"msg" : "Non app log line"
}
`

var plName = "testPipeline"

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
@@ -60,7 +81,6 @@ func TestLimitWaitPipeline(t *testing.T) {
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 10
@@ -75,3 +95,56 @@ func TestLimitDropPipeline(t *testing.T) {
assert.Len(t, out, 1)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitByLabelPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitByLabelPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitByLabelYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "poki"}, testMatchLogLineApp2, time.Now()))
}
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{}, testNonAppLogLine, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only one entry of each app will go through + all log lines without expected label
assert.Len(t, out, 2+logCount)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
assert.Equal(t, out[1].Line, testMatchLogLineApp2)
assert.Equal(t, out[3].Line, testNonAppLogLine)

var hasTotal, hasByLabel bool
mfs, _ := registry.Gather()
for _, mf := range mfs {
if *mf.Name == "logentry_dropped_lines_total" {
hasTotal = true
assert.Len(t, mf.Metric, 1)
assert.Equal(t, 8, int(mf.Metric[0].Counter.GetValue()))
} else if *mf.Name == "logentry_dropped_lines_by_label_total" {
hasByLabel = true
assert.Len(t, mf.Metric, 2)
assert.Equal(t, 4, int(mf.Metric[0].Counter.GetValue()))
assert.Equal(t, 4, int(mf.Metric[1].Counter.GetValue()))

assert.Equal(t, mf.Metric[0].Label[0].GetName(), "label_name")
assert.Equal(t, mf.Metric[0].Label[0].GetValue(), "app")
assert.Equal(t, mf.Metric[0].Label[1].GetName(), "label_value")
assert.Equal(t, mf.Metric[0].Label[1].GetValue(), "loki")

assert.Equal(t, mf.Metric[1].Label[0].GetName(), "label_name")
assert.Equal(t, mf.Metric[1].Label[0].GetValue(), "app")
assert.Equal(t, mf.Metric[1].Label[1].GetName(), "label_value")
assert.Equal(t, mf.Metric[1].Label[1].GetValue(), "poki")
}
}
assert.True(t, hasTotal)
assert.True(t, hasByLabel)
}
22 changes: 22 additions & 0 deletions docs/sources/clients/promtail/stages/limit.md
@@ -17,6 +17,13 @@ limit:

# The cap in the quantity of burst lines that Promtail will push to Loki
[burst: <int>]

# Ratelimit each label value independently. If the label is not found, the log line is
# not considered for ratelimiting. Drop must be true if this is set.
[by_label_name: <string>]

# When ratelimiting by label is enabled, keep track of at most this many last used label values
[max_distinct_labels: <int> | default = 10000]

# When drop is true, log lines that exceed the current rate limit will be discarded.
# When drop is false, log lines that exceed the current rate limit will only wait
@@ -56,3 +63,18 @@
```

Would throttle any log line and drop logs once the rate limit is hit.

#### Ratelimit by a label
Contributor: I think it's worth mentioning what the difference is between these two:

- limit:
    rate: 10
    burst: 10
    drop: true
    by_label_name: "namespace"

and this:

match:
  selector: '{namespace="foo"}'
  stages:
  - limit:
      rate: 5
      drop: true
match:
  selector: '{namespace!="foo"}'
  stages:
  - limit:
      rate: 100
      drop: false

Like @liguozhong already mentioned, my first thought would also be to use the latter config instead of this new one. I see you explained that with by_label_name we get a separate ratelimiter per label value. Wondering what advantage that serves?

Contributor Author: It allows you to have a rate limit per label without knowing the label values upfront. Let's say you can set a rate limit per namespace or app, but you don't need to reconfigure Loki every time a new app comes into the cluster.


Given the pipeline:

```yaml
- limit:
    rate: 10
    burst: 10
    drop: true
    by_label_name: "namespace"
```

Would ratelimit messages originating from each namespace independently.
Any message without the namespace label will not be ratelimited.
39 changes: 39 additions & 0 deletions pkg/util/map.go
@@ -0,0 +1,39 @@
package util

type GenerationalMap[K comparable, V any] struct {
oldgen map[K]V
newgen map[K]V

maxSize int
newV func() V
gcCb func()
}

// NewGenMap creates a GenerationalMap that maintains at most maxSize recently used entries
func NewGenMap[K comparable, V any](maxSize int, newV func() V, gcCb func()) GenerationalMap[K, V] {
Contributor: This would be our first generic code in Loki, I believe. Will wait for a few more eyes from contributors. Can we make it a bit more readable, maybe? I'm failing to understand what newV and gcCb mean here. Could we have better naming and some descriptive comments on those? If we're going with this generic map, having a few tests for it would be good.

Contributor: Just started to wonder why we need this map in the first place. I see it's only used in one place and could be a simple map[string]*rate.Limiter. Trying to understand the rationale here.

Contributor Author: newV — constructor for a new value; gcCb — garbage-collection callback. I am happy to use any names you suggest for these or any other identifiers. The purpose of this map is to keep at most a maximum number of recently used entries and GC the rest. The difference from an LRU is that it doesn't pessimize the common case of NOT reaching the limit.

return GenerationalMap[K, V]{
newgen: make(map[K]V),
maxSize: maxSize,
newV: newV,
gcCb: gcCb,
}
}

func (m *GenerationalMap[K, T]) GetOrCreate(key K) T {
v, ok := m.newgen[key]
if !ok {
if v, ok = m.oldgen[key]; !ok {
v = m.newV()
}
m.newgen[key] = v

if len(m.newgen) == m.maxSize {
m.oldgen = m.newgen
Contributor: Can we add some mechanism to clean up oldgen after some time if it's untouched? I think there might be a case where we get a spike of values, so we populate oldgen, and after the values stabilise the rate limiters will live in oldgen forever...

Contributor Author: Potentially I could trigger GC on a timer, but it doesn't feel right to do, because:

  • it affects ratelimiting accuracy
  • because of the above, the GC period would likely need to be exposed as a configuration variable for users to tune
  • the net win is 1-2MiB of memory at most, with no win in CPU utilization

Overall it isn't worth it, IMHO.

m.newgen = make(map[K]T)
if m.gcCb != nil {
m.gcCb()
}
}
}
return v
}
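The thread above describes the map's intent: keep at most maxSize recently used entries and GC the rest. A minimal usage sketch of how the limit stage leans on GetOrCreate — assuming the helper is importable as github.com/grafana/loki/pkg/util, and using illustrative rate/burst values rather than the stage's actual configuration:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"golang.org/x/time/rate"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	// One limiter per label value, created lazily on first use.
	newLimiter := func() *rate.Limiter { return rate.NewLimiter(rate.Limit(1), 1) }
	// Called whenever a full generation is discarded; the stage uses this hook
	// to reset its per-label drop counters.
	onGC := func() { fmt.Println("old generation dropped") }

	limiters := util.NewGenMap[model.LabelValue, *rate.Limiter](10000, newLimiter, onGC)

	for _, app := range []model.LabelValue{"loki", "loki", "promtail"} {
		if limiters.GetOrCreate(app).Allow() {
			fmt.Println("pass:", app)
		} else {
			fmt.Println("throttle:", app)
		}
	}
}
```

With rate 1 and burst 1, the second "loki" entry is throttled by the limiter created on the first call, while "promtail" lazily gets its own fresh limiter and passes — which is the per-label-value behaviour the stage relies on.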
20 changes: 20 additions & 0 deletions pkg/util/metrics_helper.go
@@ -805,3 +805,23 @@ type CollectorVec interface {
prometheus.Collector
Delete(labels prometheus.Labels) bool
}

// RegisterCounterVec registers a new CounterVec with the given namespace, name and labels.
// If the metric was already registered, it returns the existing instance.
func RegisterCounterVec(registerer prometheus.Registerer, namespace, name, help string, labels []string) *prometheus.CounterVec {
Contributor: Q: what's the problem with using the existing prometheus.MustRegister helper here? I would let it panic on duplicate registration rather than handling it via prometheus.AlreadyRegisteredError. Am I missing a specific use case?

Contributor Author (redbaron, Dec 6, 2022): This function mimics getDropCountMetric from match.go, which this stage used to use. Now that we need two metrics, I created this helper function, keeping the old behaviour exactly as is.

vec := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: name,
Help: help,
}, labels)
err := registerer.Register(vec)
if err != nil {
if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
vec = existing.ExistingCollector.(*prometheus.CounterVec)
} else {
// Same behavior as MustRegister if the error is not for AlreadyRegistered
panic(err)
}
}
return vec
}
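A small sketch of the duplicate-registration behaviour discussed in the thread above — assuming the helper is importable as github.com/grafana/loki/pkg/util; the metric name and labels simply reuse the ones from the limit stage for illustration:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	reg := prometheus.NewRegistry()

	// First call creates the vector and registers it.
	a := util.RegisterCounterVec(reg, "logentry", "dropped_lines_by_label_total",
		"A count of all log lines dropped as a result of a pipeline stage",
		[]string{"label_name", "label_value"})

	// A second call with an identical descriptor hits AlreadyRegisteredError and
	// returns the collector registered above instead of panicking.
	b := util.RegisterCounterVec(reg, "logentry", "dropped_lines_by_label_total",
		"A count of all log lines dropped as a result of a pipeline stage",
		[]string{"label_name", "label_value"})

	fmt.Println(a == b) // true
}
```

Registering a genuinely conflicting vector (different help text or label set) would not produce an AlreadyRegisteredError, so the helper would still panic, matching MustRegister semantics for real conflicts.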