Skip to content

Commit

Permalink
downsample: fix deadlock if error occurs (#4962)
Browse files Browse the repository at this point in the history
Fix deadlock that occurs if there is some backlog of blocks to be
deduplicated, and an error occurs. For this to trigger, there needs to
be at least `downsampleConcurrency + 2` blocks in the backlog.

Add test to cover this regression. Affected versions 0.22.0 - 0.23.1.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Dec 20, 2021
1 parent a6dd23a commit 5c38b9c
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4811](https://github.com/thanos-io/thanos/pull/4811) Query: Fix data race in metadata, rules, and targets servers.
- [#4795](https://github.com/thanos-io/thanos/pull/4795) Query: Fix deadlock in endpointset.
- [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
- [#4962](https://github.com/thanos-io/thanos/pull/4962) Compact/downsample: fix deadlock if error occurs with some backlog of blocks; fixes [this pull request](https://github.com/thanos-io/thanos/pull/4430). Affected versions are 0.22.0 - 0.23.1.

### Changed

Expand Down
131 changes: 73 additions & 58 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand All @@ -20,13 +21,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand Down Expand Up @@ -229,90 +230,104 @@ func downsampleBucket(
})

var (
eg errgroup.Group
ch = make(chan *metadata.Meta, downsampleConcurrency)
wg sync.WaitGroup
metaCh = make(chan *metadata.Meta)
downsampleErrs errutil.MultiError
errCh = make(chan error, downsampleConcurrency)
workerCtx, workerCancel = context.WithCancel(ctx)
)

defer workerCancel()

level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
for i := 0; i < downsampleConcurrency; i++ {
eg.Go(func() error {
for m := range ch {
wg.Add(1)
go func() {
defer wg.Done()
for m := range metaCh {
resolution := downsample.ResLevel1
errMsg := "downsampling to 5 min"
if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
resolution = downsample.ResLevel2
errMsg = "downsampling to 60 min"
}
if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, errMsg)
errCh <- errors.Wrap(err, errMsg)
}
metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
}
return nil
})
}()
}

// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
for _, mk := range metasULIDS {
m := metas[mk]
metaSendLoop:
for _, mk := range metasULIDS {
m := metas[mk]

switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel2:
continue
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel2:
continue

case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}

case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}

select {
case <-ctx.Done():
return ctx.Err()
case ch <- m:
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
}
return nil
})

if err := eg.Wait(); err != nil {
return errors.Wrap(err, "downsample bucket")
select {
case <-workerCtx.Done():
downsampleErrs.Add(workerCtx.Err())
break metaSendLoop
case metaCh <- m:
case downsampleErr := <-errCh:
downsampleErrs.Add(downsampleErr)
break metaSendLoop
}
}
return nil

close(metaCh)
wg.Wait()
workerCancel()
close(errCh)

// Collect any other error reported by the workers.
for downsampleErr := range errCh {
downsampleErrs.Add(downsampleErr)
}

return downsampleErrs.Err()
}

func processDownsampling(
Expand Down
141 changes: 141 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package main

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"

Expand All @@ -26,6 +29,144 @@ import (
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

type erroringBucket struct {
bkt objstore.InstrumentedBucket
}

func (b *erroringBucket) Close() error {
return b.bkt.Close()
}

// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *erroringBucket) WithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.Bucket {
return b.bkt.WithExpectedErrs(f)
}

// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *erroringBucket) ReaderWithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.bkt.ReaderWithExpectedErrs(f)
}

func (b *erroringBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bkt.Iter(ctx, dir, f, options...)
}

// Get returns a reader for the given object name.
func (b *erroringBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if strings.Contains(name, "chunk") {
return nil, fmt.Errorf("some random error has occurred")
}
return b.bkt.Get(ctx, name)
}

// GetRange returns a new range reader for the given object name and range.
func (b *erroringBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
if strings.Contains(name, "chunk") {
return nil, fmt.Errorf("some random error has occurred")
}
return b.bkt.GetRange(ctx, name, off, length)
}

// Exists checks if the given object exists in the bucket.
func (b *erroringBucket) Exists(ctx context.Context, name string) (bool, error) {
return b.bkt.Exists(ctx, name)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
return b.bkt.IsObjNotFoundErr(err)
}

// Attributes returns information about the specified object.
func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bkt.Attributes(ctx, name)
}

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
func (b *erroringBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.bkt.Upload(ctx, name, r)
}

// Delete removes the object with the given name.
// If object does not exists in the moment of deletion, Delete should throw error.
func (b *erroringBucket) Delete(ctx context.Context, name string) error {
return b.bkt.Delete(ctx, name)
}

// Name returns the bucket name for the provider.
func (b *erroringBucket) Name() string {
return b.bkt.Name()
}

// Ensures that downsampleBucket() stops its work properly
// after an error occurs with some blocks in the backlog.
// Testing for https://github.com/thanos-io/thanos/issues/4960.
func TestRegression4960_Deadlock(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
bkt = &erroringBucket{bkt: bkt}
var id, id2, id3 ulid.ULID
{
id, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
}
{
id2, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "2"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
}
{
id3, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "2"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
testutil.NotOk(t, err)

testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))

}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
Expand Down

0 comments on commit 5c38b9c

Please sign in to comment.