diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cfcf27805..551be1b6af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#4811](https://github.com/thanos-io/thanos/pull/4811) Query: Fix data race in metadata, rules, and targets servers.
 - [#4795](https://github.com/thanos-io/thanos/pull/4795) Query: Fix deadlock in endpointset.
 - [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
+- [#4962](https://github.com/thanos-io/thanos/pull/4962) Compact/downsample: fix deadlock if error occurs with some backlog of blocks; fixes a regression introduced in [#4430](https://github.com/thanos-io/thanos/pull/4430). Affected versions are 0.22.0 - 0.23.1.
 
 ### Changed
 
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 4c1d6affed..5a9eaa2cf0 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"sync"
 	"time"
 
 	extflag "github.com/efficientgo/tools/extkingpin"
@@ -20,13 +21,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
+	"github.com/thanos-io/thanos/pkg/errutil"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
@@ -229,90 +230,109 @@ func downsampleBucket(
 	})
 
 	var (
-		eg errgroup.Group
-		ch = make(chan *metadata.Meta, downsampleConcurrency)
+		wg                      sync.WaitGroup
+		metaCh                  = make(chan *metadata.Meta)
+		downsampleErrs          errutil.MultiError
+		errCh                   = make(chan error, downsampleConcurrency)
+		workerCtx, workerCancel = context.WithCancel(ctx)
 	)
 
+	defer workerCancel()
+
 	level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
 	for i := 0; i < downsampleConcurrency; i++ {
-		eg.Go(func() error {
-			for m := range ch {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for m := range metaCh {
 				resolution := downsample.ResLevel1
 				errMsg := "downsampling to 5 min"
 				if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
 					resolution = downsample.ResLevel2
 					errMsg = "downsampling to 60 min"
 				}
-				if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
+				if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
 					metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-					return errors.Wrap(err, errMsg)
+					errCh <- errors.Wrap(err, errMsg)
 				}
 				metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
 			}
-			return nil
-		})
+		}()
 	}
 
 	// Workers scheduled, distribute blocks.
-	eg.Go(func() error {
-		defer close(ch)
-		for _, mk := range metasULIDS {
-			m := metas[mk]
+metaSendLoop:
+	for _, mk := range metasULIDS {
+		m := metas[mk]
 
-			switch m.Thanos.Downsample.Resolution {
-			case downsample.ResLevel2:
-				continue
+		switch m.Thanos.Downsample.Resolution {
+		case downsample.ResLevel2:
+			continue
 
-			case downsample.ResLevel0:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources5m[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
-					continue
+		case downsample.ResLevel0:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources5m[id]; !ok {
+					missing = true
+					break
 				}
+			}
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
+				continue
+			}
 
-			case downsample.ResLevel1:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources1h[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
-					continue
+		case downsample.ResLevel1:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources1h[id]; !ok {
+					missing = true
+					break
 				}
 			}
-
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case ch <- m:
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
+				continue
 			}
 		}
-		return nil
-	})
 
-	if err := eg.Wait(); err != nil {
-		return errors.Wrap(err, "downsample bucket")
+		select {
+		case <-workerCtx.Done():
+			downsampleErrs.Add(workerCtx.Err())
+			break metaSendLoop
+		case metaCh <- m:
+		case downsampleErr := <-errCh:
+			downsampleErrs.Add(downsampleErr)
+			break metaSendLoop
+		}
 	}
-	return nil
+
+	close(metaCh)
+
+	// Close errCh only once every worker has exited, and keep draining it in the
+	// meantime. Waiting for the workers before draining can deadlock: a worker
+	// blocks forever sending to a full errCh that nobody is receiving from.
+	go func() {
+		wg.Wait()
+		close(errCh)
+	}()
+
+	// Collect any other error reported by the workers.
+	for downsampleErr := range errCh {
+		downsampleErrs.Add(downsampleErr)
+	}
+
+	return downsampleErrs.Err()
 }
 
 func processDownsampling(
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index 73a7d656ec..18dd566aa6 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -5,9 +5,12 @@ package main
 
 import (
 	"context"
+	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
+	"strings"
 	"testing"
 	"time"
 
@@ -26,6 +29,145 @@ import (
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 )
 
+// erroringBucket wraps an instrumented bucket and makes every Get and GetRange
+// on an object whose name contains "chunk" fail; all other calls are delegated.
+type erroringBucket struct {
+	bkt objstore.InstrumentedBucket
+}
+
+func (b *erroringBucket) Close() error {
+	return b.bkt.Close()
+}
+
+// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) WithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.Bucket {
+	return b.bkt.WithExpectedErrs(f)
+}
+
+// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) ReaderWithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
+	return b.bkt.ReaderWithExpectedErrs(f)
+}
+
+func (b *erroringBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return b.bkt.Iter(ctx, dir, f, options...)
+}
+
+// Get fails for any object whose name contains "chunk"; otherwise it returns a reader from the wrapped bucket.
+func (b *erroringBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.Get(ctx, name)
+}
+
+// GetRange fails for any object whose name contains "chunk"; otherwise it returns a range reader from the wrapped bucket.
+func (b *erroringBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.GetRange(ctx, name, off, length)
+}
+
+// Exists checks if the given object exists in the bucket.
+func (b *erroringBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return b.bkt.Exists(ctx, name)
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
+	return b.bkt.IsObjNotFoundErr(err)
+}
+
+// Attributes returns information about the specified object.
+func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
+	return b.bkt.Attributes(ctx, name)
+}
+
+// Upload the contents of the reader as an object into the bucket.
+// Upload should be idempotent.
+func (b *erroringBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	return b.bkt.Upload(ctx, name, r)
+}
+
+// Delete removes the object with the given name.
+// If object does not exist in the moment of deletion, Delete should throw error.
+func (b *erroringBucket) Delete(ctx context.Context, name string) error {
+	return b.bkt.Delete(ctx, name)
+}
+
+// Name returns the bucket name for the provider.
+func (b *erroringBucket) Name() string {
+	return b.bkt.Name()
+}
+
+// TestRegression4960_Deadlock ensures that downsampleBucket() stops its work properly
+// after an error occurs with some blocks in the backlog.
+// Testing for https://github.com/thanos-io/thanos/issues/4960.
+func TestRegression4960_Deadlock(t *testing.T) {
+	logger := log.NewLogfmtLogger(os.Stderr)
+	dir, err := ioutil.TempDir("", "test-compact-cleanup")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
+	bkt = &erroringBucket{bkt: bkt}
+	var id, id2, id3 ulid.ULID
+	{
+		id, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "1"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "1"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
+	}
+	{
+		id2, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
+	}
+	{
+		id3, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
+	}
+
+	meta, err := block.DownloadMeta(ctx, logger, bkt, id)
+	testutil.Ok(t, err)
+
+	metrics := newDownsampleMetrics(prometheus.NewRegistry())
+	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
+	metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
+	testutil.Ok(t, err)
+
+	metas, _, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+	err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
+	testutil.NotOk(t, err)
+
+	testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))
+}
+
 func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	logger := log.NewLogfmtLogger(os.Stderr)
 	dir, err := ioutil.TempDir("", "test-compact-cleanup")