compact: Improved memory usage while downsampling (#529)
Add a streaming block writer implementation to shrink memory consumption during the downsampling stage.
Encoded chunks are written to the chunk blob files right away, as soon as each series has been handled.
The Flush method closes the chunk writer and syncs all symbols, series, labels, postings and meta data to files.
It still runs in a single thread and therefore operates on only one core.

Estimated memory consumption is unlikely to exceed 1GB, but it depends on the data set, label sizes and series density: chunk data size (512MB) + encoded buffers + index data.
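
A simplified sketch of the new write path (grounded in the downsample.go diff below; chunk sanitization and the aggregated-chunk branch are omitted, and the helper name downsampleStreamed is illustrative only):

func downsampleStreamed(
	logger log.Logger,
	indexr tsdb.IndexReader,
	chunkr tsdb.ChunkReader,
	blockDir string,
	newMeta metadata.Meta,
	resolution int64,
) (err error) {
	// Chunks are encoded and written to disk as soon as a series is handled.
	w, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta)
	if err != nil {
		return errors.Wrap(err, "get streamed block writer")
	}
	// Closing the writer syncs symbols, series, label indices, postings and meta data.
	defer runutil.CloseWithErrCapture(logger, &err, w, "close stream block writer")

	postings, err := indexr.Postings(index.AllPostingsKey())
	if err != nil {
		return errors.Wrap(err, "get all postings list")
	}
	var (
		lset labels.Labels
		chks []chunks.Meta
		all  []sample
	)
	for postings.Next() {
		lset, chks, all = lset[:0], chks[:0], all[:0]
		if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
			return errors.Wrapf(err, "get series %d", postings.At())
		}
		for i, c := range chks {
			chk, err := chunkr.Chunk(c.Ref)
			if err != nil {
				return errors.Wrapf(err, "get chunk %d", c.Ref)
			}
			chks[i].Chunk = chk
			if err := expandChunkIterator(chk.Iterator(), &all); err != nil {
				return errors.Wrapf(err, "expand chunk %d", c.Ref)
			}
		}
		// Write the downsampled series immediately; only label values and
		// postings references stay in memory.
		if err := w.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil {
			return errors.Wrapf(err, "write series %d", postings.At())
		}
	}
	return postings.Err()
}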

Fixes #297

* compact: clarify purpose of streamed block writer

Add comments and close resources properly.

* downsample: fix postings index

Use the proper postings index to fetch series data with label sets and chunks.

* Give the stream writer the ability to write index data during
the downsampling process.

One of the trade-offs is that symbols have to be preserved from the raw blocks, as we have to write them
before writing the series.

The stream writer allows downsampling huge data blocks without keeping all series in RAM;
only label values and postings references need to be kept (see the sketch below).
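
The ordering constraint comes from the TSDB index format: every symbol has to be registered before the first series entry that references it. A hedged sketch of that ordering, assuming the upstream tsdb index.Writer API (the seriesToWrite slice and its fields are illustrative; the real writer streams series as they arrive):

// Symbols of the raw block are written up front, because the index writer
// rejects series whose label names/values are not yet known symbols.
iw, err := index.NewWriter(filepath.Join(blockDir, "index"))
if err != nil {
	return errors.Wrap(err, "open index writer")
}
symbols, err := indexr.Symbols() // all symbols of the raw block being downsampled
if err != nil {
	return errors.Wrap(err, "read symbols")
}
if err := iw.AddSymbols(symbols); err != nil {
	return errors.Wrap(err, "add symbols")
}
// Only after the symbols are in place can series be appended, one at a time.
for ref, s := range seriesToWrite {
	if err := iw.AddSeries(uint64(ref), s.lset, s.chunks...); err != nil {
		return errors.Wrapf(err, "add series %d", ref)
	}
}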

* fix nitpicks

* downsampling: simplify StreamedBlockWriter interface

Reduce the use of the public Flush method to finalize index and meta files.
In case of an error, the caller has to remove the block directory together with
whatever garbage was left inside.

Get rid of tmp directories and renaming; the final block is synced on disk
before upload (see the sketch below).
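
On the caller side, the cleanup mirrors the defer added in downsample.go (visible in the diff below): any error removes the partially written block directory, so no tmp directory or rename step is needed before uploading the final, synced block.

// Remove blockDir in case of errors; without tmp directories the partially
// written block is simply deleted on failure.
defer func() {
	if err != nil {
		var merr tsdb.MultiError
		merr.Add(err)
		merr.Add(os.RemoveAll(blockDir))
		err = merr.Err()
	}
}()
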
xjewer authored and bwplotka committed Feb 6, 2019
1 parent e1b0cb6 commit c5acb9c
Showing 6 changed files with 460 additions and 181 deletions.
2 changes: 1 addition & 1 deletion .errcheck_excludes.txt
@@ -1,3 +1,3 @@
(github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log
fmt.Fprintln
fmt.Fprint
fmt.Fprint
4 changes: 2 additions & 2 deletions cmd/thanos/downsample.go
@@ -173,7 +173,7 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil {
return err
return errors.Wrap(err, "downsampling to 5 min")
}

case 5 * 60 * 1000:
@@ -194,7 +194,7 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil {
return err
return errors.Wrap(err, "downsampling to 60 min")
}
}
}
7 changes: 6 additions & 1 deletion pkg/block/metadata/meta.go
@@ -35,6 +35,11 @@ const (
MetaFilename = "meta.json"
)

const (
// MetaVersion1 is an enumeration of versions supported by Thanos.
MetaVersion1 = iota + 1
)

// Meta describes a block's meta. It wraps the known TSDB meta structure and
// extends it by Thanos-specific fields.
type Meta struct {
@@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
if m.Version != MetaVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
238 changes: 71 additions & 167 deletions pkg/compact/downsample/downsample.go
@@ -2,21 +2,19 @@ package downsample

import (
"math"
"path/filepath"
"sort"

"github.com/improbable-eng/thanos/pkg/block/metadata"

"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"

"math/rand"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
@@ -53,40 +51,65 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

rng := origMeta.MaxTime - origMeta.MinTime
// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))

// Write downsampled data in a custom memory block where we have fine-grained control
// over created chunks.
// This is necessary since we need to inject special values at the end of chunks for
// some aggregations.
newb := newMemBlock()
// Create block directory to populate with chunks, meta and index files into.
blockDir := filepath.Join(dir, uid.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return id, errors.Wrap(err, "mkdir block dir")
}

// Remove blockDir in case of errors.
defer func() {
if err != nil {
var merr tsdb.MultiError
merr.Add(err)
merr.Add(os.RemoveAll(blockDir))
err = merr.Err()
}
}()

// Copy original meta to the new one. Update downsampling resolution and ULID for a new block.
newMeta := *origMeta
newMeta.Thanos.Downsample.Resolution = resolution
newMeta.ULID = uid

pall, err := indexr.Postings(index.AllPostingsKey())
// Writes downsampled chunks right into the files, avoiding excess memory allocation.
// Flushes index and meta data after aggregations.
streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta)
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")

postings, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return id, errors.Wrap(err, "get all postings list")
}
var (
aggrChunks []*AggrChunk
all []sample
chks []chunks.Meta
lset labels.Labels
)
for pall.Next() {
var lset labels.Labels
for postings.Next() {
lset = lset[:0]
chks = chks[:0]
all = all[:0]
aggrChunks = aggrChunks[:0]

// Get series labels and chunks. Downsampled data is sensitive to chunk boundaries
// and we need to preserve them to properly downsample previously downsampled data.
if err := indexr.Series(pall.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", pall.At())
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}
// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
chk, err := chunkr.Chunk(c.Ref)
if err != nil {
return id, errors.Wrapf(err, "get chunk %d", c.Ref)
return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, postings.At())
}
chks[i].Chunk = chk
}
@@ -95,155 +118,41 @@
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
}
newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
continue
}

// Downsample a block that contains aggregate chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
res, err := downsampleAggr(
aggrChunks,
&all,
chks[0].MinTime,
chks[len(chks)-1].MaxTime,
origMeta.Thanos.Downsample.Resolution,
resolution,
)

if err != nil {
return id, errors.Wrap(err, "downsample aggregate block")
if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At())
}
} else {
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
downsampledChunks, err := downsampleAggr(
aggrChunks,
&all,
chks[0].MinTime,
chks[len(chks)-1].MaxTime,
origMeta.Thanos.Downsample.Resolution,
resolution,
)
if err != nil {
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At())
}
if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil {
return id, errors.Wrapf(err, "write series: %d", postings.At())
}
}
newb.addSeries(&series{lset: lset, chunks: res})
}
if pall.Err() != nil {
return id, errors.Wrap(pall.Err(), "iterate series set")
}
comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool())
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())

var tmeta metadata.Thanos
tmeta = origMeta.Thanos
tmeta.Source = metadata.CompactorSource
tmeta.Downsample.Resolution = resolution

_, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
if postings.Err() != nil {
return id, errors.Wrap(postings.Err(), "iterate series set")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return id, errors.Wrap(err, "remove tombstones")
}
return id, nil
id = uid
return
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
// to allow tsdb.LeveledCompactor to persist the data as a block.
type memBlock struct {
// Dummies to implement unused methods.
tsdb.IndexReader

symbols map[string]struct{}
postings []uint64
series []*series
chunks []chunkenc.Chunk
}

func newMemBlock() *memBlock {
return &memBlock{symbols: map[string]struct{}{}}
}

func (b *memBlock) addSeries(s *series) {
sid := uint64(len(b.series))
b.postings = append(b.postings, sid)
b.series = append(b.series, s)

for _, l := range s.lset {
b.symbols[l.Name] = struct{}{}
b.symbols[l.Value] = struct{}{}
}

for i, cm := range s.chunks {
cid := uint64(len(b.chunks))
s.chunks[i].Ref = cid
b.chunks = append(b.chunks, cm.Chunk)
}
}

func (b *memBlock) Postings(name, val string) (index.Postings, error) {
allName, allVal := index.AllPostingsKey()

if name != allName || val != allVal {
return nil, errors.New("unsupported call to Postings()")
}
sort.Slice(b.postings, func(i, j int) bool {
return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
})
return index.NewListPostings(b.postings), nil
}

func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if id >= uint64(len(b.series)) {
return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
}
s := b.series[id]

*lset = append((*lset)[:0], s.lset...)
*chks = append((*chks)[:0], s.chunks...)

return nil
}

func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
if id >= uint64(len(b.chunks)) {
return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
}
return b.chunks[id], nil
}

func (b *memBlock) Symbols() (map[string]struct{}, error) {
return b.symbols, nil
}

func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
return p
}

func (b *memBlock) Index() (tsdb.IndexReader, error) {
return b, nil
}

func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
return b, nil
}

func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
return emptyTombstoneReader{}, nil
}

func (b *memBlock) Close() error {
return nil
}

type emptyTombstoneReader struct{}

func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
func (emptyTombstoneReader) Total() uint64 { return 0 }
func (emptyTombstoneReader) Close() error { return nil }

// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
@@ -492,7 +401,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
return res, nil
}

// expandChunkIterator reads all samples from the iterater and appends them to buf.
// expandChunkIterator reads all samples from the iterator and appends them to buf.
// Stale markers and out of order samples are skipped.
func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
// For safety reasons, we check for each sample that it does not go back in time.
@@ -614,11 +523,6 @@ type sample struct {
v float64
}

type series struct {
lset labels.Labels
chunks []chunks.Meta
}

// CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
// values as counter reset.
// Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
