From 77d2c7cf3d19ca27bd2cb3dfa2feebf5fe300f0f Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Sat, 8 Sep 2018 23:58:36 +0530 Subject: [PATCH 01/15] Added metric for symbol table size Signed-off-by: Ganesh Vernekar --- block.go | 31 ++++++++++++++++++++++++++----- db.go | 15 +++++++++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/block.go b/block.go index 1a45fb97..342a8d02 100644 --- a/block.go +++ b/block.go @@ -15,6 +15,7 @@ package tsdb import ( + "encoding/binary" "encoding/json" "io/ioutil" "os" @@ -248,6 +249,10 @@ type Block struct { dir string meta BlockMeta + // Symbol Table Size in bytes. + // We maintain this variable to avoid recalculation everytime. + symbolTableSize uint64 + chunkr ChunkReader indexr IndexReader tombstones TombstoneReader @@ -275,12 +280,23 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } + // Calculating symbol table size. + tmp := make([]byte, 8) + symTblSize := uint64(0) + for _, v := range ir.SymbolTable() { + // Size of varint length of the symbol. + symTblSize += uint64(binary.PutUvarint(tmp, uint64(len(v)))) + // Size of the symbol. + symTblSize += uint64(len(v)) + } + pb := &Block{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, - tombstones: tr, + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, + tombstones: tr, + symbolTableSize: symTblSize, } return pb, nil } @@ -350,6 +366,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) { return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil } +// GetSymbolTableSize returns the Symbol Table Size in the index of this block. +func (pb *Block) GetSymbolTableSize() uint64 { + return pb.symbolTableSize +} + func (pb *Block) setCompactionFailed() error { pb.meta.Compaction.Failed = true return writeMetaFile(pb.dir, &pb.meta) diff --git a/db.go b/db.go index e6a0a74b..2171a0cf 100644 --- a/db.go +++ b/db.go @@ -119,6 +119,7 @@ type DB struct { type dbMetrics struct { loadedBlocks prometheus.GaugeFunc + symbolTableSize prometheus.GaugeFunc reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter @@ -138,6 +139,19 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { defer db.mtx.RUnlock() return float64(len(db.blocks)) }) + m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_symbol_table_size", + Help: "Size of symbol table on disk (in bytes)", + }, func() float64 { + db.mtx.RLock() + blocks := db.blocks[:] + db.mtx.RUnlock() + symTblSize := float64(0) + for _, b := range blocks { + symTblSize += float64(b.GetSymbolTableSize()) + } + return symTblSize + }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", @@ -166,6 +180,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( m.loadedBlocks, + m.symbolTableSize, m.reloads, m.reloadsFailed, m.cutoffs, From 2945db18cafdef58d9c511d4c9f223becb69ac5a Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 12 Sep 2018 14:39:02 +0530 Subject: [PATCH 02/15] Changes in series names (and types) exposed (#376) Signed-off-by: Ganesh Vernekar --- compact.go | 4 ++-- db.go | 8 ++++---- head.go | 14 +++++++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/compact.go b/compact.go index 1da13005..1b8f20fa 100644 --- a/compact.go +++ b/compact.go @@ -97,7 +97,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Buckets: prometheus.ExponentialBuckets(1, 2, 10), }) m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_size", + Name: "prometheus_tsdb_compaction_chunk_size_bytes", Help: "Final size of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), }) @@ -107,7 +107,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), }) m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_range", + Name: "prometheus_tsdb_compaction_chunk_range_seconds", Help: "Final time range of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(100, 4, 10), }) diff --git a/db.go b/db.go index 2171a0cf..909050e0 100644 --- a/db.go +++ b/db.go @@ -140,17 +140,17 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(len(db.blocks)) }) m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_symbol_table_size", + Name: "prometheus_tsdb_symbol_table_size_bytes", Help: "Size of symbol table on disk (in bytes)", }, func() float64 { db.mtx.RLock() blocks := db.blocks[:] db.mtx.RUnlock() - symTblSize := float64(0) + symTblSize := uint64(0) for _, b := range blocks { - symTblSize += float64(b.GetSymbolTableSize()) + symTblSize += b.GetSymbolTableSize() } - return symTblSize + return float64(symTblSize) }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", diff --git a/head.go b/head.go index 4f0c2c95..d937b524 100644 --- a/head.go +++ b/head.go @@ -82,8 +82,8 @@ type headMetrics struct { seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter chunks prometheus.Gauge - chunksCreated prometheus.Gauge - chunksRemoved prometheus.Gauge + chunksCreated prometheus.Counter + chunksRemoved prometheus.Counter gcDuration prometheus.Summary minTime prometheus.GaugeFunc maxTime prometheus.GaugeFunc @@ -102,27 +102,27 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_series", Help: "Total number of series in the head block.", }) - m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{ + m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", Help: "Total number of series created in the head", }) - m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_removed_total", Help: "Total number of series removed in the head", }) m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_not_found", + Name: "prometheus_tsdb_head_series_not_found_total", Help: "Total number of requests for series that were not found.", }) m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_chunks", Help: "Total number of chunks in the head block.", }) - m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{ + m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_chunks_created_total", Help: "Total number of chunks created in the head", }) - m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_chunks_removed_total", Help: "Total number of chunks removed in the head", }) From 9be883670eaf846e8f5bb65ea577dcd7d28d469e Mon Sep 17 00:00:00 2001 From: Alexey Palazhchenko Date: Thu, 13 Sep 2018 19:34:26 +0300 Subject: [PATCH 03/15] fix the test data file path in benchmarks. (#345) Signed-off-by: Alexey Palazhchenko --- head_test.go | 2 +- querier_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/head_test.go b/head_test.go index b06a66c2..5531f965 100644 --- a/head_test.go +++ b/head_test.go @@ -28,7 +28,7 @@ import ( ) func BenchmarkCreateSeries(b *testing.B) { - lbls, err := labels.ReadLabels("testdata/all.series", b.N) + lbls, err := labels.ReadLabels("testdata/20kseries.json", b.N) testutil.Ok(b, err) h, err := NewHead(nil, nil, nil, 10000) diff --git a/querier_test.go b/querier_test.go index fd8c7dec..3515df3e 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1243,11 +1243,11 @@ func BenchmarkMergedSeriesSet(b *testing.B) { 100, 1000, 10000, - 100000, + 20000, } { for _, j := range []int{1, 2, 4, 8, 16, 32} { b.Run(fmt.Sprintf("series=%d,blocks=%d", k, j), func(b *testing.B) { - lbls, err := labels.ReadLabels("testdata/1m.series", k) + lbls, err := labels.ReadLabels("testdata/20kseries.json", k) testutil.Ok(b, err) sort.Sort(labels.Slice(lbls)) From cb7f320d42244821bf83015b4c6e2cc3e37316af Mon Sep 17 00:00:00 2001 From: Bob Shannon Date: Fri, 14 Sep 2018 08:07:45 -0400 Subject: [PATCH 04/15] Expose prometheus_tsdb_lowest_timestamp metric (#363) * Expose prometheus_tsdb_start_time_seconds metric Signed-off-by: Bob Shannon * Search for block with smallest minTime Signed-off-by: Bob Shannon * PR comments Signed-off-by: Bob Shannon * PR comment: Make metric name more accurate Signed-off-by: Bob Shannon --- db.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/db.go b/db.go index 909050e0..228dbb9c 100644 --- a/db.go +++ b/db.go @@ -125,6 +125,7 @@ type dbMetrics struct { compactionsTriggered prometheus.Counter cutoffs prometheus.Counter cutoffsFailed prometheus.Counter + startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram } @@ -172,6 +173,17 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_retention_cutoffs_failures_total", Help: "Number of times the database failed to cut off block data from disk.", }) + m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_lowest_timestamp", + Help: "Lowest timestamp value stored in the database.", + }, func() float64 { + db.mtx.RLock() + defer db.mtx.RUnlock() + if len(db.blocks) == 0 { + return float64(db.head.minTime) + } + return float64(db.blocks[0].meta.MinTime) + }) m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", @@ -186,6 +198,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.cutoffs, m.cutoffsFailed, m.compactionsTriggered, + m.startTime, m.tombCleanTimer, ) } From ad459ca1f483ada82ae9b19ca871822874bf8b79 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 17 Sep 2018 14:28:55 +0300 Subject: [PATCH 05/15] re-added TestDeleteUntilCurMax TestDelete_e2e (#382) no logic changes just uncommented and fixed the issues. Signed-off-by: Krasi Georgiev --- head_test.go | 423 ++++++++++++++++++++++++--------------------------- 1 file changed, 198 insertions(+), 225 deletions(-) diff --git a/head_test.go b/head_test.go index 5531f965..5383b2ba 100644 --- a/head_test.go +++ b/head_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sort" "testing" "github.com/prometheus/tsdb/chunkenc" @@ -386,231 +387,203 @@ Outer: } } -// func TestDeleteUntilCurMax(t *testing.T) { -// numSamples := int64(10) - -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) - -// hb := createTestHead(t, dir, 0, 2*numSamples) -// app := hb.Appender() - -// smpls := make([]float64, numSamples) -// for i := int64(0); i < numSamples; i++ { -// smpls[i] = rand.Float64() -// app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) -// } - -// testutil.Ok(t, app.Commit()) -// testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) -// app = hb.Appender() -// _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) -// testutil.Ok(t, err) -// testutil.Ok(t, app.Commit()) - -// q := hb.Querier(0, 100000) -// res := q.Select(labels.NewEqualMatcher("a", "b")) - -// require.True(t, res.Next()) -// exps := res.At() -// it := exps.Iterator() -// ressmpls, err := expandSeriesIterator(it) -// testutil.Ok(t, err) -// testutil.Equals(t, []sample{{11, 1}}, ressmpls) -// } - -// func TestDelete_e2e(t *testing.T) { -// numDatapoints := 1000 -// numRanges := 1000 -// timeInterval := int64(2) -// maxTime := int64(2 * 1000) -// minTime := int64(200) -// // Create 8 series with 1000 data-points of different ranges, delete and run queries. -// lbls := [][]labels.Label{ -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// } - -// seriesMap := map[string][]sample{} -// for _, l := range lbls { -// seriesMap[labels.New(l...).String()] = []sample{} -// } - -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) - -// hb := createTestHead(t, dir, minTime, maxTime) -// app := hb.Appender() - -// for _, l := range lbls { -// ls := labels.New(l...) -// series := []sample{} - -// ts := rand.Int63n(300) -// for i := 0; i < numDatapoints; i++ { -// v := rand.Float64() -// if ts >= minTime && ts <= maxTime { -// series = append(series, sample{ts, v}) -// } - -// _, err := app.Add(ls, ts, v) -// if ts >= minTime && ts <= maxTime { -// testutil.Ok(t, err) -// } else { -// testutil.EqualsError(t, err, ErrOutOfBounds.Error()) -// } - -// ts += rand.Int63n(timeInterval) + 1 -// } - -// seriesMap[labels.New(l...).String()] = series -// } - -// testutil.Ok(t, app.Commit()) - -// // Delete a time-range from each-selector. -// dels := []struct { -// ms []labels.Matcher -// drange Intervals -// }{ -// { -// ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, -// drange: Intervals{{300, 500}, {600, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "b"), -// labels.NewEqualMatcher("job", "prom-k8s"), -// }, -// drange: Intervals{{300, 500}, {100, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "c"), -// labels.NewEqualMatcher("instance", "localhost:9090"), -// labels.NewEqualMatcher("job", "prometheus"), -// }, -// drange: Intervals{{300, 400}, {100, 6700}}, -// }, -// // TODO: Add Regexp Matchers. -// } - -// for _, del := range dels { -// // Reset the deletes everytime. -// writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) -// hb.tombstones = newEmptyTombstoneReader() - -// for _, r := range del.drange { -// testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) -// } - -// matched := labels.Slice{} -// for _, ls := range lbls { -// s := labels.Selector(del.ms) -// if s.Matches(ls) { -// matched = append(matched, ls) -// } -// } - -// sort.Sort(matched) - -// for i := 0; i < numRanges; i++ { -// mint := rand.Int63n(200) -// maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - -// q := hb.Querier(mint, maxt) -// ss := q.Select(del.ms...) - -// // Build the mockSeriesSet. -// matchedSeries := make([]Series, 0, len(matched)) -// for _, m := range matched { -// smpls := boundedSamples(seriesMap[m.String()], mint, maxt) -// smpls = deletedSamples(smpls, del.drange) - -// // Only append those series for which samples exist as mockSeriesSet -// // doesn't skip series with no samples. -// // TODO: But sometimes SeriesSet returns an empty SeriesIterator -// if len(smpls) > 0 { -// matchedSeries = append(matchedSeries, newSeries( -// m.Map(), -// smpls, -// )) -// } -// } -// expSs := newListSeriesSet(matchedSeries) - -// // Compare both SeriesSets. -// for { -// eok, rok := expSs.Next(), ss.Next() - -// // Skip a series if iterator is empty. -// if rok { -// for !ss.At().Iterator().Next() { -// rok = ss.Next() -// if !rok { -// break -// } -// } -// } -// testutil.Equals(t, eok, rok, "next") - -// if !eok { -// break -// } -// sexp := expSs.At() -// sres := ss.At() - -// testutil.Equals(t, sexp.Labels(), sres.Labels(), "labels") - -// smplExp, errExp := expandSeriesIterator(sexp.Iterator()) -// smplRes, errRes := expandSeriesIterator(sres.Iterator()) - -// testutil.Equals(t, errExp, errRes, "samples error") -// testutil.Equals(t, smplExp, smplRes, "samples") -// } -// } -// } - -// return -// } +func TestDeleteUntilCurMax(t *testing.T) { + numSamples := int64(10) + hb, err := NewHead(nil, nil, nil, 1000000) + testutil.Ok(t, err) + app := hb.Appender() + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + _, err := app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) + + // Test the series have been deleted. + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err := q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, !res.Next(), "series didn't get deleted") + + // Add again and test for presence. + app = hb.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 11, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + q, err = NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err = q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, res.Next(), "series don't exist") + exps := res.At() + it := exps.Iterator() + ressmpls, err := expandSeriesIterator(it) + testutil.Ok(t, err) + testutil.Equals(t, []sample{{11, 1}}, ressmpls) +} +func TestDelete_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(2) + // Create 8 series with 1000 data-points of different ranges, delete and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + hb, err := NewHead(nil, nil, nil, 100000) + testutil.Ok(t, err) + app := hb.Appender() + for _, l := range lbls { + ls := labels.New(l...) + series := []sample{} + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + _, err := app.Add(ls, ts, v) + testutil.Ok(t, err) + series = append(series, sample{ts, v}) + ts += rand.Int63n(timeInterval) + 1 + } + seriesMap[labels.New(l...).String()] = series + } + testutil.Ok(t, app.Commit()) + // Delete a time-range from each-selector. + dels := []struct { + ms []labels.Matcher + drange Intervals + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + drange: Intervals{{300, 500}, {600, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + drange: Intervals{{300, 500}, {100, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + drange: Intervals{{300, 400}, {100, 6700}}, + }, + // TODO: Add Regexp Matchers. + } + for _, del := range dels { + // Reset the deletes everytime. + hb.tombstones = NewMemTombstones() + for _, r := range del.drange { + testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) + } + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(del.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + sort.Sort(matched) + for i := 0; i < numRanges; i++ { + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + defer q.Close() + ss, err := q.Select(del.ms...) + testutil.Ok(t, err) + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := seriesMap[m.String()] + smpls = deletedSamples(smpls, del.drange) + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + // Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + testutil.Equals(t, eok, rok) + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + testutil.Equals(t, sexp.Labels(), sres.Labels()) + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } + } + } + return +} func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { From 98fe30438ce2f33372fda366fc8205f4b86bfc5c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 17 Sep 2018 14:53:48 +0200 Subject: [PATCH 06/15] Remove `prometheus_` prefix from metrics This can now be added by users of the library as needed with the new https://godoc.org/github.com/prometheus/client_golang/prometheus#WrapRegistererWithPrefix Signed-off-by: beorn7 --- compact.go | 12 ++++++------ db.go | 18 +++++++++--------- head.go | 26 +++++++++++++------------- wal.go | 4 ++-- wal/wal.go | 6 +++--- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/compact.go b/compact.go index 1b8f20fa..cffb0aca 100644 --- a/compact.go +++ b/compact.go @@ -84,30 +84,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m := &compactorMetrics{} m.ran = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_compactions_total", + Name: "tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_compactions_failed_total", + Name: "tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_duration_seconds", + Name: "tsdb_compaction_duration_seconds", Help: "Duration of compaction runs", Buckets: prometheus.ExponentialBuckets(1, 2, 10), }) m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_size_bytes", + Name: "tsdb_compaction_chunk_size_bytes", Help: "Final size of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), }) m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_samples", + Name: "tsdb_compaction_chunk_samples", Help: "Final number of samples on their first compaction", Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), }) m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_range_seconds", + Name: "tsdb_compaction_chunk_range_seconds", Help: "Final time range of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(100, 4, 10), }) diff --git a/db.go b/db.go index 228dbb9c..2fcdbef0 100644 --- a/db.go +++ b/db.go @@ -133,7 +133,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m := &dbMetrics{} m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_blocks_loaded", + Name: "tsdb_blocks_loaded", Help: "Number of currently loaded data blocks", }, func() float64 { db.mtx.RLock() @@ -141,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(len(db.blocks)) }) m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_symbol_table_size_bytes", + Name: "tsdb_symbol_table_size_bytes", Help: "Size of symbol table on disk (in bytes)", }, func() float64 { db.mtx.RLock() @@ -154,27 +154,27 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(symTblSize) }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_reloads_total", + Name: "tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", }) m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_reloads_failures_total", + Name: "tsdb_reloads_failures_total", Help: "Number of times the database failed to reload block data from disk.", }) m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_compactions_triggered_total", + Name: "tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", + Name: "tsdb_retention_cutoffs_total", Help: "Number of times the database cut off block data from disk.", }) m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", + Name: "tsdb_retention_cutoffs_failures_total", Help: "Number of times the database failed to cut off block data from disk.", }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_lowest_timestamp", + Name: "tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database.", }, func() float64 { db.mtx.RLock() @@ -185,7 +185,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(db.blocks[0].meta.MinTime) }) m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_tombstone_cleanup_seconds", + Name: "tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) diff --git a/head.go b/head.go index d937b524..9753a150 100644 --- a/head.go +++ b/head.go @@ -95,59 +95,59 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m := &headMetrics{} m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_active_appenders", + Name: "tsdb_head_active_appenders", Help: "Number of currently active appender transactions", }) m.series = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_series", + Name: "tsdb_head_series", Help: "Total number of series in the head block.", }) m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_created_total", + Name: "tsdb_head_series_created_total", Help: "Total number of series created in the head", }) m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_removed_total", + Name: "tsdb_head_series_removed_total", Help: "Total number of series removed in the head", }) m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_not_found_total", + Name: "tsdb_head_series_not_found_total", Help: "Total number of requests for series that were not found.", }) m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_chunks", + Name: "tsdb_head_chunks", Help: "Total number of chunks in the head block.", }) m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_chunks_created_total", + Name: "tsdb_head_chunks_created_total", Help: "Total number of chunks created in the head", }) m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_chunks_removed_total", + Name: "tsdb_head_chunks_removed_total", Help: "Total number of chunks removed in the head", }) m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_head_gc_duration_seconds", + Name: "tsdb_head_gc_duration_seconds", Help: "Runtime of garbage collection in the head block.", }) m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_max_time", + Name: "tsdb_head_max_time", Help: "Maximum timestamp of the head block.", }, func() float64 { return float64(h.MaxTime()) }) m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_min_time", + Name: "tsdb_head_min_time", Help: "Minimum time bound of the head block.", }, func() float64 { return float64(h.MinTime()) }) m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_truncate_duration_seconds", + Name: "tsdb_wal_truncate_duration_seconds", Help: "Duration of WAL truncation.", }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_samples_appended_total", + Name: "tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }) diff --git a/wal.go b/wal.go index 972fdea3..bc48e48e 100644 --- a/wal.go +++ b/wal.go @@ -64,11 +64,11 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { m := &walMetrics{} m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Name: "tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", }) m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", + Name: "tsdb_wal_corruptions_total", Help: "Total number of WAL corruptions.", }) diff --git a/wal/wal.go b/wal/wal.go index 0e95ba2a..af1304f4 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -190,15 +190,15 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi stopc: make(chan chan struct{}), } w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Name: "tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", }) w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_page_flushes_total", + Name: "tsdb_wal_page_flushes_total", Help: "Total number of page flushes.", }) w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_completed_pages_total", + Name: "tsdb_wal_completed_pages_total", Help: "Total number of completed pages.", }) if reg != nil { From 722f0ab920590f700435471a8f03ce177831ef49 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Mon, 17 Sep 2018 09:30:56 -0700 Subject: [PATCH 07/15] break MigrateWAL into two functions, detection and migration (#371) Signed-off-by: Callum Styan --- db.go | 2 +- wal.go | 26 ++++++++++++++++---------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/db.go b/db.go index 2fcdbef0..2655bd19 100644 --- a/db.go +++ b/db.go @@ -220,7 +220,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } - // Migrate old WAL. + // Migrate old WAL if one exists. if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { return nil, errors.Wrap(err, "migrate WAL") } diff --git a/wal.go b/wal.go index bc48e48e..f44dbf0a 100644 --- a/wal.go +++ b/wal.go @@ -1212,38 +1212,44 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { return nil } -// MigrateWAL rewrites the deprecated write ahead log into the new format. -func MigrateWAL(logger log.Logger, dir string) (err error) { - if logger == nil { - logger = log.NewNopLogger() - } +func deprecatedWALExists(logger log.Logger, dir string) (bool, error) { // Detect whether we still have the old WAL. fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "list sequence files") + return false, errors.Wrap(err, "list sequence files") } if len(fns) == 0 { - return nil // No WAL at all yet. + return false, nil // No WAL at all yet. } // Check header of first segment to see whether we are still dealing with an // old WAL. f, err := os.Open(fns[0]) if err != nil { - return errors.Wrap(err, "check first existing segment") + return false, errors.Wrap(err, "check first existing segment") } defer f.Close() var hdr [4]byte if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { - return errors.Wrap(err, "read header from first segment") + return false, errors.Wrap(err, "read header from first segment") } // If we cannot read the magic header for segments of the old WAL, abort. // Either it's migrated already or there's a corruption issue with which // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. if binary.BigEndian.Uint32(hdr[:]) != WALMagic { - return nil + return false, nil } + return true, nil +} +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } + if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists { + return err + } level.Info(logger).Log("msg", "migrating WAL format") tmpdir := dir + ".tmp" From a8966cb53d39747c5d7b7ba787873b52899881d0 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 17 Sep 2018 10:58:42 -0600 Subject: [PATCH 08/15] Fix race condition between gc and committing (#378) Signed-off-by: Chris Marchbanks --- head.go | 32 ++++++++++++++++++----------- head_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 12 deletions(-) diff --git a/head.go b/head.go index 9753a150..0a5f3c15 100644 --- a/head.go +++ b/head.go @@ -620,21 +620,22 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro } func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - s := a.head.series.getByID(ref) + if t < a.minValidTime { + return ErrOutOfBounds + } + s := a.head.series.getByID(ref) if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - err := s.appendable(t, v) - s.Unlock() - - if err != nil { + if err := s.appendable(t, v); err != nil { + s.Unlock() return err } - if t < a.minValidTime { - return ErrOutOfBounds - } + s.pendingCommit = true + s.Unlock() + if t < a.mint { a.mint = t } @@ -694,6 +695,7 @@ func (a *headAppender) Commit() error { for _, s := range a.samples { s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.pendingCommit = false s.series.Unlock() if !ok { @@ -713,6 +715,11 @@ func (a *headAppender) Commit() error { func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() + for _, s := range a.samples { + s.series.Lock() + s.series.pendingCommit = false + s.series.Unlock() + } a.head.putAppendBuffer(a.samples) // Series are created in the head memory regardless of rollback. Thus we have @@ -1165,7 +1172,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { series.Lock() rmChunks += series.truncateChunksBefore(mint) - if len(series.chunks) > 0 { + if len(series.chunks) > 0 || series.pendingCommit { series.Unlock() continue } @@ -1256,9 +1263,10 @@ type memSeries struct { chunkRange int64 firstChunkID int - nextAt int64 // timestamp at which to cut the next chunk. - lastValue float64 - sampleBuf [4]sample + nextAt int64 // Timestamp at which to cut the next chunk. + lastValue float64 + sampleBuf [4]sample + pendingCommit bool // Whether there are samples waiting to be committed to this series. app chunkenc.Appender // Current appender for the chunk. } diff --git a/head_test.go b/head_test.go index 5383b2ba..d9de7bea 100644 --- a/head_test.go +++ b/head_test.go @@ -781,6 +781,64 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, ErrNotFound, err) } +func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Commit()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, true, ss.Next()) +} + +func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Rollback()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, false, ss.Next()) + + // Truncate again, this time the series should be deleted + testutil.Ok(t, h.Truncate(2050)) + testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset)) +} + func TestHead_LogRollback(t *testing.T) { dir, err := ioutil.TempDir("", "wal_rollback") testutil.Ok(t, err) From 3bc6c670fa0dc41e540f3cb238e37e8fd0dcddab Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 18 Sep 2018 19:17:41 +0200 Subject: [PATCH 09/15] Revert "Remove `prometheus_` prefix from metrics" This reverts commit 98fe30438ce2f33372fda366fc8205f4b86bfc5c. After some discussion, it was concluded that we want the full `prometheus_tsdb_...` prefix hardcoded in the library. Signed-off-by: beorn7 --- compact.go | 12 ++++++------ db.go | 18 +++++++++--------- head.go | 26 +++++++++++++------------- wal.go | 4 ++-- wal/wal.go | 6 +++--- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/compact.go b/compact.go index cffb0aca..1b8f20fa 100644 --- a/compact.go +++ b/compact.go @@ -84,30 +84,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m := &compactorMetrics{} m.ran = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_compactions_total", + Name: "prometheus_tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_compactions_failed_total", + Name: "prometheus_tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_compaction_duration_seconds", + Name: "prometheus_tsdb_compaction_duration_seconds", Help: "Duration of compaction runs", Buckets: prometheus.ExponentialBuckets(1, 2, 10), }) m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_compaction_chunk_size_bytes", + Name: "prometheus_tsdb_compaction_chunk_size_bytes", Help: "Final size of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), }) m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_compaction_chunk_samples", + Name: "prometheus_tsdb_compaction_chunk_samples", Help: "Final number of samples on their first compaction", Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), }) m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_compaction_chunk_range_seconds", + Name: "prometheus_tsdb_compaction_chunk_range_seconds", Help: "Final time range of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(100, 4, 10), }) diff --git a/db.go b/db.go index 2655bd19..059ef06e 100644 --- a/db.go +++ b/db.go @@ -133,7 +133,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m := &dbMetrics{} m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_blocks_loaded", + Name: "prometheus_tsdb_blocks_loaded", Help: "Number of currently loaded data blocks", }, func() float64 { db.mtx.RLock() @@ -141,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(len(db.blocks)) }) m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_symbol_table_size_bytes", + Name: "prometheus_tsdb_symbol_table_size_bytes", Help: "Size of symbol table on disk (in bytes)", }, func() float64 { db.mtx.RLock() @@ -154,27 +154,27 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(symTblSize) }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_reloads_total", + Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", }) m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_reloads_failures_total", + Name: "prometheus_tsdb_reloads_failures_total", Help: "Number of times the database failed to reload block data from disk.", }) m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_compactions_triggered_total", + Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_retention_cutoffs_total", + Name: "prometheus_tsdb_retention_cutoffs_total", Help: "Number of times the database cut off block data from disk.", }) m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_retention_cutoffs_failures_total", + Name: "prometheus_tsdb_retention_cutoffs_failures_total", Help: "Number of times the database failed to cut off block data from disk.", }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_lowest_timestamp", + Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database.", }, func() float64 { db.mtx.RLock() @@ -185,7 +185,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return float64(db.blocks[0].meta.MinTime) }) m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_tombstone_cleanup_seconds", + Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) diff --git a/head.go b/head.go index 0a5f3c15..b342c8f5 100644 --- a/head.go +++ b/head.go @@ -95,59 +95,59 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m := &headMetrics{} m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_active_appenders", + Name: "prometheus_tsdb_head_active_appenders", Help: "Number of currently active appender transactions", }) m.series = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_series", + Name: "prometheus_tsdb_head_series", Help: "Total number of series in the head block.", }) m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_series_created_total", + Name: "prometheus_tsdb_head_series_created_total", Help: "Total number of series created in the head", }) m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_series_removed_total", + Name: "prometheus_tsdb_head_series_removed_total", Help: "Total number of series removed in the head", }) m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_series_not_found_total", + Name: "prometheus_tsdb_head_series_not_found_total", Help: "Total number of requests for series that were not found.", }) m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_chunks", + Name: "prometheus_tsdb_head_chunks", Help: "Total number of chunks in the head block.", }) m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_chunks_created_total", + Name: "prometheus_tsdb_head_chunks_created_total", Help: "Total number of chunks created in the head", }) m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_chunks_removed_total", + Name: "prometheus_tsdb_head_chunks_removed_total", Help: "Total number of chunks removed in the head", }) m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_head_gc_duration_seconds", + Name: "prometheus_tsdb_head_gc_duration_seconds", Help: "Runtime of garbage collection in the head block.", }) m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_head_max_time", + Name: "prometheus_tsdb_head_max_time", Help: "Maximum timestamp of the head block.", }, func() float64 { return float64(h.MaxTime()) }) m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_head_min_time", + Name: "prometheus_tsdb_head_min_time", Help: "Minimum time bound of the head block.", }, func() float64 { return float64(h.MinTime()) }) m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_wal_truncate_duration_seconds", + Name: "prometheus_tsdb_wal_truncate_duration_seconds", Help: "Duration of WAL truncation.", }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_head_samples_appended_total", + Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }) diff --git a/wal.go b/wal.go index f44dbf0a..ff978766 100644 --- a/wal.go +++ b/wal.go @@ -64,11 +64,11 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { m := &walMetrics{} m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_wal_fsync_duration_seconds", + Name: "prometheus_tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", }) m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_wal_corruptions_total", + Name: "prometheus_tsdb_wal_corruptions_total", Help: "Total number of WAL corruptions.", }) diff --git a/wal/wal.go b/wal/wal.go index af1304f4..0e95ba2a 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -190,15 +190,15 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi stopc: make(chan chan struct{}), } w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_wal_fsync_duration_seconds", + Name: "prometheus_tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", }) w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_wal_page_flushes_total", + Name: "prometheus_tsdb_wal_page_flushes_total", Help: "Total number of page flushes.", }) w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_wal_completed_pages_total", + Name: "prometheus_tsdb_wal_completed_pages_total", Help: "Total number of completed pages.", }) if reg != nil { From c7d0d10da43c5486925870697ee5c5abf0d070d1 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 19 Sep 2018 11:40:07 +0530 Subject: [PATCH 10/15] Make sure WAL Repair can handle wrapped errors Signed-off-by: Goutham Veeramachaneni --- wal/wal.go | 6 ++++-- wal/wal_test.go | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index 0e95ba2a..1aea24d8 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -255,15 +255,17 @@ Loop: // Repair attempts to repair the WAL based on the error. // It discards all data after the corruption. -func (w *WAL) Repair(err error) error { +func (w *WAL) Repair(origErr error) error { // We could probably have a mode that only discards torn records right around // the corruption to preserve as data much as possible. // But that's not generally applicable if the records have any kind of causality. // Maybe as an extra mode in the future if mid-WAL corruptions become // a frequent concern. + err := errors.Cause(origErr) // So that we can pick up errors even if wrapped. + cerr, ok := err.(*CorruptionErr) if !ok { - return errors.New("cannot handle error") + return errors.Wrap(origErr, "cannot handle error") } if cerr.Segment < 0 { return errors.New("corruption error does not specify position") diff --git a/wal/wal_test.go b/wal/wal_test.go index d1b724c7..26ee8663 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -23,6 +23,7 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/testutil" ) @@ -286,8 +287,8 @@ func TestWAL_Repair(t *testing.T) { for r.Next() { } testutil.NotOk(t, r.Err()) - testutil.Ok(t, w.Repair(r.Err())) + testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) // See https://github.com/prometheus/prometheus/issues/4603 sr, err = NewSegmentsReader(dir) testutil.Ok(t, err) From 5ae6c60d3934266a2ad1f5e7c8439bce2466400b Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 20 Sep 2018 10:33:52 +0200 Subject: [PATCH 11/15] Handle a bunch of unchecked errors (#365) As discovered by "gosec". Signed-off-by: Julius Volz --- block.go | 7 +++++-- cmd/tsdb/main.go | 23 +++++++++++++++++------ compact.go | 4 +++- head.go | 7 +++++-- index/index.go | 4 +++- tombstones.go | 9 ++++++--- 6 files changed, 39 insertions(+), 15 deletions(-) diff --git a/block.go b/block.go index 342a8d02..7c4ccf0d 100644 --- a/block.go +++ b/block.go @@ -504,10 +504,13 @@ Outer: func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { numStones := 0 - pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error { numStones += len(ivs) return nil - }) + }); err != nil { + // This should never happen, as the iteration function only returns nil. + panic(err) + } if numStones == 0 { return nil, nil } diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 0a033e5a..cd24ad1f 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -145,7 +145,9 @@ func (b *writeBenchmark) run() { if err := b.storage.Close(); err != nil { exitWithError(err) } - b.stopProfiling() + if err := b.stopProfiling(); err != nil { + exitWithError(err) + } }) } @@ -248,7 +250,9 @@ func (b *writeBenchmark) startProfiling() { if err != nil { exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) } - pprof.StartCPUProfile(b.cpuprof) + if err := pprof.StartCPUProfile(b.cpuprof); err != nil { + exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err)) + } // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) @@ -271,29 +275,36 @@ func (b *writeBenchmark) startProfiling() { runtime.SetMutexProfileFraction(20) } -func (b *writeBenchmark) stopProfiling() { +func (b *writeBenchmark) stopProfiling() error { if b.cpuprof != nil { pprof.StopCPUProfile() b.cpuprof.Close() b.cpuprof = nil } if b.memprof != nil { - pprof.Lookup("heap").WriteTo(b.memprof, 0) + if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil { + return fmt.Errorf("error writing mem profile: %v", err) + } b.memprof.Close() b.memprof = nil } if b.blockprof != nil { - pprof.Lookup("block").WriteTo(b.blockprof, 0) + if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil { + return fmt.Errorf("error writing block profile: %v", err) + } b.blockprof.Close() b.blockprof = nil runtime.SetBlockProfileRate(0) } if b.mtxprof != nil { - pprof.Lookup("mutex").WriteTo(b.mtxprof, 0) + if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil { + return fmt.Errorf("error writing mutex profile: %v", err) + } b.mtxprof.Close() b.mtxprof = nil runtime.SetMutexProfileFraction(0) } + return nil } func measureTime(stage string, f func()) time.Duration { diff --git a/compact.go b/compact.go index 1b8f20fa..6df33a4c 100644 --- a/compact.go +++ b/compact.go @@ -626,7 +626,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for _, chk := range chks { - c.chunkPool.Put(chk.Chunk) + if err := c.chunkPool.Put(chk.Chunk); err != nil { + return errors.Wrap(err, "put chunk") + } } for _, l := range lset { diff --git a/head.go b/head.go index b342c8f5..bc8cdfbe 100644 --- a/head.go +++ b/head.go @@ -793,7 +793,7 @@ func (h *Head) gc() { symbols := make(map[string]struct{}) values := make(map[string]stringset, len(h.values)) - h.postings.Iter(func(t labels.Label, _ index.Postings) error { + if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error { symbols[t.Name] = struct{}{} symbols[t.Value] = struct{}{} @@ -804,7 +804,10 @@ func (h *Head) gc() { } ss.set(t.Value) return nil - }) + }); err != nil { + // This should never happen, as the iteration function only returns nil. + panic(err) + } h.symMtx.Lock() diff --git a/index/index.go b/index/index.go index c58ff6ea..c75796d7 100644 --- a/index/index.go +++ b/index/index.go @@ -271,7 +271,9 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. - w.addPadding(16) + if err := w.addPadding(16); err != nil { + return errors.Errorf("failed to write padding bytes: %v", err) + } if w.pos%16 != 0 { return errors.Errorf("series write not 16-byte aligned at %d", w.pos) diff --git a/tombstones.go b/tombstones.go index d4a3d0ef..ad820a05 100644 --- a/tombstones.go +++ b/tombstones.go @@ -16,12 +16,13 @@ package tsdb import ( "encoding/binary" "fmt" - "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" "sync" + + "github.com/pkg/errors" ) const tombstoneFilename = "tombstones" @@ -72,7 +73,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { mw := io.MultiWriter(f, hash) - tr.Iter(func(ref uint64, ivs Intervals) error { + if err := tr.Iter(func(ref uint64, ivs Intervals) error { for _, iv := range ivs { buf.reset() @@ -86,7 +87,9 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } } return nil - }) + }); err != nil { + return fmt.Errorf("error writing tombstones: %v", err) + } _, err = f.Write(hash.Sum(nil)) if err != nil { From 9c8ca47399a7f53f57b440464c103f5067e9b7b6 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 21 Sep 2018 11:01:22 +0530 Subject: [PATCH 12/15] Fix filehandling for windows (#392) * Fix filehandling for windows Signed-off-by: Goutham Veeramachaneni * Fix more windows filehandling issues Windows: Close files before deleting Checkpoints. Signed-off-by: Goutham Veeramachaneni Windows: Close writers in case of errors so they can be deleted Signed-off-by: Goutham Veeramachaneni Windows: Close block so that it can be deleted. Signed-off-by: Goutham Veeramachaneni Windows: Close file to delete it Signed-off-by: Goutham Veeramachaneni Windows: Close dir so that it can be deleted. Signed-off-by: Goutham Veeramachaneni Windows: close files so that they can be deleted. Signed-off-by: Goutham Veeramachaneni * Review feedback Signed-off-by: Goutham Veeramachaneni --- checkpoint.go | 9 +++++++++ compact.go | 6 ++++++ db_test.go | 1 + fileutil/fileutil.go | 2 +- repair_test.go | 2 ++ wal.go | 20 ++++++++++++++++++-- wal/wal.go | 18 +++++++++++++++++- wal/wal_test.go | 8 +++++++- 8 files changed, 61 insertions(+), 5 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index d988d356..f45f3791 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo stats := &CheckpointStats{} var sr io.Reader + // We close everything explicitly because Windows needs files to be + // closed before being deleted. But we also have defer so that we close + // files if there is an error somewhere. + var closers []io.Closer { lastFn, k, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { @@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "open last checkpoint") } defer last.Close() + closers = append(closers, last) sr = last } @@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "create segment reader") } defer segsr.Close() + closers = append(closers, segsr) if sr != nil { sr = io.MultiReader(sr, segsr) @@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } + if err := closeAll(closers...); err != nil { + return stats, errors.Wrap(err, "close opened files") + } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. // Leftover segments will just be ignored in the future if there's a checkpoint diff --git a/compact.go b/compact.go index 6df33a4c..3f5fa367 100644 --- a/compact.go +++ b/compact.go @@ -452,6 +452,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } + defer chunkw.Close() // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -466,6 +467,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } + defer indexw.Close() if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -475,6 +477,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write merged meta") } + // We are explicitly closing them here to check for error even + // though these are covered under defer. This is because in Windows, + // you cannot delete these unless they are closed and the defer is to + // make sure they are closed if the function exits due to an error above. if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") } diff --git a/db_test.go b/db_test.go index 9c175118..988fdb7b 100644 --- a/db_test.go +++ b/db_test.go @@ -859,6 +859,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 } block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) // Now check that all expected blocks are actually persisted on disk. diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index 2158bfd2..1154e730 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -48,7 +48,7 @@ func Rename(from, to string) error { // It is not atomic. func Replace(from, to string) error { if err := os.RemoveAll(to); err != nil { - return nil + return err } if err := os.Rename(from, to); err != nil { return err diff --git a/repair_test.go b/repair_test.go index c8097600..cbe138c0 100644 --- a/repair_test.go +++ b/repair_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -74,6 +75,7 @@ func TestRepairBadIndexVersion(t *testing.T) { if p.Err() != nil { t.Fatal(err) } + testutil.Ok(t, r.Close()) // On DB opening all blocks in the base dir should be repaired. db, err := Open("testdata/repair_index_version", nil, nil, nil) diff --git a/wal.go b/wal.go index ff978766..59206d8d 100644 --- a/wal.go +++ b/wal.go @@ -723,6 +723,13 @@ func (w *SegmentWAL) run(interval time.Duration) { // Close syncs all data and closes the underlying resources. func (w *SegmentWAL) Close() error { + // Make sure you can call Close() multiple times. + select { + case <-w.stopc: + return nil // Already closed. + default: + } + close(w.stopc) <-w.donec @@ -735,10 +742,12 @@ func (w *SegmentWAL) Close() error { // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. if hf := w.head(); hf != nil { - return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "closing WAL head %s", hf.Name()) + } } - return w.dirFile.Close() + return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name()) } const ( @@ -1260,6 +1269,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "open new WAL") } + // It should've already been closed as part of the previous finalization. // Do it once again in case of prior errors. defer func() { @@ -1306,6 +1316,12 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "write new entries") } + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } diff --git a/wal/wal.go b/wal/wal.go index 1aea24d8..aa52738f 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -285,6 +285,15 @@ func (w *WAL) Repair(origErr error) error { if s.n <= cerr.Segment { continue } + if w.segment.i == s.n { + // The active segment needs to be removed, + // close it first (Windows!). Can be closed safely + // as we set the current segment to repaired file + // below. + if err := w.segment.Close(); err != nil { + return errors.Wrap(err, "close active segment") + } + } if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { return errors.Wrap(err, "delete segment") } @@ -312,6 +321,7 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "open segment") } defer f.Close() + r := NewReader(bufio.NewReader(f)) for r.Next() { @@ -319,8 +329,14 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "insert record") } } - // We expect an error here, so nothing to handle. + // We expect an error here from r.Err(), so nothing to handle. + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := f.Close(); err != nil { + return errors.Wrap(err, "close corrupted file") + } if err := os.Remove(tmpfn); err != nil { return errors.Wrap(err, "delete corrupted segment") } diff --git a/wal/wal_test.go b/wal/wal_test.go index 26ee8663..72f46253 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -287,8 +287,14 @@ func TestWAL_Repair(t *testing.T) { for r.Next() { } testutil.NotOk(t, r.Err()) + testutil.Ok(t, sr.Close()) testutil.Ok(t, w.Repair(r.Err())) - testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) // See https://github.com/prometheus/prometheus/issues/4603 + + // See https://github.com/prometheus/prometheus/issues/4603 + // We need to close w.segment because it needs to be deleted. + // But this is to mainly artificially test Repair() again. + testutil.Ok(t, w.segment.Close()) + testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) sr, err = NewSegmentsReader(dir) testutil.Ok(t, err) From e779603633366966b78e2dd3d93f7069d1f9a008 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 21 Sep 2018 11:47:59 +0530 Subject: [PATCH 13/15] Reuse testutil (#393) Signed-off-by: Ganesh Vernekar --- head_test.go | 4 +--- repair_test.go | 56 ++++++++++++++------------------------------------ 2 files changed, 16 insertions(+), 44 deletions(-) diff --git a/head_test.go b/head_test.go index d9de7bea..4c796886 100644 --- a/head_test.go +++ b/head_test.go @@ -33,9 +33,7 @@ func BenchmarkCreateSeries(b *testing.B) { testutil.Ok(b, err) h, err := NewHead(nil, nil, nil, 10000) - if err != nil { - testutil.Ok(b, err) - } + testutil.Ok(b, err) defer h.Close() b.ReportAllocs() diff --git a/repair_test.go b/repair_test.go index cbe138c0..b5ecca61 100644 --- a/repair_test.go +++ b/repair_test.go @@ -2,7 +2,6 @@ package tsdb import ( "os" - "reflect" "testing" "github.com/prometheus/tsdb/chunks" @@ -50,48 +49,32 @@ func TestRepairBadIndexVersion(t *testing.T) { // In its current state, lookups should fail with the fixed code. const dir = "testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/" meta, err := readMetaFile(dir) - if err == nil { - t.Fatal("error expected but got none") - } + testutil.NotOk(t, err) // Touch chunks dir in block. os.MkdirAll(dir+"chunks", 0777) r, err := index.NewFileReader(dir + "index") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) p, err := r.Postings("b", "1") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) for p.Next() { t.Logf("next ID %d", p.At()) var lset labels.Labels - if err := r.Series(p.At(), &lset, nil); err == nil { - t.Fatal("expected error but got none") - } - } - if p.Err() != nil { - t.Fatal(err) + testutil.NotOk(t, r.Series(p.At(), &lset, nil)) } + testutil.Ok(t, p.Err()) testutil.Ok(t, r.Close()) // On DB opening all blocks in the base dir should be repaired. db, err := Open("testdata/repair_index_version", nil, nil, nil) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) db.Close() r, err = index.NewFileReader(dir + "index") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) p, err = r.Postings("b", "1") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) res := []labels.Labels{} for p.Next() { @@ -99,26 +82,17 @@ func TestRepairBadIndexVersion(t *testing.T) { var lset labels.Labels var chks []chunks.Meta - if err := r.Series(p.At(), &lset, &chks); err != nil { - t.Fatal(err) - } + testutil.Ok(t, r.Series(p.At(), &lset, &chks)) res = append(res, lset) } - if p.Err() != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(res, []labels.Labels{ + + testutil.Ok(t, p.Err()) + testutil.Equals(t, []labels.Labels{ {{"a", "1"}, {"b", "1"}}, {{"a", "2"}, {"b", "1"}}, - }) { - t.Fatalf("unexpected result %v", res) - } + }, res) meta, err = readMetaFile(dir) - if err != nil { - t.Fatal(err) - } - if meta.Version != 1 { - t.Fatalf("unexpected meta version %d", meta.Version) - } + testutil.Ok(t, err) + testutil.Assert(t, meta.Version == 1, "unexpected meta version %d", meta.Version) } From 2db59a71a6450f34f557d317a5f2c8ac8ea98a63 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 21 Sep 2018 00:23:01 -0600 Subject: [PATCH 14/15] Fix assert order being backwards in a few places (#388) Signed-off-by: Chris Marchbanks --- db_test.go | 12 ++++++------ head_test.go | 2 +- wal_test.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/db_test.go b/db_test.go index 988fdb7b..6cb18322 100644 --- a/db_test.go +++ b/db_test.go @@ -110,7 +110,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { testutil.Ok(t, err) seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{}) + testutil.Equals(t, map[string][]sample{}, seriesSet) testutil.Ok(t, querier.Close()) err = app.Commit() @@ -122,7 +122,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}) + testutil.Equals(t, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}, seriesSet) } func TestDataNotAvailableAfterRollback(t *testing.T) { @@ -143,7 +143,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{}) + testutil.Equals(t, map[string][]sample{}, seriesSet) } func TestDBAppenderAddRef(t *testing.T) { @@ -179,7 +179,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, errors.Cause(err), ErrNotFound) + testutil.Equals(t, ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -412,7 +412,7 @@ func TestDB_Snapshot(t *testing.T) { testutil.Ok(t, series.Err()) } testutil.Ok(t, seriesSet.Err()) - testutil.Equals(t, sum, 1000.0) + testutil.Equals(t, 1000.0, sum) } func TestDB_SnapshotWithDelete(t *testing.T) { @@ -689,7 +689,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { values, err := q.LabelValues("labelname") testutil.Ok(t, err) - testutil.Equals(t, values, []string{"labelvalue"}) + testutil.Equals(t, []string{"labelvalue"}, values) } func TestTombstoneClean(t *testing.T) { diff --git a/head_test.go b/head_test.go index 4c796886..76a8b72e 100644 --- a/head_test.go +++ b/head_test.go @@ -858,5 +858,5 @@ func TestHead_LogRollback(t *testing.T) { series, ok := recs[0].([]RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}) + testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) } diff --git a/wal_test.go b/wal_test.go index b16680a9..e145188d 100644 --- a/wal_test.go +++ b/wal_test.go @@ -66,7 +66,7 @@ func TestSegmentWAL_cut(t *testing.T) { et, flag, b, err := newWALReader(nil, nil).entry(f) testutil.Ok(t, err) testutil.Equals(t, WALEntrySeries, et) - testutil.Equals(t, flag, byte(walSeriesSimple)) + testutil.Equals(t, byte(walSeriesSimple), flag) testutil.Equals(t, []byte("Hello World!!"), b) } } From d38516b1c2e5c6eb118df3fe84c9679153ef08d3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 21 Sep 2018 09:24:01 +0300 Subject: [PATCH 15/15] remove unused changes variable (#391) This was added in https://github.com/prometheus/tsdb/commit/55a9b5428aceb644b3b297d7a9fd63d0354ce953 and later not used after some refactoring in following PRs. Signed-off-by: Krasi Georgiev --- db.go | 24 +++++++++++------------- db_test.go | 4 ++-- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/db.go b/db.go index 059ef06e..cb02b483 100644 --- a/db.go +++ b/db.go @@ -300,7 +300,7 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - _, err := db.compact() + err := db.compact() if err != nil { level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) @@ -366,12 +366,12 @@ func (a dbAppender) Commit() error { // this is sufficient to reliably delete old data. // Old blocks are only deleted on reload based on the new block's parent information. // See DB.reload documentation for further information. -func (db *DB) compact() (changes bool, err error) { +func (db *DB) compact() (err error) { db.cmtx.Lock() defer db.cmtx.Unlock() if !db.compactionsEnabled { - return false, nil + return nil } // Check whether we have pending head blocks that are ready to be persisted. @@ -379,7 +379,7 @@ func (db *DB) compact() (changes bool, err error) { for { select { case <-db.stopc: - return changes, nil + return nil default: } // The head has a compactable range if 1.5 level 0 ranges are between the oldest @@ -402,14 +402,13 @@ func (db *DB) compact() (changes bool, err error) { maxt: maxt - 1, } if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { - return changes, errors.Wrap(err, "persist head block") + return errors.Wrap(err, "persist head block") } - changes = true runtime.GC() if err := db.reload(); err != nil { - return changes, errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reload blocks") } runtime.GC() } @@ -418,7 +417,7 @@ func (db *DB) compact() (changes bool, err error) { for { plan, err := db.compactor.Plan(db.dir) if err != nil { - return changes, errors.Wrap(err, "plan compaction") + return errors.Wrap(err, "plan compaction") } if len(plan) == 0 { break @@ -426,23 +425,22 @@ func (db *DB) compact() (changes bool, err error) { select { case <-db.stopc: - return changes, nil + return nil default: } if _, err := db.compactor.Compact(db.dir, plan...); err != nil { - return changes, errors.Wrapf(err, "compact %s", plan) + return errors.Wrapf(err, "compact %s", plan) } - changes = true runtime.GC() if err := db.reload(); err != nil { - return changes, errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reload blocks") } runtime.GC() } - return changes, nil + return nil } func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { diff --git a/db_test.go b/db_test.go index 6cb18322..916df5ef 100644 --- a/db_test.go +++ b/db_test.go @@ -1131,7 +1131,7 @@ func TestChunkAtBlockBoundary(t *testing.T) { err := app.Commit() testutil.Ok(t, err) - _, err = db.compact() + err = db.compact() testutil.Ok(t, err) for _, block := range db.blocks { @@ -1183,7 +1183,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { err := app.Commit() testutil.Ok(t, err) - _, err = db.compact() + err = db.compact() testutil.Ok(t, err) testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")