Skip to content

Commit

Permalink
Label Volume Endpoint (#9588)
Browse files Browse the repository at this point in the history
For a given set of matchers, returns the top N associated label/value
pairs by volume. A query for `{cluster=prod}` will return

```
cluster=prod: size (total logs matching this matcher)
 .
 .
 .
nth-label=nth-value
```

This is to service use cases where users want to understand where their
log volume has come from by label without making multiple requests to
the stats endpoint.

Note: This PR is a monster but it's mostly plumbing. I've pointed out
the most interesting bits that actually get the volumes from
ingesters/indexs
  • Loading branch information
MasslessParticle authored Jun 12, 2023
1 parent 4d997a5 commit 065bee7
Show file tree
Hide file tree
Showing 85 changed files with 3,868 additions and 447 deletions.
1 change: 1 addition & 0 deletions clients/pkg/promtail/client/client_writeto.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/util"
)
Expand Down
4 changes: 3 additions & 1 deletion clients/pkg/promtail/client/client_writeto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package client

import (
"fmt"
"go.uber.org/atomic"
"math/rand"
"os"
"sync"
"testing"
"time"

"go.uber.org/atomic"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
)
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/promtail_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/utils"
"github.com/grafana/loki/clients/pkg/promtail/wal"

"github.com/grafana/loki/pkg/push"
util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/server/ui/assets_vfsdata.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Code generated by vfsgen; DO NOT EDIT.

//go:build !dev
// +build !dev

package ui
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/targets/azureeventhubs/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/logproto"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package azureeventhubs

import (
"errors"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)

func Test_validateConfig(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/targets/kafka/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/logproto"
)

Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/targets/kafka/target_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/util"
)

Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/targets/windows/win_eventlog/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

// Event is the event entry representation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
3 changes: 2 additions & 1 deletion clients/pkg/promtail/targets/windows/win_eventlog/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//go:build !windows
// +build !windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//go:build windows
// +build windows

//revive:disable-next-line:var-naming
// Package win_eventlog Input plugin to collect Windows Event Log messages
//
//revive:disable-next-line:var-naming
package win_eventlog

import (
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/wal/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
Expand Down
1 change: 1 addition & 0 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
)

Expand Down
1 change: 1 addition & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/querylimits"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/writefailures/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Man
logger = log.With(logger, "insight", "true")
}

strat := newStrategy(cfg.LogRate.Val(), float64(cfg.LogRate.Val()))
strategy := newStrategy(cfg.LogRate.Val(), float64(cfg.LogRate.Val()))

return &Manager{
limiter: limiter.NewRateLimiter(strat, time.Minute),
limiter: limiter.NewRateLimiter(strategy, time.Minute),
logger: logger,
tenantCfgs: tenants,
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ func (s *testStore) Stats(ctx context.Context, userID string, from, through mode
return &stats.Stats{}, nil
}

func (s *testStore) LabelVolume(ctx context.Context, userID string, from, through model.Time, limit int32, matchers ...*labels.Matcher) (*logproto.LabelVolumeResponse, error) {
return &logproto.LabelVolumeResponse{}, nil
}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}

Expand Down
47 changes: 47 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/grafana/loki/pkg/storage/stores/index/labelvolume"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
Expand Down Expand Up @@ -171,6 +173,7 @@ type ChunkStore interface {
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error)
GetSchemaConfigs() []config.PeriodConfig
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error)
LabelVolume(ctx context.Context, userID string, from, through model.Time, limit int32, matchers ...*labels.Matcher) (*logproto.LabelVolumeResponse, error)
}

// Interface is an interface for the Ingester
Expand Down Expand Up @@ -1137,6 +1140,50 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
return &merged, nil
}

func (i *Ingester) GetLabelVolume(ctx context.Context, req *logproto.LabelVolumeRequest) (*logproto.LabelVolumeResponse, error) {
user, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

instance, err := i.GetOrCreateInstance(user)
if err != nil {
return nil, err
}

matchers, err := syntax.ParseMatchers(req.Matchers)
if err != nil && req.Matchers != labelvolume.MatchAny {
return nil, err
}

type f func() (*logproto.LabelVolumeResponse, error)
jobs := []f{
f(func() (*logproto.LabelVolumeResponse, error) {
return instance.GetLabelVolume(ctx, req)
}),
f(func() (*logproto.LabelVolumeResponse, error) {
return i.store.LabelVolume(ctx, user, req.From, req.Through, req.Limit, matchers...)
}),
}
resps := make([]*logproto.LabelVolumeResponse, len(jobs))

if err := concurrency.ForEachJob(
ctx,
len(jobs),
2,
func(ctx context.Context, idx int) error {
res, err := jobs[idx]()
resps[idx] = res
return err
},
); err != nil {
return nil, err
}

merged := labelvolume.Merge(resps, req.Limit)
return merged, nil
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
return nil
Expand Down
70 changes: 69 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,21 @@ func (s *mockStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher {
}

func (s *mockStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
return &stats.Stats{}, nil
return &stats.Stats{
Streams: 2,
Chunks: 5,
Bytes: 25,
Entries: 100,
}, nil
}

func (s *mockStore) LabelVolume(ctx context.Context, userID string, from, through model.Time, limit int32, matchers ...*labels.Matcher) (*logproto.LabelVolumeResponse, error) {
return &logproto.LabelVolumeResponse{
Volumes: []logproto.LabelVolume{
{Name: "foo", Value: "bar", Volume: 38},
},
Limit: limit,
}, nil
}

func (s *mockStore) Stop() {}
Expand Down Expand Up @@ -1040,6 +1054,60 @@ func Test_DedupeIngesterParser(t *testing.T) {
})
}

func TestStats(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)

i.instances["test"] = defaultInstance(t)

ctx := user.InjectOrgID(context.Background(), "test")

resp, err := i.GetStats(ctx, &logproto.IndexStatsRequest{
From: 0,
Through: 11000,
Matchers: `{host="agent"}`,
})
require.NoError(t, err)

require.Equal(t, &logproto.IndexStatsResponse{
Streams: 4,
Chunks: 7,
Bytes: 185,
Entries: 110,
}, resp)
}

func TestLabelVolume(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)

i.instances["test"] = defaultInstance(t)

ctx := user.InjectOrgID(context.Background(), "test")
volumes, err := i.GetLabelVolume(ctx, &logproto.LabelVolumeRequest{
From: 0,
Through: 10000,
Matchers: "{}",
Limit: 4,
})
require.NoError(t, err)

require.Equal(t, []logproto.LabelVolume{
{Name: "host", Value: "agent", Volume: 160},
{Name: "job", Value: "3", Volume: 160},
{Name: "log_stream", Value: "dispatcher", Volume: 90},
{Name: "log_stream", Value: "worker", Volume: 70},
}, volumes.Volumes)
}

type ingesterClient struct {
logproto.PusherClient
logproto.QuerierClient
Expand Down
Loading

0 comments on commit 065bee7

Please sign in to comment.