From c5acb9cd5a01c01728dc48af3eb54f428adba3f9 Mon Sep 17 00:00:00 2001 From: xjewer Date: Wed, 6 Feb 2019 20:05:18 +0000 Subject: [PATCH] compact: Improved memory usage while downsampling (#529) Add instant writer implementation to shrink memory consumption during the downsampling stage. Encoded chunks are written to chunks blob files right away after series was handled. Flush method closes chunk writer and sync all symbols, series, labels, posting and meta data to files. It still works in one thread, hence operates only on one core. Estimated memory consumption is unlikely more than 1Gb, but depends on data set, labels size and series' density: chunk data size (512MB) + encoded buffers + index data Fixes #297 * compact: clarify purpose of streamed block writer Add comments and close resources properly. * downsample: fix postings index Use proper posting index to fetch series data with label set and chunks * Add stream writer an ability to write index data right during the downsampling process. One of the trade-offs is to preserve symbols from raw blocks, as we have to write them before preserving the series. Stream writer allows downsample a huge data blocks with no needs to keep all series in RAM, the only need it preserve label values and postings references. * fix nitpicks * downsampling: simplify StreamedBlockWriter interface Reduce of use public Flush method to finalize index and meta files. In case of error, a caller has to remove block directory with a preserved garbage inside. Rid of use tmp directories and renaming, syncing the final block on disk before upload. --- .errcheck_excludes.txt | 2 +- cmd/thanos/downsample.go | 4 +- pkg/block/metadata/meta.go | 7 +- pkg/compact/downsample/downsample.go | 238 +++++----------- pkg/compact/downsample/downsample_test.go | 125 ++++++++- .../downsample/streamed_block_writer.go | 265 ++++++++++++++++++ 6 files changed, 460 insertions(+), 181 deletions(-) create mode 100644 pkg/compact/downsample/streamed_block_writer.go diff --git a/.errcheck_excludes.txt b/.errcheck_excludes.txt index 5fad7c252e..9e2e3a71e1 100644 --- a/.errcheck_excludes.txt +++ b/.errcheck_excludes.txt @@ -1,3 +1,3 @@ (github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log fmt.Fprintln -fmt.Fprint \ No newline at end of file +fmt.Fprint diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index e53cd7680b..fed8946bd4 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -173,7 +173,7 @@ func downsampleBucket( continue } if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil { - return err + return errors.Wrap(err, "downsampling to 5 min") } case 5 * 60 * 1000: @@ -194,7 +194,7 @@ func downsampleBucket( continue } if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil { - return err + return errors.Wrap(err, "downsampling to 60 min") } } } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 44f1d3768d..0d8b22dd19 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -35,6 +35,11 @@ const ( MetaFilename = "meta.json" ) +const ( + // MetaVersion is a enumeration of versions supported by Thanos. + MetaVersion1 = iota + 1 +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. type Meta struct { @@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) { if err := json.Unmarshal(b, &m); err != nil { return nil, err } - if m.Version != 1 { + if m.Version != MetaVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } return &m, nil diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index f5afecdcd0..2263d61359 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -2,21 +2,19 @@ package downsample import ( "math" - "path/filepath" - "sort" - - "github.com/improbable-eng/thanos/pkg/block/metadata" - - "github.com/prometheus/prometheus/pkg/value" - "github.com/prometheus/tsdb/chunkenc" - + "math/rand" "os" + "path/filepath" + "time" "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -53,15 +51,39 @@ func Downsample( } defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") - rng := origMeta.MaxTime - origMeta.MinTime + // Generate new block id. + uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) - // Write downsampled data in a custom memory block where we have fine-grained control - // over created chunks. - // This is necessary since we need to inject special values at the end of chunks for - // some aggregations. - newb := newMemBlock() + // Create block directory to populate with chunks, meta and index files into. + blockDir := filepath.Join(dir, uid.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return id, errors.Wrap(err, "mkdir block dir") + } + + // Remove blockDir in case of errors. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + merr.Add(os.RemoveAll(blockDir)) + err = merr.Err() + } + }() + + // Copy original meta to the new one. Update downsampling resolution and ULID for a new block. + newMeta := *origMeta + newMeta.Thanos.Downsample.Resolution = resolution + newMeta.ULID = uid - pall, err := indexr.Postings(index.AllPostingsKey()) + // Writes downsampled chunks right into the files, avoiding excess memory allocation. + // Flushes index and meta data after aggregations. + streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta) + if err != nil { + return id, errors.Wrap(err, "get streamed block writer") + } + defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer") + + postings, err := indexr.Postings(index.AllPostingsKey()) if err != nil { return id, errors.Wrap(err, "get all postings list") } @@ -69,24 +91,25 @@ func Downsample( aggrChunks []*AggrChunk all []sample chks []chunks.Meta + lset labels.Labels ) - for pall.Next() { - var lset labels.Labels + for postings.Next() { + lset = lset[:0] chks = chks[:0] all = all[:0] aggrChunks = aggrChunks[:0] // Get series labels and chunks. Downsampled data is sensitive to chunk boundaries // and we need to preserve them to properly downsample previously downsampled data. - if err := indexr.Series(pall.At(), &lset, &chks); err != nil { - return id, errors.Wrapf(err, "get series %d", pall.At()) + if err := indexr.Series(postings.At(), &lset, &chks); err != nil { + return id, errors.Wrapf(err, "get series %d", postings.At()) } // While #183 exists, we sanitize the chunks we retrieved from the block // before retrieving their samples. for i, c := range chks { chk, err := chunkr.Chunk(c.Ref) if err != nil { - return id, errors.Wrapf(err, "get chunk %d", c.Ref) + return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, postings.At()) } chks[i].Chunk = chk } @@ -95,155 +118,41 @@ func Downsample( if origMeta.Thanos.Downsample.Resolution == 0 { for _, c := range chks { if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil { - return id, errors.Wrapf(err, "expand chunk %d", c.Ref) + return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) } } - newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}) - continue - } - - // Downsample a block that contains aggregate chunks already. - for _, c := range chks { - aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) - } - res, err := downsampleAggr( - aggrChunks, - &all, - chks[0].MinTime, - chks[len(chks)-1].MaxTime, - origMeta.Thanos.Downsample.Resolution, - resolution, - ) - - if err != nil { - return id, errors.Wrap(err, "downsample aggregate block") + if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil { + return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At()) + } + } else { + // Downsample a block that contains aggregated chunks already. + for _, c := range chks { + aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) + } + downsampledChunks, err := downsampleAggr( + aggrChunks, + &all, + chks[0].MinTime, + chks[len(chks)-1].MaxTime, + origMeta.Thanos.Downsample.Resolution, + resolution, + ) + if err != nil { + return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At()) + } + if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil { + return id, errors.Wrapf(err, "write series: %d", postings.At()) + } } - newb.addSeries(&series{lset: lset, chunks: res}) - } - if pall.Err() != nil { - return id, errors.Wrap(pall.Err(), "iterate series set") - } - comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool()) - if err != nil { - return id, errors.Wrap(err, "create compactor") - } - id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta) - if err != nil { - return id, errors.Wrap(err, "compact head") } - bdir := filepath.Join(dir, id.String()) - - var tmeta metadata.Thanos - tmeta = origMeta.Thanos - tmeta.Source = metadata.CompactorSource - tmeta.Downsample.Resolution = resolution - - _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta) - if err != nil { - return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) + if postings.Err() != nil { + return id, errors.Wrap(postings.Err(), "iterate series set") } - if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { - return id, errors.Wrap(err, "remove tombstones") - } - return id, nil + id = uid + return } -// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface -// to allow tsdb.LeveledCompactor to persist the data as a block. -type memBlock struct { - // Dummies to implement unused methods. - tsdb.IndexReader - - symbols map[string]struct{} - postings []uint64 - series []*series - chunks []chunkenc.Chunk -} - -func newMemBlock() *memBlock { - return &memBlock{symbols: map[string]struct{}{}} -} - -func (b *memBlock) addSeries(s *series) { - sid := uint64(len(b.series)) - b.postings = append(b.postings, sid) - b.series = append(b.series, s) - - for _, l := range s.lset { - b.symbols[l.Name] = struct{}{} - b.symbols[l.Value] = struct{}{} - } - - for i, cm := range s.chunks { - cid := uint64(len(b.chunks)) - s.chunks[i].Ref = cid - b.chunks = append(b.chunks, cm.Chunk) - } -} - -func (b *memBlock) Postings(name, val string) (index.Postings, error) { - allName, allVal := index.AllPostingsKey() - - if name != allName || val != allVal { - return nil, errors.New("unsupported call to Postings()") - } - sort.Slice(b.postings, func(i, j int) bool { - return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 - }) - return index.NewListPostings(b.postings), nil -} - -func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error { - if id >= uint64(len(b.series)) { - return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id) - } - s := b.series[id] - - *lset = append((*lset)[:0], s.lset...) - *chks = append((*chks)[:0], s.chunks...) - - return nil -} - -func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { - if id >= uint64(len(b.chunks)) { - return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) - } - return b.chunks[id], nil -} - -func (b *memBlock) Symbols() (map[string]struct{}, error) { - return b.symbols, nil -} - -func (b *memBlock) SortedPostings(p index.Postings) index.Postings { - return p -} - -func (b *memBlock) Index() (tsdb.IndexReader, error) { - return b, nil -} - -func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { - return b, nil -} - -func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return emptyTombstoneReader{}, nil -} - -func (b *memBlock) Close() error { - return nil -} - -type emptyTombstoneReader struct{} - -func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } -func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } -func (emptyTombstoneReader) Total() uint64 { return 0 } -func (emptyTombstoneReader) Close() error { return nil } - // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. @@ -492,7 +401,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes return res, nil } -// expandChunkIterator reads all samples from the iterater and appends them to buf. +// expandChunkIterator reads all samples from the iterator and appends them to buf. // Stale markers and out of order samples are skipped. func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error { // For safety reasons, we check for each sample that it does not go back in time. @@ -614,11 +523,6 @@ type sample struct { v float64 } -type series struct { - lset labels.Labels - chunks []chunks.Meta -} - // CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing // values as counter reset. // Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index f55cfc7e61..d54d31f0e9 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -5,6 +5,7 @@ import ( "math" "os" "path/filepath" + "sort" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunkenc" @@ -68,30 +70,30 @@ func TestDownsampleAggr(t *testing.T) { { lset: labels.FromStrings("__name__", "a"), inAggr: map[AggrType][]sample{ - AggrCount: []sample{ + AggrCount: { {199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrSum: []sample{ + AggrSum: { {199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrMin: []sample{ + AggrMin: { {199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrMax: []sample{ + AggrMax: { {199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrCounter: []sample{ + AggrCounter: { {99, 100}, {299, 150}, {499, 210}, {499, 10}, // chunk 1 {599, 20}, {799, 50}, {999, 120}, {999, 50}, // chunk 2, no reset {1099, 40}, {1199, 80}, {1299, 110}, // chunk 3, reset }, }, output: map[AggrType][]sample{ - AggrCount: []sample{{499, 29}, {999, 100}}, - AggrSum: []sample{{499, 29}, {999, 100}}, - AggrMin: []sample{{499, -3}, {999, 0}}, - AggrMax: []sample{{499, 10}, {999, 100}}, - AggrCounter: []sample{{499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, + AggrCount: {{499, 29}, {999, 100}}, + AggrSum: {{499, 29}, {999, 100}}, + AggrMin: {{499, -3}, {999, 0}}, + AggrMax: {{499, 10}, {999, 100}}, + AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, }, }, } @@ -375,3 +377,106 @@ func (it *sampleIterator) Seek(int64) bool { func (it *sampleIterator) At() (t int64, v float64) { return it.l[it.i].t, it.l[it.i].v } + +// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface +// to allow tsdb.StreamedBlockWriter to persist the data as a block. +type memBlock struct { + // Dummies to implement unused methods. + tsdb.IndexReader + + symbols map[string]struct{} + postings []uint64 + series []*series + chunks []chunkenc.Chunk + + numberOfChunks uint64 +} + +type series struct { + lset labels.Labels + chunks []chunks.Meta +} + +func newMemBlock() *memBlock { + return &memBlock{symbols: map[string]struct{}{}} +} + +func (b *memBlock) addSeries(s *series) { + sid := uint64(len(b.series)) + b.postings = append(b.postings, sid) + b.series = append(b.series, s) + + for _, l := range s.lset { + b.symbols[l.Name] = struct{}{} + b.symbols[l.Value] = struct{}{} + } + + for i, cm := range s.chunks { + s.chunks[i].Ref = b.numberOfChunks + b.chunks = append(b.chunks, cm.Chunk) + b.numberOfChunks++ + } +} + +func (b *memBlock) Postings(name, val string) (index.Postings, error) { + allName, allVal := index.AllPostingsKey() + + if name != allName || val != allVal { + return nil, errors.New("unsupported call to Postings()") + } + sort.Slice(b.postings, func(i, j int) bool { + return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 + }) + return index.NewListPostings(b.postings), nil +} + +func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error { + if id >= uint64(len(b.series)) { + return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id) + } + s := b.series[id] + + *lset = append((*lset)[:0], s.lset...) + *chks = append((*chks)[:0], s.chunks...) + + return nil +} + +func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { + if id >= uint64(b.numberOfChunks) { + return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) + } + + return b.chunks[id], nil +} + +func (b *memBlock) Symbols() (map[string]struct{}, error) { + return b.symbols, nil +} + +func (b *memBlock) SortedPostings(p index.Postings) index.Postings { + return p +} + +func (b *memBlock) Index() (tsdb.IndexReader, error) { + return b, nil +} + +func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { + return b, nil +} + +func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { + return emptyTombstoneReader{}, nil +} + +func (b *memBlock) Close() error { + return nil +} + +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go new file mode 100644 index 0000000000..2e2921ac34 --- /dev/null +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -0,0 +1,265 @@ +package downsample + +import ( + "io" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +type labelValues map[string]struct{} + +func (lv labelValues) add(value string) { + lv[value] = struct{}{} +} + +func (lv labelValues) get(set *[]string) { + for value := range lv { + *set = append(*set, value) + } +} + +type labelsValues map[string]labelValues + +func (lv labelsValues) add(labelSet labels.Labels) { + for _, label := range labelSet { + values, ok := lv[label.Name] + if !ok { + // Add new label. + values = labelValues{} + lv[label.Name] = values + } + values.add(label.Value) + } +} + +// streamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption +// by writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be +// sealed afterwards, when there aren't more series to process. +type streamedBlockWriter struct { + blockDir string + finalized bool // set to true, if Close was called + logger log.Logger + ignoreFinalize bool // if true Close does not finalize block due to internal error. + meta metadata.Meta + totalChunks uint64 + totalSamples uint64 + + chunkWriter tsdb.ChunkWriter + indexWriter tsdb.IndexWriter + indexReader tsdb.IndexReader + closers []io.Closer + + labelsValues labelsValues // labelsValues list of used label sets: name -> []values. + memPostings *index.MemPostings // memPostings contains references from label name:value -> postings. + postings uint64 // postings is a current posting position. +} + +// NewStreamedBlockWriter returns streamedBlockWriter instance, it's not concurrency safe. +// Caller is responsible to Close all io.Closers by calling the Close when downsampling is done. +// In case if error happens outside of the StreamedBlockWriter during the processing, +// index and meta files will be written anyway, so the caller is always responsible for removing block directory with +// a garbage on error. +// This approach simplifies StreamedBlockWriter interface, which is a best trade-off taking into account the error is an +// exception, not a general case. +func NewStreamedBlockWriter( + blockDir string, + indexReader tsdb.IndexReader, + logger log.Logger, + originMeta metadata.Meta, +) (w *streamedBlockWriter, err error) { + closers := make([]io.Closer, 0, 2) + + // We should close any opened Closer up to an error. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + for _, cl := range closers { + merr.Add(cl.Close()) + } + err = merr.Err() + } + }() + + chunkWriter, err := chunks.NewWriter(filepath.Join(blockDir, block.ChunksDirname)) + if err != nil { + return nil, errors.Wrap(err, "create chunk writer in streamedBlockWriter") + } + closers = append(closers, chunkWriter) + + indexWriter, err := index.NewWriter(filepath.Join(blockDir, block.IndexFilename)) + if err != nil { + return nil, errors.Wrap(err, "open index writer in streamedBlockWriter") + } + closers = append(closers, indexWriter) + + symbols, err := indexReader.Symbols() + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + + err = indexWriter.AddSymbols(symbols) + if err != nil { + return nil, errors.Wrap(err, "add symbols") + } + + return &streamedBlockWriter{ + logger: logger, + blockDir: blockDir, + indexReader: indexReader, + indexWriter: indexWriter, + chunkWriter: chunkWriter, + meta: originMeta, + closers: closers, + labelsValues: make(labelsValues, 1024), + memPostings: index.NewUnorderedMemPostings(), + }, nil +} + +// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process. +func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error { + if w.finalized || w.ignoreFinalize { + return errors.Errorf("series can't be added, writers has been closed or internal error happened") + } + + if len(chunks) == 0 { + level.Warn(w.logger).Log("empty chunks happened, skip series", lset) + return nil + } + + if err := w.chunkWriter.WriteChunks(chunks...); err != nil { + w.ignoreFinalize = true + return errors.Wrap(err, "add chunks") + } + + if err := w.indexWriter.AddSeries(w.postings, lset, chunks...); err != nil { + w.ignoreFinalize = true + return errors.Wrap(err, "add series") + } + + w.labelsValues.add(lset) + w.memPostings.Add(w.postings, lset) + w.postings++ + + w.totalChunks += uint64(len(chunks)) + for i := range chunks { + w.totalSamples += uint64(chunks[i].Chunk.NumSamples()) + } + + return nil +} + +// Close calls finalizer to complete index and meta files and closes all io.CLoser writers. +// Idempotent. +func (w *streamedBlockWriter) Close() error { + if w.finalized { + return nil + } + + var merr tsdb.MultiError + w.finalized = true + + // Finalise data block only if there wasn't any internal errors. + if !w.ignoreFinalize { + merr.Add(w.finalize()) + } + + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + + return errors.Wrap(merr.Err(), "close closers") +} + +// finalize saves prepared index and meta data to corresponding files. +// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway, +// so it's a caller's responsibility to remove the block's directory. +func (w *streamedBlockWriter) finalize() error { + if err := w.writeLabelSets(); err != nil { + return errors.Wrap(err, "write label sets") + } + + if err := w.writeMemPostings(); err != nil { + return errors.Wrap(err, "write mem postings") + } + + if err := w.writeMetaFile(); err != nil { + return errors.Wrap(err, "write meta meta") + } + + if err := w.syncDir(); err != nil { + return errors.Wrap(err, "sync blockDir") + } + + level.Info(w.logger).Log( + "msg", "write downsampled block", + "mint", w.meta.MinTime, + "maxt", w.meta.MaxTime, + "ulid", w.meta.ULID, + "resolution", w.meta.Thanos.Downsample.Resolution, + ) + return nil +} + +// syncDir syncs blockDir on disk. +func (w *streamedBlockWriter) syncDir() (err error) { + df, err := fileutil.OpenDir(w.blockDir) + if err != nil { + return errors.Wrap(err, "open temporary block blockDir") + } + + defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir") + + if err := fileutil.Fsync(df); err != nil { + return errors.Wrap(err, "sync temporary blockDir") + } + + return nil +} + +// writeLabelSets fills the index writer with label sets. +func (w *streamedBlockWriter) writeLabelSets() error { + s := make([]string, 0, 256) + for n, v := range w.labelsValues { + s = s[:0] + v.get(&s) + if err := w.indexWriter.WriteLabelIndex([]string{n}, s); err != nil { + return errors.Wrap(err, "write label index") + } + } + return nil +} + +// writeMemPostings fills the index writer with mem postings. +func (w *streamedBlockWriter) writeMemPostings() error { + w.memPostings.EnsureOrder() + for _, l := range w.memPostings.SortedKeys() { + if err := w.indexWriter.WritePostings(l.Name, l.Value, w.memPostings.Get(l.Name, l.Value)); err != nil { + return errors.Wrap(err, "write postings") + } + } + return nil +} + +// writeMetaFile writes meta file. +func (w *streamedBlockWriter) writeMetaFile() error { + w.meta.Version = metadata.MetaVersion1 + w.meta.Thanos.Source = metadata.CompactorSource + w.meta.Stats.NumChunks = w.totalChunks + w.meta.Stats.NumSamples = w.totalSamples + w.meta.Stats.NumSeries = w.postings + + return metadata.Write(w.logger, w.blockDir, &w.meta) +}