Skip to content

Commit

Permalink
feat: aggregate byte and count metrics
Browse files Browse the repository at this point in the history
* aggregate in the pattern ingester and push back into Loki as a stream
  • Loading branch information
trevorwhitney committed Aug 1, 2024
1 parent b49e52b commit ce87fd8
Show file tree
Hide file tree
Showing 15 changed files with 1,256 additions and 96 deletions.
2 changes: 1 addition & 1 deletion cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
log_push_observations: true
loki_address: localhost:3100

ruler:
alertmanager_url: http://localhost:9093
Expand Down
16 changes: 16 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,22 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return nil, "", 0, err
}

// We do not want to count service_name added by us in the stream limit so adding it after validating original labels.
// We also don't want to add service name to aggregated metrics.
if !ls.Has(push.LabelServiceName) && !ls.Has(push.AggregatedMetricLabel) &&
len(vContext.discoverServiceName) > 0 {
serviceName := push.ServiceUnknown
for _, labelName := range vContext.discoverServiceName {
if labelVal := ls.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

ls = labels.NewBuilder(ls).Set(push.LabelServiceName, serviceName).Labels()
stream.Labels = ls.String()
}

lsHash := ls.Hash()

d.labelCache.Add(key, labelData{ls, lsHash})
Expand Down
130 changes: 111 additions & 19 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"

"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
Expand Down Expand Up @@ -150,6 +150,7 @@ func Test_IncrementTimestamp(t *testing.T) {

defaultLimits := &validation.Limits{}
flagext.DefaultValues(defaultLimits)
now := time.Now()
defaultLimits.DiscoverLogLevels = false

tests := map[string]struct {
Expand Down Expand Up @@ -397,6 +398,34 @@ func Test_IncrementTimestamp(t *testing.T) {
},
},
},
"default limit adding service_name label": {
limits: defaultLimits,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\"}",
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
expectedPush: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\", service_name=\"foo\"}",
Hash: 0x86ca305b6d86e8b0,
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -519,46 +548,40 @@ func Test_SortLabelsOnPush(t *testing.T) {
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels)
})
}

func Test_TruncateLogLines(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
t.Run("with service_name added during ingestion", func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.MaxLineSize = 5
limits.MaxLineSizeTruncate = true
return limits, &mockIngester{}
}

t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) {
limits, ingester := setup()
ingester := &mockIngester{}
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", x="y", a="b"}`
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
topVal := ingester.Peek()
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels)
})
}

func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
func Test_TruncateLogLines(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.MaxLineSize = 5
limits.MaxLineSizeTruncate = true
return limits, &mockIngester{}
}

t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) {
t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) {
limits, ingester := setup()
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10)))
require.NoError(t, err)
topVal := ingester.Peek()
require.Nil(t, topVal)
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
})
}

Expand Down Expand Up @@ -838,9 +861,53 @@ func TestParseStreamLabels(t *testing.T) {
expectedErr error
generateLimits func() *validation.Limits
}{
{
name: "service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{foo="bar"}`,
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
},
},
{
name: "no labels defined - service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{}`,
expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
},
{
name: "service name label enabled",
origLabels: `{foo="bar"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
{
Name: loghttp_push.LabelServiceName,
Value: loghttp_push.ServiceUnknown,
},
},
},
{
name: "service name label should not get counted against max labels count",
origLabels: `{foo="bar", service_name="unknown_service"}`,
origLabels: `{foo="bar"}`,
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand All @@ -858,6 +925,31 @@ func TestParseStreamLabels(t *testing.T) {
},
},
},
{
name: "use label service as service name",
origLabels: `{container="nginx", foo="bar", service="auth"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "container",
Value: "nginx",
},
{
Name: "foo",
Value: "bar",
},
{
Name: "service",
Value: "auth",
},
{
Name: loghttp_push.LabelServiceName,
Value: "auth",
},
},
},
} {
limits := tc.generateLimits()
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
Expand Down
6 changes: 5 additions & 1 deletion pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,17 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}

// Skip validation for aggregated metric streams, as we create those for internal use
if ls.Has(push.AggregatedMetricLabel) {
return nil
}

numLabelNames := len(ls)
// This is a special case that's often added by the Loki infrastructure. It may result in allowing one extra label
// if incoming requests already have a service_name
if ls.Has(push.LabelServiceName) {
numLabelNames--
}

if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
Expand Down
Loading

0 comments on commit ce87fd8

Please sign in to comment.