From d8fe6c12dcfc3f922b6e5b7af835ee5069bd194b Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 24 Apr 2024 12:45:34 +0200 Subject: [PATCH] Correctly return unfiltered chunks for series Signed-off-by: Christian Haudum --- pkg/bloomgateway/querier.go | 5 +- pkg/bloomgateway/querier_test.go | 4 +- pkg/bloomgateway/resolver.go | 35 ++++++- pkg/bloomgateway/resolver_test.go | 156 ++++++++++++++++++++++++++++-- 4 files changed, 184 insertions(+), 16 deletions(-) diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index bbb9f7495d8e..a6209f9ccf34 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -99,7 +99,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from // only covers a single day, and if not, it's at most two days. for _, s := range partitionSeriesByDay(from, through, grouped) { day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day)) - blocks, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series) + blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series) if err != nil { return nil, err } @@ -121,6 +121,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from return nil, err } + // add chunk refs from series that were not mapped to any blocks + refs = append(refs, skipped...) + for i := range refs { seriesSeen[refs[i].Fingerprint] = struct{}{} for _, ref := range refs[i].Refs { diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index a27d90a02124..516f1cd403bb 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -40,7 +40,7 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In type mockBlockResolver struct{} // Resolve implements BlockResolver. -func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) { +func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) { day := truncateDay(interval.Start) first, last := getFirstLast(series) block := bloomshipper.BlockRef{ @@ -53,7 +53,7 @@ func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval blo Checksum: 0, }, } - return []blockWithSeries{{block: block, series: series}}, nil + return []blockWithSeries{{block: block, series: series}}, nil, nil } var _ BlockResolver = &mockBlockResolver{} diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 3c5d8853d9ab..c10ebc33dff3 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -15,7 +15,7 @@ import ( ) type BlockResolver interface { - Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) + Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) } type blockWithSeries struct { @@ -28,7 +28,7 @@ type defaultBlockResolver struct { logger log.Logger } -func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) { +func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) { minFp, maxFp := getFirstLast(series) metaSearch := bloomshipper.MetaSearchParams{ TenantID: tenant, @@ -52,10 +52,12 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter ) if err != nil { - return nil, err + return nil, series, err } - return blocksMatchingSeries(metas, interval, series), nil + mapped := blocksMatchingSeries(metas, interval, series) + skipped := unassignedSeries(mapped, series) + return mapped, skipped, nil } func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries { @@ -96,6 +98,31 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter return result } +func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { + skipped := make([]*logproto.GroupedChunkRefs, len(series)) + _ = copy(skipped, series) + + for _, block := range mapped { + minFp, maxFp := getFirstLast(block.series) + + minIdx := sort.Search(len(skipped), func(i int) bool { + return skipped[i].Fingerprint >= minFp.Fingerprint + }) + + maxIdx := sort.Search(len(skipped), func(i int) bool { + return skipped[i].Fingerprint >= maxFp.Fingerprint + }) + + if minIdx == len(skipped) || maxIdx == 0 || minIdx == maxIdx { + continue + } + + skipped = append(skipped[0:minIdx], skipped[maxIdx+1:]...) + } + + return skipped +} + func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver { return &defaultBlockResolver{ store: store, diff --git a/pkg/bloomgateway/resolver_test.go b/pkg/bloomgateway/resolver_test.go index a2cd422e1594..7214537d6885 100644 --- a/pkg/bloomgateway/resolver_test.go +++ b/pkg/bloomgateway/resolver_test.go @@ -11,18 +11,22 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" ) +func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef { + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: "tenant", + TableName: "table", + Bounds: v1.NewBounds(minFp, maxFp), + StartTimestamp: from, + EndTimestamp: through, + }, + } +} + func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.Meta { return bloomshipper.Meta{ Blocks: []bloomshipper.BlockRef{ - { - Ref: bloomshipper.Ref{ - TenantID: "tenant", - TableName: "table", - Bounds: v1.NewBounds(minFp, maxFp), - StartTimestamp: from, - EndTimestamp: through, - }, - }, + makeBlockRef(minFp, maxFp, from, through), }, } } @@ -113,3 +117,137 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) { require.Equal(t, expected, res) }) } + +func TestBlockResolver_UnassignedSeries(t *testing.T) { + series := []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + } + + testCases := []struct { + desc string + mapped []blockWithSeries + expected []*logproto.GroupedChunkRefs + }{ + { + desc: "no blocks - all unassigned", + mapped: []blockWithSeries{}, + expected: series, + }, + { + desc: "block has no overlapping series - all unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0xf0}, + {Fingerprint: 0xff}, + }, + }, + }, + expected: series, + }, + { + desc: "single block covering all series - no unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{}, + }, + { + desc: "multiple blocks covering all series - no unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{}, + }, + { + desc: "single block overlapping some series", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + { + desc: "multiple blocks overlapping some series", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0xe0}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + result := unassignedSeries(tc.mapped, series) + require.Equal(t, result, tc.expected) + }) + } +}