compact: avoid memory leak while downsampling
Add an instant writer implementation to shrink memory consumption during the downsampling stage.
Encoded chunks are written to chunk blob files right away, as soon as each series has been handled.
The Flush method closes the chunk writer and syncs all symbols, series, labels, postings, and metadata to files.
It still runs in a single thread, and hence operates on only one core.

Estimated memory consumption is unlikely to exceed 1 GB, but depends on the data set, label sizes, and series density: chunk data size (512 MB) + encoded buffers + index data.
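A minimal, self-contained Go sketch of the streaming pattern described above is shown below: each series' encoded chunks are written out as soon as the series is handled, and a final flush syncs everything to disk before closing. The streamWriter type and its newStreamWriter, addSeries, and flush functions are hypothetical names for illustration only; they are not the NewWriter/AddSeries/Flush API added by this commit.

package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
)

// seriesChunk stands in for one series' encoded chunk data; the real writer
// deals with tsdb chunk encodings plus index and symbol table entries.
type seriesChunk struct {
	labels string
	data   []byte
}

// streamWriter appends each series' chunks to a chunk file as soon as the
// series is handled, so only one series' data is held in memory at a time.
type streamWriter struct {
	f   *os.File
	buf *bufio.Writer
	n   int
}

func newStreamWriter(dir string) (*streamWriter, error) {
	f, err := os.Create(filepath.Join(dir, "chunks.bin"))
	if err != nil {
		return nil, err
	}
	return &streamWriter{f: f, buf: bufio.NewWriter(f)}, nil
}

// addSeries writes one series immediately instead of accumulating it in an
// in-memory block.
func (w *streamWriter) addSeries(s seriesChunk) error {
	if _, err := w.buf.WriteString(s.labels); err != nil {
		return err
	}
	if _, err := w.buf.Write(s.data); err != nil {
		return err
	}
	w.n++
	return nil
}

// flush syncs the buffered chunk data (and, in the real writer, the index and
// meta files) to disk and closes the underlying file.
func (w *streamWriter) flush() error {
	if err := w.buf.Flush(); err != nil {
		return err
	}
	if err := w.f.Sync(); err != nil {
		return err
	}
	return w.f.Close()
}

func main() {
	dir, err := os.MkdirTemp("", "downsample-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	w, err := newStreamWriter(dir)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 3; i++ {
		// Each series is persisted right away; peak memory stays bounded by
		// a single series plus the write buffers, not by the whole block.
		if err := w.addSeries(seriesChunk{labels: fmt.Sprintf("series-%d", i), data: []byte{0x01, 0x02}}); err != nil {
			panic(err)
		}
	}
	if err := w.flush(); err != nil {
		panic(err)
	}
	fmt.Println("wrote", w.n, "series to", dir)
}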

Fixes #297
xjewer committed Sep 20, 2018
1 parent 984f42e commit 9aa8061
Showing 4 changed files with 469 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .errcheck_excludes.txt
@@ -1,3 +1,3 @@
(github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log
fmt.Fprintln
fmt.Fprint
fmt.Fprint
142 changes: 20 additions & 122 deletions pkg/compact/downsample/downsample.go
@@ -2,15 +2,11 @@ package downsample

import (
"math"
"path/filepath"
"sort"

"github.com/improbable-eng/thanos/pkg/block"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"

"os"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
@@ -52,13 +48,16 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

rng := origMeta.MaxTime - origMeta.MinTime

// Write downsampled data in a custom memory block where we have fine-grained control
// over created chunks.
// NewWriter downsamples data and writes chunks into files immediately, which saves a lot of memory for aggregated data.
// The index and meta data are flushed to disk once the aggregations are done.
// This is necessary since we need to inject special values at the end of chunks for
// some aggregations.
newb := newMemBlock()
writer, err := NewWriter(dir, logger, *origMeta, resolution)
defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer")

if err != nil {
return id, errors.Wrap(err, "get instantWriter")
}

pall, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
@@ -85,7 +84,7 @@
for i, c := range chks {
chk, err := chunkr.Chunk(c.Ref)
if err != nil {
return id, errors.Wrapf(err, "get chunk %d", c.Ref)
return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, pall.At())
}
chks[i].Chunk = chk
}
@@ -94,10 +93,12 @@
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At())
}
}
newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At())
}
continue
}

@@ -114,127 +115,24 @@
resolution,
)
if err != nil {
return id, errors.Wrap(err, "downsample aggregate block")
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At())
}
if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil {
return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At())
}
newb.addSeries(&series{lset: lset, chunks: res})
}
if pall.Err() != nil {
return id, errors.Wrap(pall.Err(), "iterate series set")
}
comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool())
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())

var tmeta block.ThanosMeta
tmeta = origMeta.Thanos
tmeta.Source = block.CompactorSource
tmeta.Downsample.Resolution = resolution

_, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
id, err = writer.Flush()
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
return id, errors.Wrap(err, "compact head")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return id, errors.Wrap(err, "remove tombstones")
}
return id, nil
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
// to allow tsdb.LeveledCompactor to persist the data as a block.
type memBlock struct {
// Dummies to implement unused methods.
tsdb.IndexReader

symbols map[string]struct{}
postings []uint64
series []*series
chunks []chunkenc.Chunk
}

func newMemBlock() *memBlock {
return &memBlock{symbols: map[string]struct{}{}}
}

func (b *memBlock) addSeries(s *series) {
sid := uint64(len(b.series))
b.postings = append(b.postings, sid)
b.series = append(b.series, s)

for _, l := range s.lset {
b.symbols[l.Name] = struct{}{}
b.symbols[l.Value] = struct{}{}
}

for i, cm := range s.chunks {
cid := uint64(len(b.chunks))
s.chunks[i].Ref = cid
b.chunks = append(b.chunks, cm.Chunk)
}
}

func (b *memBlock) Postings(name, val string) (index.Postings, error) {
allName, allVal := index.AllPostingsKey()

if name != allName || val != allVal {
return nil, errors.New("unsupported call to Postings()")
}
sort.Slice(b.postings, func(i, j int) bool {
return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
})
return index.NewListPostings(b.postings), nil
}

func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if id >= uint64(len(b.series)) {
return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
}
s := b.series[id]

*lset = append((*lset)[:0], s.lset...)
*chks = append((*chks)[:0], s.chunks...)

return nil
}

func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
if id >= uint64(len(b.chunks)) {
return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
}
return b.chunks[id], nil
}

func (b *memBlock) Symbols() (map[string]struct{}, error) {
return b.symbols, nil
}

func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
return p
}

func (b *memBlock) Index() (tsdb.IndexReader, error) {
return b, nil
}

func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
return b, nil
}

func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
return tsdb.EmptyTombstoneReader(), nil
}

func (b *memBlock) Close() error {
return nil
}

// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
@@ -482,7 +380,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
return res, nil
}

// expandChunkIterator reads all samples from the iterater and appends them to buf.
// expandChunkIterator reads all samples from the iterator and appends them to buf.
// Stale markers and out of order samples are skipped.
func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
// For safety reasons, we check for each sample that it does not go back in time.
122 changes: 106 additions & 16 deletions pkg/compact/downsample/downsample_test.go
@@ -5,19 +5,19 @@ import (
"math"
"os"
"path/filepath"
"sort"
"testing"

"github.com/prometheus/prometheus/pkg/value"

"github.com/prometheus/tsdb/chunks"

"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)
@@ -69,30 +69,30 @@ func TestDownsampleAggr(t *testing.T) {
{
lset: labels.FromStrings("__name__", "a"),
inAggr: map[AggrType][]sample{
AggrCount: []sample{
AggrCount: {
{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
},
AggrSum: []sample{
AggrSum: {
{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
},
AggrMin: []sample{
AggrMin: {
{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
},
AggrMax: []sample{
AggrMax: {
{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
},
AggrCounter: []sample{
AggrCounter: {
{99, 100}, {299, 150}, {499, 210}, {499, 10}, // chunk 1
{599, 20}, {799, 50}, {999, 120}, {999, 50}, // chunk 2, no reset
{1099, 40}, {1199, 80}, {1299, 110}, // chunk 3, reset
},
},
output: map[AggrType][]sample{
AggrCount: []sample{{499, 29}, {999, 100}},
AggrSum: []sample{{499, 29}, {999, 100}},
AggrMin: []sample{{499, -3}, {999, 0}},
AggrMax: []sample{{499, 10}, {999, 100}},
AggrCounter: []sample{{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
AggrCount: {{499, 29}, {999, 100}},
AggrSum: {{499, 29}, {999, 100}},
AggrMin: {{499, -3}, {999, 0}},
AggrMax: {{499, 10}, {999, 100}},
AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
},
},
}
Expand Down Expand Up @@ -157,7 +157,6 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, r
}
mb.addSeries(ser)
}

id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
testutil.Ok(t, err)

Expand Down Expand Up @@ -375,3 +374,94 @@ func (it *sampleIterator) Seek(int64) bool {
func (it *sampleIterator) At() (t int64, v float64) {
return it.l[it.i].t, it.l[it.i].v
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
// to allow the downsample package's instantWriter to persist the data as a block.
type memBlock struct {
// Dummies to implement unused methods.
tsdb.IndexReader

symbols map[string]struct{}
postings []uint64
series []*series
chunks []chunkenc.Chunk

numberOfChunks uint64
}

func newMemBlock() *memBlock {
return &memBlock{symbols: map[string]struct{}{}}
}

func (b *memBlock) addSeries(s *series) {
sid := uint64(len(b.series))
b.postings = append(b.postings, sid)
b.series = append(b.series, s)

for _, l := range s.lset {
b.symbols[l.Name] = struct{}{}
b.symbols[l.Value] = struct{}{}
}

for i, cm := range s.chunks {
s.chunks[i].Ref = b.numberOfChunks
b.chunks = append(b.chunks, cm.Chunk)
b.numberOfChunks++
}
}

func (b *memBlock) Postings(name, val string) (index.Postings, error) {
allName, allVal := index.AllPostingsKey()

if name != allName || val != allVal {
return nil, errors.New("unsupported call to Postings()")
}
sort.Slice(b.postings, func(i, j int) bool {
return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
})
return index.NewListPostings(b.postings), nil
}

func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if id >= uint64(len(b.series)) {
return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
}
s := b.series[id]

*lset = append((*lset)[:0], s.lset...)
*chks = append((*chks)[:0], s.chunks...)

return nil
}

func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
if id >= uint64(b.numberOfChunks) {
return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
}

return b.chunks[id], nil
}

func (b *memBlock) Symbols() (map[string]struct{}, error) {
return b.symbols, nil
}

func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
return p
}

func (b *memBlock) Index() (tsdb.IndexReader, error) {
return b, nil
}

func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
return b, nil
}

func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
return tsdb.EmptyTombstoneReader(), nil
}

func (b *memBlock) Close() error {
return nil
}