Add a chunk filter hook in the store. #3569

Merged · 4 commits · Apr 2, 2021
Changes from all commits
3 changes: 3 additions & 0 deletions pkg/querier/querier_mock_test.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util"
)

@@ -207,6 +208,8 @@ func newStoreMock() *storeMock {
return &storeMock{}
}

func (s *storeMock) SetChunkFilterer(storage.RequestChunkFilterer) {}

func (s *storeMock) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
args := s.Called(ctx, req)
res := args.Get(0)
64 changes: 40 additions & 24 deletions pkg/storage/batch.go
@@ -84,6 +84,7 @@ type batchChunkIterator struct {
lastOverlapping []*LazyChunk
metrics *ChunkMetrics
matchers []*labels.Matcher
chunkFilterer ChunkFilterer

begun bool
ctx context.Context
@@ -101,21 +102,23 @@ func newBatchChunkIterator(
start, end time.Time,
metrics *ChunkMetrics,
matchers []*labels.Matcher,
chunkFilterer ChunkFilterer,
) *batchChunkIterator {
// __name__ is not something we filter by because it's a constant in loki
// and only used for upstream compatibility; therefore remove it.
// The same applies to the sharding label which is injected by the cortex storage code.
matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel)
res := &batchChunkIterator{
batchSize: batchSize,
metrics: metrics,
matchers: matchers,
start: start,
end: end,
direction: direction,
ctx: ctx,
chunks: lazyChunks{direction: direction, chunks: chunks},
next: make(chan *chunkBatch),
batchSize: batchSize,
metrics: metrics,
matchers: matchers,
start: start,
end: end,
direction: direction,
ctx: ctx,
chunks: lazyChunks{direction: direction, chunks: chunks},
next: make(chan *chunkBatch),
chunkFilterer: chunkFilterer,
}
sort.Sort(res.chunks)
return res
@@ -267,7 +270,7 @@ func (it *batchChunkIterator) nextBatch() *chunkBatch {
}
}
// download chunk for this batch.
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, batch, it.matchers)
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, batch, it.matchers, it.chunkFilterer)
if err != nil {
return &chunkBatch{err: err}
}
@@ -307,13 +310,14 @@ func newLogBatchIterator(
pipeline logql.Pipeline,
direction logproto.Direction,
start, end time.Time,
chunkFilterer ChunkFilterer,
) (iter.EntryIterator, error) {
ctx, cancel := context.WithCancel(ctx)
return &logBatchIterator{
pipeline: pipeline,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, metrics, matchers),
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, metrics, matchers, chunkFilterer),
}, nil
}

@@ -441,13 +445,14 @@ func newSampleBatchIterator(
matchers []*labels.Matcher,
extractor logql.SampleExtractor,
start, end time.Time,
chunkFilterer ChunkFilterer,
) (iter.SampleIterator, error) {
ctx, cancel := context.WithCancel(ctx)
return &sampleBatchIterator{
extractor: extractor,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers),
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer),
}, nil
}

@@ -506,7 +511,6 @@ func (it *sampleBatchIterator) Next() bool {

// newChunksIterator creates an iterator over a set of lazychunks.
func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIterator, error) {

iters, err := it.buildIterators(b.chunksBySeries, b.from, b.through, b.nextChunk)
if err != nil {
return nil, err
@@ -526,7 +530,6 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
}
result = append(result, iterator)
}

}

return result, nil
@@ -565,7 +568,13 @@ func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels
return matchers
}

func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*LazyChunk, matchers []*labels.Matcher) (map[model.Fingerprint][][]*LazyChunk, error) {
func fetchChunkBySeries(
ctx context.Context,
metrics *ChunkMetrics,
chunks []*LazyChunk,
matchers []*labels.Matcher,
chunkFilter ChunkFilterer,
) (map[model.Fingerprint][][]*LazyChunk, error) {
chksBySeries := partitionBySeriesChunks(chunks)

// Make sure the initial chunks are loaded. This is not one chunk
@@ -576,7 +585,7 @@ func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*La

// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, metrics)
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, chunkFilter, metrics)

var allChunks []*LazyChunk
for _, series := range chksBySeries {
@@ -600,24 +609,31 @@ func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*La
func filterSeriesByMatchers(
chks map[model.Fingerprint][][]*LazyChunk,
matchers []*labels.Matcher,
chunkFilterer ChunkFilterer,
metrics *ChunkMetrics,
) map[model.Fingerprint][][]*LazyChunk {
var filteredSeries, filteredChks int

removeSeries := func(fp model.Fingerprint, chunks [][]*LazyChunk) {
delete(chks, fp)
filteredSeries++

for _, grp := range chunks {
filteredChks += len(grp)
}
}
outer:
for fp, chunks := range chks {
for _, matcher := range matchers {
if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {

delete(chks, fp)
filteredSeries++

for _, grp := range chunks {
filteredChks += len(grp)
}

removeSeries(fp, chunks)
continue outer
}
}
if chunkFilterer != nil && chunkFilterer.ShouldFilter(chunks[0][0].Chunk.Metric) {
removeSeries(fp, chunks)
continue outer
}
}
metrics.chunks.WithLabelValues(statusDiscarded).Add(float64(filteredChks))
metrics.series.WithLabelValues(statusDiscarded).Add(float64(filteredSeries))
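The refactor above folds the bookkeeping for dropped series into a `removeSeries` closure so the matcher path and the new filterer path share it. For readers skimming the diff, a minimal sketch of the resulting per-series decision — a hypothetical standalone helper, not part of this PR, assuming it sits in pkg/storage next to `filterSeriesByMatchers` and the `ChunkFilterer` interface defined in pkg/storage/store.go below:

```go
// shouldDropSeries mirrors the decision filterSeriesByMatchers now makes:
// a series is removed when any matcher rejects its labels, or when an
// optional ChunkFilterer (nil when no hook is installed) vetoes them.
// Hypothetical helper for illustration only.
func shouldDropSeries(metric labels.Labels, matchers []*labels.Matcher, filterer ChunkFilterer) bool {
	for _, m := range matchers {
		if !m.Matches(metric.Get(m.Name)) {
			return true
		}
	}
	return filterer != nil && filterer.ShouldFilter(metric)
}
```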
13 changes: 4 additions & 9 deletions pkg/storage/batch_test.go
@@ -42,7 +42,7 @@ func Test_batchIterSafeStart(t *testing.T) {
newLazyChunk(stream),
}

batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), NilMetrics, []*labels.Matcher{})
batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), NilMetrics, []*labels.Matcher{}, nil)

// if it was started already, we should see a panic before this
time.Sleep(time.Millisecond)
@@ -52,11 +52,9 @@ func Test_batchIterSafeStart(t *testing.T) {
batch.Start()

require.NotNil(t, batch.Next())

}

func Test_newLogBatchChunkIterator(t *testing.T) {

tests := map[string]struct {
chunks []*LazyChunk
expected []logproto.Stream
@@ -946,7 +944,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.NewNoopPipeline(), tt.direction, tt.start, tt.end)
it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.NewNoopPipeline(), tt.direction, tt.start, tt.end, nil)
require.NoError(t, err)
streams, _, err := iter.ReadBatch(it, 1000)
_ = it.Close()
Expand All @@ -955,13 +953,11 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
}

assertStream(t, tt.expected, streams.Streams)

})
}
}

func Test_newSampleBatchChunkIterator(t *testing.T) {

tests := map[string]struct {
chunks []*LazyChunk
expected []logproto.Series
@@ -1234,7 +1230,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
require.NoError(t, err)

it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end)
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end, nil)
require.NoError(t, err)
series, _, err := iter.ReadSampleBatch(it, 1000)
_ = it.Close()
Expand All @@ -1243,7 +1239,6 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
}

assertSeries(t, tt.expected, series.Series)

})
}
}
@@ -1514,7 +1509,7 @@ func TestBatchCancel(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
it, err := newLogBatchIterator(ctx, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now())
it, err := newLogBatchIterator(ctx, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now(), nil)
require.NoError(t, err)
defer require.NoError(t, it.Close())
for it.Next() {
2 changes: 0 additions & 2 deletions pkg/storage/hack/main.go
@@ -84,7 +84,6 @@ func getStore() (lstore.Store, error) {
}

func fillStore() error {

store, err := getStore()
if err != nil {
return err
@@ -134,7 +133,6 @@ func fillStore() error {
chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
}
}

}(i)

}
41 changes: 37 additions & 4 deletions pkg/storage/store.go
@@ -76,13 +76,26 @@ type Store interface {
SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
GetSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error)
GetSchemaConfigs() []chunk.PeriodConfig
SetChunkFilterer(chunkFilter RequestChunkFilterer)
}

// RequestChunkFilterer creates ChunkFilterer for a given request context.
type RequestChunkFilterer interface {
ForRequest(ctx context.Context) ChunkFilterer
}

// ChunkFilterer filters chunks based on the metric.
type ChunkFilterer interface {
ShouldFilter(metric labels.Labels) bool
}
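As an illustration of the two interfaces, a minimal sketch of a filterer that hides series carrying a fixed label value. The type and its rule are hypothetical, not part of this PR; it assumes the code lives in pkg/storage, where context and the Prometheus labels package are already imported:

```go
// labelBlockFilterer is a hypothetical RequestChunkFilterer that drops
// every series whose labels carry the configured name/value pair.
type labelBlockFilterer struct {
	name, value string
}

// ForRequest satisfies RequestChunkFilterer. This sketch ignores the
// request context and returns a static rule; see the per-tenant
// variant at the end of this diff for a context-aware version.
func (f *labelBlockFilterer) ForRequest(ctx context.Context) ChunkFilterer {
	return f
}

// ShouldFilter satisfies ChunkFilterer: returning true excludes the
// series identified by metric from query results.
func (f *labelBlockFilterer) ShouldFilter(metric labels.Labels) bool {
	return metric.Get(f.name) == f.value
}
```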

type store struct {
chunk.Store
cfg Config
chunkMetrics *ChunkMetrics
schemaCfg SchemaConfig

chunkFilterer RequestChunkFilterer
}

// NewStore creates a new Loki Store using configuration supplied.
@@ -148,6 +161,10 @@ func decodeReq(req logql.QueryParams) ([]*labels.Matcher, model.Time, model.Time
return matchers, from, through, nil
}

func (s *store) SetChunkFilterer(chunkFilterer RequestChunkFilterer) {
s.chunkFilterer = chunkFilterer
}
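Installing the hook is then a single call at setup time. A sketch using the hypothetical filterer above; lokiStore stands for whatever Store instance the caller has built:

```go
// Any component that builds the store can install the hook; every
// later SelectLogs, SelectSamples and GetSeries call will consult it.
lokiStore.SetChunkFilterer(&labelBlockFilterer{name: "environment", value: "dev"})
```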

// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries`
func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) {
userID, err := user.ExtractOrgID(ctx)
@@ -230,6 +247,11 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]log
split = len(firstChunksPerSeries)
}

var chunkFilterer ChunkFilterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
}

for split > 0 {
groups = append(groups, firstChunksPerSeries[:split])
firstChunksPerSeries = firstChunksPerSeries[split:]
@@ -252,6 +274,10 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]log
}
}

if chunkFilterer != nil && chunkFilterer.ShouldFilter(chk.Chunk.Metric) {
continue outer
}

m := chk.Chunk.Metric.Map()
delete(m, labels.MetricName)
results = append(results, logproto.SeriesIdentifier{
Expand All @@ -261,7 +287,6 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]log
}
sort.Sort(results)
return results, nil

}

// SelectLogs returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront
@@ -290,9 +315,12 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter
if len(lazyChunks) == 0 {
return iter.NoopIterator, nil
}
var chunkFilterer ChunkFilterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
}

return newLogBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End)

return newLogBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer)
}

func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
@@ -319,7 +347,12 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
if len(lazyChunks) == 0 {
return iter.NoopIterator, nil
}
return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End)
var chunkFilterer ChunkFilterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
}

return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer)
}

func (s *store) GetSchemaConfigs() []chunk.PeriodConfig {
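Because ForRequest receives the query context, a filterer can differ per tenant. A sketch of a context-aware implementation, reusing the same user.ExtractOrgID helper the store already calls in lazyChunks; the tenantFilterer type and its rules map are purely illustrative:

```go
// tenantFilterer is a hypothetical RequestChunkFilterer that resolves
// a per-tenant rule from the request context.
type tenantFilterer struct {
	rules map[string]ChunkFilterer // tenant ID -> rule; illustrative only
}

// ForRequest looks up the tenant's rule. Returning a nil ChunkFilterer
// is safe here: both filterSeriesByMatchers and GetSeries nil-check
// the filterer before calling ShouldFilter.
func (t *tenantFilterer) ForRequest(ctx context.Context) ChunkFilterer {
	id, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil
	}
	return t.rules[id]
}
```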