Skip to content

Commit

Permalink
cherry pick store gateway fix to release 0.30 (thanos-io#6089)
Browse files Browse the repository at this point in the history
* Fix: Failure to close BlockSeriesClient cause store-gateway deadlock (thanos-io#6086)

* Fix: Failure to close BlockSeriesClient cause store-gateway deadlock

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Adding tests

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* reverting the change on get series

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fix lint

Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
Signed-off-by: Ben Ye <benye@amazon.com>
Co-authored-by: Alan Protasio <alanprot@gmail.com>
  • Loading branch information
2 people authored and Nathaniel Graham committed May 17, 2023
1 parent 37984b9 commit 6f800ab
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#6066](https://github.com/thanos-io/thanos/pull/6066) Tracing: fixed panic because of nil sampler
- [#6086](https://github.com/thanos-io/thanos/pull/6086) Store Gateway: Fix store-gateway deadlock due to not close BlockSeriesClient

## [v0.30.1](https://github.com/thanos-io/thanos/tree/release-0.30) - 4.01.2023

Expand Down
51 changes: 51 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
resHints.AddQueriedBlock(b.meta.ULID)
}

<<<<<<< HEAD
var chunkr *bucketChunkReader
// We must keep the readers open until all their data has been sent.
indexr := b.indexReader()
Expand All @@ -1077,6 +1078,32 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
"block.mint": b.meta.MinTime,
"block.maxt": b.meta.MaxTime,
"block.resolution": b.meta.Thanos.Downsample.Resolution,
=======
shardMatcher := req.ShardInfo.Matcher(&s.buffers)

blockClient := newBlockSeriesClient(
srv.Context(),
s.logger,
blk,
req,
chunksLimiter,
bytesLimiter,
shardMatcher,
s.enableChunkHashCalculation,
s.seriesBatchSize,
s.metrics.chunkFetchDuration,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
"block.id": blk.meta.ULID,
"block.mint": blk.meta.MinTime,
"block.maxt": blk.meta.MaxTime,
"block.resolution": blk.meta.Thanos.Downsample.Resolution,
>>>>>>> 6cfab683 (cherry pick store gateway fix to release 0.30 (#6089))
})
defer span.Finish()

Expand Down Expand Up @@ -1302,11 +1329,23 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

result = strutil.MergeSlices(res, extRes)
} else {
<<<<<<< HEAD
seriesSet, _, err := blockSeries(
newCtx,
b.extLset,
indexr,
nil,
=======
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
>>>>>>> 6cfab683 (cherry pick store gateway fix to release 0.30 (#6089))
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
Expand Down Expand Up @@ -1470,11 +1509,23 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
result = res
} else {
<<<<<<< HEAD
seriesSet, _, err := blockSeries(
newCtx,
b.extLset,
indexr,
nil,
=======
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
>>>>>>> 6cfab683 (cherry pick store gateway fix to release 0.30 (#6089))
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
Expand Down
23 changes: 23 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -764,6 +765,10 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelNames(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, vals.Names)
Expand Down Expand Up @@ -867,6 +872,10 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelValues(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, emptyToNil(vals.Values))
Expand All @@ -881,3 +890,17 @@ func emptyToNil(values []string) []string {
}
return values
}

func waitTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return
case <-time.After(timeout):
t.Fatalf("timeout waiting wg for %v", timeout)
}
}

0 comments on commit 6f800ab

Please sign in to comment.