From 8f0950c37adb9709cfca1f4ad22332b2290b2a1b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 10 Nov 2016 18:19:16 -0700 Subject: [PATCH 1/6] Cache results onf findGenerations This allocates quite a bit and it's called multiple times per second per shard. The generations don't change until a compaction has occurred so most of the time is re-calculating the same thing and creating garbage. --- tsdb/engine/tsm1/compact.go | 40 ++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 94cab317a78..bf1ee424101 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -73,6 +73,13 @@ type DefaultPlanner struct { // lastPlanCheck is the last time Plan was called lastPlanCheck time.Time + + mu sync.RWMutex + // lastFindGenerations is the last time findGenerations was run + lastFindGenerations time.Time + + // lastGenerations is the last set of generations found by findGenerations + lastGenerations tsmGenerations } // tsmGeneration represents the TSM files within a generation. @@ -458,6 +465,16 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { // findGenerations groups all the TSM files by they generation based // on their filename then returns the generations in descending order (newest first) func (c *DefaultPlanner) findGenerations() tsmGenerations { + c.mu.RLock() + last := c.lastFindGenerations + lastGen := c.lastGenerations + c.mu.RUnlock() + + if !last.IsZero() && c.FileStore.LastModified().Equal(last) { + return lastGen + } + + genTime := c.FileStore.LastModified() tsmStats := c.FileStore.Stats() generations := make(map[int]*tsmGeneration, len(tsmStats)) for _, f := range tsmStats { @@ -477,7 +494,15 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { for _, g := range generations { orderedGenerations = append(orderedGenerations, g) } - sort.Sort(orderedGenerations) + if !orderedGenerations.IsSorted() { + sort.Sort(orderedGenerations) + } + + c.mu.Lock() + c.lastFindGenerations = genTime + c.lastGenerations = orderedGenerations + c.mu.Unlock() + return orderedGenerations } @@ -1323,3 +1348,16 @@ func (a tsmGenerations) chunk(size int) []tsmGenerations { } return chunks } + +func (a tsmGenerations) IsSorted() bool { + if len(a) == 1 { + return true + } + + for i := 1; i < len(a); i++ { + if a.Less(i, i-1) { + return false + } + } + return true +} From 9bf54d9dc8f26def616f66e6b726523596957446 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 10 Nov 2016 18:20:53 -0700 Subject: [PATCH 2/6] Switch time.Sleep to time.Ticker Avoids an allocation when calling time.Sleep --- tsdb/engine/tsm1/engine.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cc757933322..ae871e5e23e 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -930,12 +930,14 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk func (e *Engine) compactCache(quit <-chan struct{}) { + t := time.NewTimer(time.Second) + defer t.Stop() for { select { case <-quit: return - default: + case <-t.C: e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { start := time.Now() @@ -950,7 +952,7 @@ func (e *Engine) compactCache(quit <-chan struct{}) { atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds()) } } - time.Sleep(time.Second) + t.Reset(time.Second) } } @@ -968,38 +970,41 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { + t := time.NewTimer(time.Second) + defer t.Stop() + for { select { case <-quit: return - default: + case <-t.C: s := e.levelCompactionStrategy(fast, level) - if s == nil { - time.Sleep(time.Second) - continue + if s != nil { + s.Apply() } - - s.Apply() } + t.Reset(time.Second) } } func (e *Engine) compactTSMFull(quit <-chan struct{}) { + t := time.NewTimer(time.Second) + defer t.Stop() + for { select { case <-quit: return - default: + case <-t.C: s := e.fullCompactionStrategy() - if s == nil { - time.Sleep(time.Second) - continue + if s != nil { + s.Apply() } - s.Apply() } + t.Reset(time.Second) } } From bea91f74b7b0fbd7921a2e3e7d2330232ff72f01 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 11 Jan 2017 17:57:40 -0700 Subject: [PATCH 3/6] Fix compactions sometimes getting stuck I ran into an issue where the cache snapshotting seemed to stop completely causing the cache to fill up and never recover. I believe this is due to the the Timer being reused incorrectly. Instead, use a Ticker that will fire more regularly and not require the resetting logic (which was wrong). --- tsdb/engine/tsm1/engine.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index ae871e5e23e..570fcd514c5 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -930,7 +930,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk func (e *Engine) compactCache(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { select { @@ -952,7 +952,6 @@ func (e *Engine) compactCache(quit <-chan struct{}) { atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds()) } } - t.Reset(time.Second) } } @@ -970,7 +969,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -984,12 +983,11 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { s.Apply() } } - t.Reset(time.Second) } } func (e *Engine) compactTSMFull(quit <-chan struct{}) { - t := time.NewTimer(time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1004,7 +1002,6 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { } } - t.Reset(time.Second) } } From cc49c52035d07721609c01862235a53f0ba43cea Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 10 Nov 2016 18:18:28 -0700 Subject: [PATCH 4/6] Use time.Tick instead of time.After Eliminates an allocation --- services/continuous_querier/service.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index a51150e0b67..4dc4d8d7257 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -196,6 +196,8 @@ func (s *Service) Run(database, name string, t time.Time) error { // backgroundLoop runs on a go routine and periodically executes CQs. func (s *Service) backgroundLoop() { leaseName := "continuous_querier" + t := time.NewTimer(s.RunInterval) + defer t.Stop() defer s.wg.Done() for { select { @@ -210,13 +212,15 @@ func (s *Service) backgroundLoop() { s.Logger.Printf("running continuous queries by request for time: %v", req.Now) s.runContinuousQueries(req) } - case <-time.After(s.RunInterval): + case <-t.C: if !s.hasContinuousQueries() { + t.Reset(s.RunInterval) continue } if _, err := s.MetaClient.AcquireLease(leaseName); err == nil { s.runContinuousQueries(&RunRequest{Now: time.Now()}) } + t.Reset(s.RunInterval) } } } From db5c5147ac25a0eda72ba87294fb9842df0da2a6 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 10 Nov 2016 18:22:24 -0700 Subject: [PATCH 5/6] Avoid allocation when counting tag keys A new sorted slice was called by the monitor func every 10s. The tag keys don't need to be sorted so this avoid the allocation of the slice and one during sorting. --- tsdb/meta.go | 11 +++++++++++ tsdb/shard.go | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/tsdb/meta.go b/tsdb/meta.go index f37c57fc907..a9ee17eca94 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1929,6 +1929,17 @@ func MarshalTags(tags map[string]string) []byte { return b } +// WalkTagKeys calls fn for each tag key associated with m. The order of the +// keys is undefined. +func (m *Measurement) WalkTagKeys(fn func(k string)) { + m.mu.RLock() + defer m.mu.RUnlock() + + for k := range m.seriesByTagKeyValue { + fn(k) + } +} + // TagKeys returns a list of the measurement's tag names. func (m *Measurement) TagKeys() []string { m.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index b460534dcaf..2c5ec264691 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -856,7 +856,7 @@ func (s *Shard) monitor() { } for _, m := range s.index.Measurements() { - for _, k := range m.TagKeys() { + m.WalkTagKeys(func(k string) { n := m.Cardinality(k) perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100) if perc > 100 { @@ -868,7 +868,7 @@ func (s *Shard) monitor() { s.logger.Printf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k) } - } + }) } } } From d3eeaaab6fe9a0dacc2d83b9061fd492079012d2 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 15 Nov 2016 12:43:00 -0700 Subject: [PATCH 6/6] Switch all Value types from pointers --- tsdb/engine/tsm1/cache.go | 8 +- tsdb/engine/tsm1/encoding.go | 92 +++++++++++------------ tsdb/engine/tsm1/encoding_test.go | 8 +- tsdb/engine/tsm1/iterator.gen.go | 16 ++-- tsdb/engine/tsm1/iterator.gen.go.tmpldata | 8 +- tsdb/engine/tsm1/pools.go | 16 ++-- tsdb/engine/tsm1/wal.go | 65 +++++++++------- 7 files changed, 111 insertions(+), 102 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 937bc1121b0..2dd6b358f5e 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -683,13 +683,13 @@ func (c *Cache) updateMemSize(b int64) { func valueType(v Value) int { switch v.(type) { - case *FloatValue: + case FloatValue: return 1 - case *IntegerValue: + case IntegerValue: return 2 - case *StringValue: + case StringValue: return 3 - case *BooleanValue: + case BooleanValue: return 4 default: return 0 diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index bbf85de5d0e..2ce815003fa 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -100,31 +100,31 @@ type Value interface { func NewValue(t int64, value interface{}) Value { switch v := value.(type) { case int64: - return &IntegerValue{unixnano: t, value: v} + return IntegerValue{unixnano: t, value: v} case float64: - return &FloatValue{unixnano: t, value: v} + return FloatValue{unixnano: t, value: v} case bool: - return &BooleanValue{unixnano: t, value: v} + return BooleanValue{unixnano: t, value: v} case string: - return &StringValue{unixnano: t, value: v} + return StringValue{unixnano: t, value: v} } return EmptyValue{} } func NewIntegerValue(t int64, v int64) Value { - return &IntegerValue{unixnano: t, value: v} + return IntegerValue{unixnano: t, value: v} } func NewFloatValue(t int64, v float64) Value { - return &FloatValue{unixnano: t, value: v} + return FloatValue{unixnano: t, value: v} } func NewBooleanValue(t int64, v bool) Value { - return &BooleanValue{unixnano: t, value: v} + return BooleanValue{unixnano: t, value: v} } func NewStringValue(t int64, v string) Value { - return &StringValue{unixnano: t, value: v} + return StringValue{unixnano: t, value: v} } type EmptyValue struct{} @@ -134,11 +134,11 @@ func (e EmptyValue) Value() interface{} { return nil } func (e EmptyValue) Size() int { return 0 } func (e EmptyValue) String() string { return "" } -func (_ EmptyValue) internalOnly() {} -func (_ *StringValue) internalOnly() {} -func (_ *IntegerValue) internalOnly() {} -func (_ *BooleanValue) internalOnly() {} -func (_ *FloatValue) internalOnly() {} +func (_ EmptyValue) internalOnly() {} +func (_ StringValue) internalOnly() {} +func (_ IntegerValue) internalOnly() {} +func (_ BooleanValue) internalOnly() {} +func (_ FloatValue) internalOnly() {} // Encode converts the values to a byte slice. If there are no values, // this function panics. @@ -148,13 +148,13 @@ func (a Values) Encode(buf []byte) ([]byte, error) { } switch a[0].(type) { - case *FloatValue: + case FloatValue: return encodeFloatBlock(buf, a) - case *IntegerValue: + case IntegerValue: return encodeIntegerBlock(buf, a) - case *BooleanValue: + case BooleanValue: return encodeBooleanBlock(buf, a) - case *StringValue: + case StringValue: return encodeStringBlock(buf, a) } @@ -168,13 +168,13 @@ func (a Values) InfluxQLType() (influxql.DataType, error) { } switch a[0].(type) { - case *FloatValue: + case FloatValue: return influxql.Float, nil - case *IntegerValue: + case IntegerValue: return influxql.Integer, nil - case *BooleanValue: + case BooleanValue: return influxql.Boolean, nil - case *StringValue: + case StringValue: return influxql.String, nil } @@ -225,7 +225,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err case BlockInteger: @@ -235,7 +235,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -246,7 +246,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -257,7 +257,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -271,19 +271,19 @@ type FloatValue struct { value float64 } -func (f *FloatValue) UnixNano() int64 { +func (f FloatValue) UnixNano() int64 { return f.unixnano } -func (f *FloatValue) Value() interface{} { +func (f FloatValue) Value() interface{} { return f.value } -func (f *FloatValue) Size() int { +func (f FloatValue) Size() int { return 16 } -func (f *FloatValue) String() string { +func (f FloatValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.value) } @@ -306,7 +306,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsenc.Write(v.UnixNano()) - venc.Push(v.(*FloatValue).value) + venc.Push(v.(FloatValue).value) } venc.Finish() @@ -398,19 +398,19 @@ type BooleanValue struct { value bool } -func (b *BooleanValue) Size() int { +func (b BooleanValue) Size() int { return 9 } -func (b *BooleanValue) UnixNano() int64 { +func (b BooleanValue) UnixNano() int64 { return b.unixnano } -func (b *BooleanValue) Value() interface{} { +func (b BooleanValue) Value() interface{} { return b.value } -func (f *BooleanValue) String() string { +func (f BooleanValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } @@ -430,7 +430,7 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsenc.Write(v.UnixNano()) - venc.Write(v.(*BooleanValue).value) + venc.Write(v.(BooleanValue).value) } // Encoded timestamp values @@ -516,19 +516,19 @@ type IntegerValue struct { value int64 } -func (v *IntegerValue) Value() interface{} { +func (v IntegerValue) Value() interface{} { return v.value } -func (v *IntegerValue) UnixNano() int64 { +func (v IntegerValue) UnixNano() int64 { return v.unixnano } -func (v *IntegerValue) Size() int { +func (v IntegerValue) Size() int { return 16 } -func (f *IntegerValue) String() string { +func (f IntegerValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } @@ -540,7 +540,7 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*IntegerValue).value) + vEnc.Write(v.(IntegerValue).value) } // Encoded timestamp values @@ -626,31 +626,31 @@ type StringValue struct { value string } -func (v *StringValue) Value() interface{} { +func (v StringValue) Value() interface{} { return v.value } -func (v *StringValue) UnixNano() int64 { +func (v StringValue) UnixNano() int64 { return v.unixnano } -func (v *StringValue) Size() int { +func (v StringValue) Size() int { return 8 + len(v.value) } -func (f *StringValue) String() string { +func (f StringValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) - vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value)) + vEnc := getStringEncoder(len(values) * len(values[0].(StringValue).value)) var b []byte err := func() error { for _, v := range values { tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*StringValue).value) + vEnc.Write(v.(StringValue).value) } // Encoded timestamp values diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index b4b384da1c8..07e36b124cb 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -503,7 +503,7 @@ func TestValues_MergeFloat(t *testing.T) { func TestIntegerValues_Merge(t *testing.T) { integerValue := func(t int64, f int64) tsm1.IntegerValue { - return *(tsm1.NewValue(t, f).(*tsm1.IntegerValue)) + return tsm1.NewValue(t, f).(tsm1.IntegerValue) } tests := []struct { @@ -636,7 +636,7 @@ func TestIntegerValues_Merge(t *testing.T) { func TestFloatValues_Merge(t *testing.T) { floatValue := func(t int64, f float64) tsm1.FloatValue { - return *(tsm1.NewValue(t, f).(*tsm1.FloatValue)) + return tsm1.NewValue(t, f).(tsm1.FloatValue) } tests := []struct { @@ -765,7 +765,7 @@ func TestFloatValues_Merge(t *testing.T) { func TestBooleanValues_Merge(t *testing.T) { booleanValue := func(t int64, f bool) tsm1.BooleanValue { - return *(tsm1.NewValue(t, f).(*tsm1.BooleanValue)) + return tsm1.NewValue(t, f).(tsm1.BooleanValue) } tests := []struct { @@ -894,7 +894,7 @@ func TestBooleanValues_Merge(t *testing.T) { func TestStringValues_Merge(t *testing.T) { stringValue := func(t int64, f string) tsm1.StringValue { - return *(tsm1.NewValue(t, f).(*tsm1.StringValue)) + return tsm1.NewValue(t, f).(tsm1.StringValue) } tests := []struct { diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index 762a0467996..66c7c8470c8 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -343,7 +343,7 @@ func (c *floatAscendingCursor) peekCache() (t int64, v float64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*FloatValue).value + return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. @@ -463,7 +463,7 @@ func (c *floatDescendingCursor) peekCache() (t int64, v float64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*FloatValue).value + return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. @@ -786,7 +786,7 @@ func (c *integerAscendingCursor) peekCache() (t int64, v int64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*IntegerValue).value + return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. @@ -906,7 +906,7 @@ func (c *integerDescendingCursor) peekCache() (t int64, v int64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*IntegerValue).value + return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. @@ -1229,7 +1229,7 @@ func (c *stringAscendingCursor) peekCache() (t int64, v string) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*StringValue).value + return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. @@ -1349,7 +1349,7 @@ func (c *stringDescendingCursor) peekCache() (t int64, v string) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*StringValue).value + return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. @@ -1672,7 +1672,7 @@ func (c *booleanAscendingCursor) peekCache() (t int64, v bool) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*BooleanValue).value + return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. @@ -1792,7 +1792,7 @@ func (c *booleanDescendingCursor) peekCache() (t int64, v bool) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*BooleanValue).value + return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpldata b/tsdb/engine/tsm1/iterator.gen.go.tmpldata index 4aacad62d82..36e7b8311cf 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpldata +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpldata @@ -3,28 +3,28 @@ "Name":"Float", "name":"float", "Type":"float64", - "ValueType":"*FloatValue", + "ValueType":"FloatValue", "Nil":"0" }, { "Name":"Integer", "name":"integer", "Type":"int64", - "ValueType":"*IntegerValue", + "ValueType":"IntegerValue", "Nil":"0" }, { "Name":"String", "name":"string", "Type":"string", - "ValueType":"*StringValue", + "ValueType":"StringValue", "Nil":"\"\"" }, { "Name":"Boolean", "name":"boolean", "Type":"bool", - "ValueType":"*BooleanValue", + "ValueType":"BooleanValue", "Nil":"false" } ] diff --git a/tsdb/engine/tsm1/pools.go b/tsdb/engine/tsm1/pools.go index 8f8b05e26fb..92d6db75d1b 100644 --- a/tsdb/engine/tsm1/pools.go +++ b/tsdb/engine/tsm1/pools.go @@ -39,7 +39,7 @@ func getFloat64Values(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &FloatValue{} + buf[i] = FloatValue{} } } return buf[:size] @@ -65,7 +65,7 @@ func getIntegerValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &IntegerValue{} + buf[i] = IntegerValue{} } } return buf[:size] @@ -91,7 +91,7 @@ func getBooleanValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &BooleanValue{} + buf[i] = BooleanValue{} } } return buf[:size] @@ -117,7 +117,7 @@ func getStringValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &StringValue{} + buf[i] = StringValue{} } } return buf[:size] @@ -130,13 +130,13 @@ func putBooleanValues(buf []Value) { func putValue(buf []Value) { if len(buf) > 0 { switch buf[0].(type) { - case *FloatValue: + case FloatValue: putFloat64Values(buf) - case *IntegerValue: + case IntegerValue: putIntegerValues(buf) - case *BooleanValue: + case BooleanValue: putBooleanValues(buf) - case *StringValue: + case StringValue: putStringValues(buf) } } diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index c5ac249bd77..0f8aa0bc7e4 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -516,13 +516,13 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { encLen += 8 * len(v) // timestamps (8) switch v[0].(type) { - case *FloatValue, *IntegerValue: + case FloatValue, IntegerValue: encLen += 8 * len(v) - case *BooleanValue: + case BooleanValue: encLen += 1 * len(v) - case *StringValue: + case StringValue: for _, vv := range v { - str, ok := vv.(*StringValue) + str, ok := vv.(StringValue) if !ok { return nil, fmt.Errorf("non-string found in string value slice: %T", vv) } @@ -546,13 +546,13 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { for k, v := range w.Values { switch v[0].(type) { - case *FloatValue: + case FloatValue: curType = float64EntryType - case *IntegerValue: + case IntegerValue: curType = integerEntryType - case *BooleanValue: + case BooleanValue: curType = booleanEntryType - case *StringValue: + case StringValue: curType = stringEntryType default: return nil, fmt.Errorf("unsupported value type: %T", v[0]) @@ -572,19 +572,19 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { n += 8 switch vv := vv.(type) { - case *FloatValue: + case FloatValue: if curType != float64EntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value)) n += 8 - case *IntegerValue: + case IntegerValue: if curType != integerEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value)) n += 8 - case *BooleanValue: + case BooleanValue: if curType != booleanEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } @@ -594,7 +594,7 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { dst[n] = 0 } n++ - case *StringValue: + case StringValue: if curType != stringEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } @@ -647,19 +647,19 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { switch typ { case float64EntryType: for i := 0; i < nvals; i++ { - values[i] = &FloatValue{} + values[i] = FloatValue{} } case integerEntryType: for i := 0; i < nvals; i++ { - values[i] = &IntegerValue{} + values[i] = IntegerValue{} } case booleanEntryType: for i := 0; i < nvals; i++ { - values[i] = &BooleanValue{} + values[i] = BooleanValue{} } case stringEntryType: for i := 0; i < nvals; i++ { - values[i] = &StringValue{} + values[i] = StringValue{} } default: @@ -682,9 +682,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 - if fv, ok := values[j].(*FloatValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(FloatValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } case integerEntryType: if i+8 > len(b) { @@ -693,9 +695,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - if fv, ok := values[j].(*IntegerValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(IntegerValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } case booleanEntryType: if i >= len(b) { @@ -704,13 +708,16 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := b[i] i += 1 - if fv, ok := values[j].(*BooleanValue); ok { + if fv, ok := values[j].(BooleanValue); ok { + x := (&fv) + x.unixnano = un fv.unixnano = un if v == 1 { - fv.value = true + x.value = true } else { - fv.value = false + x.value = false } + values[j] = *x } case stringEntryType: if i+4 > len(b) { @@ -730,9 +737,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := string(b[i : i+length]) i += length - if fv, ok := values[j].(*StringValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(StringValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } default: return fmt.Errorf("unsupported value type: %#v", typ)