From caf4d963c6250e120f85db689c0fc9b3146fcf93 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Wed, 6 Feb 2019 02:00:16 +0000 Subject: [PATCH] downsampling: simplify StreamedBlockWriter interface Reduce of use public Flush method to finalize index and meta files. In case of error, a caller has to remove block directory with a preserved garbage inside. Rid of use tmp directories and renaming, syncing the final block on disk before upload. --- pkg/block/metadata/meta.go | 7 +- pkg/compact/downsample/downsample.go | 48 ++- pkg/compact/downsample/downsample_test.go | 9 +- .../downsample/streamed_block_writer.go | 310 +++++++----------- 4 files changed, 164 insertions(+), 210 deletions(-) diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 44f1d3768de..0d8b22dd190 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -35,6 +35,11 @@ const ( MetaFilename = "meta.json" ) +const ( + // MetaVersion is a enumeration of versions supported by Thanos. + MetaVersion1 = iota + 1 +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. type Meta struct { @@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) { if err := json.Unmarshal(b, &m); err != nil { return nil, err } - if m.Version != 1 { + if m.Version != MetaVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } return &m, nil diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index e8b08917e3d..2263d613597 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -2,6 +2,10 @@ package downsample import ( "math" + "math/rand" + "os" + "path/filepath" + "time" "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block/metadata" @@ -47,9 +51,33 @@ func Downsample( } defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") + // Generate new block id. + uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) + + // Create block directory to populate with chunks, meta and index files into. + blockDir := filepath.Join(dir, uid.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return id, errors.Wrap(err, "mkdir block dir") + } + + // Remove blockDir in case of errors. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + merr.Add(os.RemoveAll(blockDir)) + err = merr.Err() + } + }() + + // Copy original meta to the new one. Update downsampling resolution and ULID for a new block. + newMeta := *origMeta + newMeta.Thanos.Downsample.Resolution = resolution + newMeta.ULID = uid + // Writes downsampled chunks right into the files, avoiding excess memory allocation. - // Flushes index and meta data afterwards aggregations. - streamedBlockWriter, err := NewWriter(dir, indexr, logger, *origMeta, resolution) + // Flushes index and meta data after aggregations. + streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta) if err != nil { return id, errors.Wrap(err, "get streamed block writer") } @@ -93,11 +121,11 @@ func Downsample( return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) } } - if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil { + if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil { return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At()) } } else { - // Downsample a block that contains aggregate chunks already. + // Downsample a block that contains aggregated chunks already. for _, c := range chks { aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) } @@ -112,8 +140,8 @@ func Downsample( if err != nil { return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At()) } - if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil { - return id, errors.Wrapf(err, "downsample aggregated block, series: %d", postings.At()) + if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil { + return id, errors.Wrapf(err, "write series: %d", postings.At()) } } } @@ -121,12 +149,8 @@ func Downsample( return id, errors.Wrap(postings.Err(), "iterate series set") } - id, err = streamedBlockWriter.Flush() - if err != nil { - return id, errors.Wrap(err, "flush data in stream data") - } - - return id, nil + id = uid + return } // currentWindow returns the end timestamp of the window that t falls into. diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 65e49831e04..d54d31f0e99 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -467,9 +467,16 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { } func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil + return emptyTombstoneReader{}, nil } func (b *memBlock) Close() error { return nil } + +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 5d51345ebe5..fd049b83c04 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -1,18 +1,14 @@ package downsample import ( - "encoding/json" "io" - "math/rand" - "os" "path/filepath" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" - "github.com/oklog/ulid" + "github.com/improbable-eng/thanos/pkg/runutil" "github.com/pkg/errors" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunks" @@ -47,88 +43,95 @@ func (lv labelsValues) add(labelSet labels.Labels) { } } -// StreamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption -// by means writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be +// streamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption +// by writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be // sealed afterwards, when there aren't more series to process. -type StreamedBlockWriter struct { - dir string - tmpDir string - logger log.Logger - uid ulid.ULID - - // postings is a current posting position. - postings uint64 +type streamedBlockWriter struct { + blockDir string + finalized bool // set to true, if Close was called + logger log.Logger + ignoreFinalize bool // if true Close does not finalize block due to internal error. + meta metadata.Meta + totalChunks uint64 + totalSamples uint64 chunkWriter tsdb.ChunkWriter indexWriter tsdb.IndexWriter indexReader tsdb.IndexReader closers []io.Closer - meta metadata.Meta - totalChunks uint64 - totalSamples uint64 - - // labelsValues list of used label sets: name -> []values. - labelsValues labelsValues - - // memPostings contains references from label name:value -> postings. - memPostings *index.MemPostings - sealed bool + labelsValues labelsValues // labelsValues list of used label sets: name -> []values. + memPostings *index.MemPostings // memPostings contains references from label name:value -> postings. + postings uint64 // postings is a current posting position. } -// NewWriter returns StreamedBlockWriter instance. -// Caller is responsible to finalize the writing with Flush method to write the meta and index file and Close all io.Closers -func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMeta metadata.Meta, resolution int64) (*StreamedBlockWriter, error) { - // change downsampling resolution to the new one. - originMeta.Thanos.Downsample.Resolution = resolution - - // Generate new block id. - uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) +// NewStreamedBlockWriter returns streamedBlockWriter instance, it's not concurrency safe. +// Caller is responsible to Close all io.Closers by calling the Close when downsampling is done. +// In case if error happens outside of the StreamedBlockWriter during the processing, +// index and meta files will be written anyway, so the caller is always responsible for removing block directory with +// a garbage on error. +// This approach simplifies StreamedBlockWriter interface, which is a best trade-off taking into account the error is an +// exception, not a general case. +func NewStreamedBlockWriter( + blockDir string, + indexReader tsdb.IndexReader, + logger log.Logger, + originMeta metadata.Meta, +) (w *streamedBlockWriter, err error) { + closers := make([]io.Closer, 0, 2) + + // We should close any opened Closer up to an error. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + for _, cl := range closers { + merr.Add(cl.Close()) + } + err = merr.Err() + } + }() - // Populate chunk, meta and index files into temporary directory with - // data of all blocks. - dir = filepath.Join(dir, uid.String()) - tmpDir, err := createTmpDir(dir) + chunkWriter, err := chunks.NewWriter(filepath.Join(blockDir, block.ChunksDirname)) if err != nil { - return nil, err - } - - sw := &StreamedBlockWriter{ - logger: l, - dir: dir, - indexReader: indexReader, - tmpDir: tmpDir, - uid: uid, - meta: originMeta, - closers: make([]io.Closer, 0), - labelsValues: make(labelsValues, 1024), - memPostings: index.NewUnorderedMemPostings(), + return nil, errors.Wrap(err, "create chunk writer in streamedBlockWriter") } + closers = append(closers, chunkWriter) - sw.chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname)) + indexWriter, err := index.NewWriter(filepath.Join(blockDir, block.IndexFilename)) if err != nil { - return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter") + return nil, errors.Wrap(err, "open index writer in streamedBlockWriter") } - sw.closers = append(sw.closers, sw.chunkWriter) + closers = append(closers, indexWriter) - sw.indexWriter, err = index.NewWriter(filepath.Join(tmpDir, block.IndexFilename)) + symbols, err := indexReader.Symbols() if err != nil { - return nil, errors.Wrap(err, "open index StreamedBlockWriter") + return nil, errors.Wrap(err, "read symbols") } - sw.closers = append(sw.closers, sw.indexWriter) - if err := sw.init(); err != nil { - return nil, err + err = indexWriter.AddSymbols(symbols) + if err != nil { + return nil, errors.Wrap(err, "add symbols") } - return sw, nil + return &streamedBlockWriter{ + logger: logger, + blockDir: blockDir, + indexReader: indexReader, + indexWriter: indexWriter, + chunkWriter: chunkWriter, + meta: originMeta, + closers: closers, + labelsValues: make(labelsValues, 1024), + memPostings: index.NewUnorderedMemPostings(), + }, nil } -// AddSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to -// labelsValues sets and memPostings to be written on the Flush state in the end of downsampling process. -func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta) error { - if w.sealed { - return errors.Errorf("Series can't be added, writers has been flushed|closed") +// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process. +func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error { + if w.finalized || w.ignoreFinalize { + return errors.Errorf("series can't be added, writers has been closed or internal error happened") } if len(chunks) == 0 { @@ -137,10 +140,12 @@ func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta } if err := w.chunkWriter.WriteChunks(chunks...); err != nil { - return errors.Wrap(err, "add series") + w.ignoreFinalize = true + return errors.Wrap(err, "add chunks") } if err := w.indexWriter.AddSeries(w.postings, lset, chunks...); err != nil { + w.ignoreFinalize = true return errors.Wrap(err, "add series") } @@ -156,25 +161,46 @@ func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta return nil } -// Flush saves prepared index and meta data to corresponding files. -// Be sure to call this, if all series have to be handled by this moment, you can't call AddSeries afterwards. -func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { - w.sealed = true +// Close calls finalizer to complete index and meta files and closes all io.CLoser writers. +// Idempotent. +func (w *streamedBlockWriter) Close() error { + if w.finalized { + return nil + } + var merr tsdb.MultiError + w.finalized = true + + // Finalise data block only if there wasn't any internal errors. + if !w.ignoreFinalize { + merr.Add(w.finalize()) + } + + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + + return errors.Wrap(merr.Err(), "close closers") +} + +// finalize saves prepared index and meta data to corresponding files. +// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway, +// so it's a caller's responsibility to remove the block's directory. +func (w *streamedBlockWriter) finalize() error { if err := w.writeLabelSets(); err != nil { - return w.uid, errors.Wrap(err, "write label sets") + return errors.Wrap(err, "write label sets") } if err := w.writeMemPostings(); err != nil { - return w.uid, errors.Wrap(err, "write mem postings") + return errors.Wrap(err, "write mem postings") } if err := w.writeMetaFile(); err != nil { - return w.uid, errors.Wrap(err, "write meta meta") + return errors.Wrap(err, "write meta meta") } - if err := w.finalize(); err != nil { - return w.uid, errors.Wrap(err, "sync and rename tmp dir") + if err := w.syncDir(); err != nil { + return errors.Wrap(err, "sync blockDir") } level.Info(w.logger).Log( @@ -184,79 +210,28 @@ func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { "ulid", w.meta.ULID, "resolution", w.meta.Thanos.Downsample.Resolution, ) - return w.uid, nil -} - -// Close closes all io.CLoser writers -func (w *StreamedBlockWriter) Close() error { - var merr tsdb.MultiError - w.sealed = true - - if w.tmpDir != "" { - merr.Add(os.RemoveAll(w.tmpDir)) - } - - for _, cl := range w.closers { - merr.Add(cl.Close()) - } - - w.chunkWriter = nil - w.indexWriter = nil - - return errors.Wrap(merr.Err(), "close closers") -} - -// init writes all available symbols in the beginning of the index file. -func (w *StreamedBlockWriter) init() error { - symbols, err := w.indexReader.Symbols() - if err != nil { - return errors.Wrap(err, "read symbols") - } - - if err := w.indexWriter.AddSymbols(symbols); err != nil { - return errors.Wrap(err, "add symbols") - } - return nil } -// finalize sync tmp dir on disk and rename to dir. -func (w *StreamedBlockWriter) finalize() error { - df, err := fileutil.OpenDir(w.tmpDir) +// syncDir syncs blockDir on disk. +func (w *streamedBlockWriter) syncDir() error { + df, err := fileutil.OpenDir(w.blockDir) if err != nil { - return errors.Wrap(err, "open temporary block dir") - } - defer func() { - if df != nil { - if err := df.Close(); err != nil { - log.Logger(w.logger).Log(err, "close temporary block dir") - } - } - }() - - if err := fileutil.Fsync(df); err != nil { - return errors.Wrap(err, "sync temporary dir") + return errors.Wrap(err, "open temporary block blockDir") } - // Close temp dir before rename block dir (for windows platform). - if err = df.Close(); err != nil { - return errors.Wrap(err, "close temporary dir") - } - df = nil + defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir") - // Block successfully written, make visible and remove old ones. - err = renameFile(w.tmpDir, w.dir) - // Assume we cleaned tmp dir up - w.tmpDir = "" + err = fileutil.Fsync(df) if err != nil { - return errors.Wrap(err, "rename block dir") + return errors.Wrap(err, "sync temporary blockDir") } - return nil + return err } // writeLabelSets fills the index writer with label sets. -func (w *StreamedBlockWriter) writeLabelSets() error { +func (w *streamedBlockWriter) writeLabelSets() error { s := make([]string, 0, 256) for n, v := range w.labelsValues { s = s[:0] @@ -269,7 +244,7 @@ func (w *StreamedBlockWriter) writeLabelSets() error { } // writeMemPostings fills the index writer with mem postings. -func (w *StreamedBlockWriter) writeMemPostings() error { +func (w *streamedBlockWriter) writeMemPostings() error { w.memPostings.EnsureOrder() for _, l := range w.memPostings.SortedKeys() { if err := w.indexWriter.WritePostings(l.Name, l.Value, w.memPostings.Get(l.Name, l.Value)); err != nil { @@ -279,70 +254,13 @@ func (w *StreamedBlockWriter) writeMemPostings() error { return nil } -// TODO probably tsdb.BlockMeta should expose method writeToFile /w encode. -// writeMetaFile writes meta file -func (w *StreamedBlockWriter) writeMetaFile() error { - var merr tsdb.MultiError - - w.meta.ULID = w.uid - w.meta.Version = 1 +// writeMetaFile writes meta file. +func (w *streamedBlockWriter) writeMetaFile() error { + w.meta.Version = metadata.MetaVersion1 w.meta.Thanos.Source = metadata.CompactorSource w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = w.postings - // Make any changes to the file appear atomic. - path := filepath.Join(w.tmpDir, block.MetaFilename) - - f, err := os.Create(path) - if err != nil { - return errors.Wrapf(err, "create tmp meta file %s", path) - } - - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") - - if merr.Add(enc.Encode(w.meta)); merr.Err() != nil { - merr.Add(f.Close()) - return errors.Wrapf(merr.Err(), "encoding meta file to json %s", path) - } - - merr.Add(errors.Wrapf(fileutil.Fsync(f), "sync meta file %s", path)) - merr.Add(errors.Wrapf(f.Close(), "close meta file %s", path)) - - return merr.Err() -} - -func renameFile(from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - var merr tsdb.MultiError - merr.Add(fileutil.Fsync(pdir)) - merr.Add(pdir.Close()) - return merr.Err() -} - -func createTmpDir(parent string) (string, error) { - tmp := parent + ".tmp" - - if err := os.RemoveAll(tmp); err != nil { - return "", errors.Wrap(err, "removing tmp dir") - } - - if err := os.MkdirAll(tmp, 0777); err != nil { - return "", errors.Wrap(err, "mkdir tmp dir") - } - - return tmp, nil + return metadata.Write(w.logger, w.blockDir, &w.meta) }