Skip to content

Commit

Permalink
inmem startup improvments
Browse files Browse the repository at this point in the history
* only call ParseTags when necessary
* remove dependency on inmem.Series in tsdb test package
* Measurement and Series are no longer exported. Their use is restricted
  to the inmem package
* improve Measurement and Series types by exporting immutable
  fields and removing unnecessary APIs and locks

Reduced startup time from 28s to 17s. Overall improvement including
#9162 reduces startup from 46s to 17s for 1MM series across 14 shards.
  • Loading branch information
stuartcarnie committed Dec 29, 2017
1 parent 4cc1545 commit 5dfe3b2
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 320 deletions.
11 changes: 7 additions & 4 deletions models/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ func ParseKeyBytes(buf []byte) ([]byte, Tags) {
return buf[:i], tags
}

func ParseTags(buf []byte) (Tags, error) {
return parseTags(buf), nil
func ParseTags(buf []byte) Tags {
return parseTags(buf)
}

func ParseName(buf []byte) ([]byte, error) {
Expand Down Expand Up @@ -1528,9 +1528,12 @@ func parseTags(buf []byte) Tags {
return nil
}

tags := make(Tags, 0, bytes.Count(buf, []byte(",")))
tags := make(Tags, bytes.Count(buf, []byte(",")))
p := 0
walkTags(buf, func(key, value []byte) bool {
tags = append(tags, NewTag(key, value))
tags[p].Key = key
tags[p].Value = value
p++
return true
})
return tags
Expand Down
7 changes: 7 additions & 0 deletions models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,13 @@ func BenchmarkEscapeString_QuotesAndBackslashes(b *testing.B) {
}
}

func BenchmarkParseTags(b *testing.B) {
tags := []byte("cpu,tag0=value0,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5")
for i := 0; i < b.N; i++ {
models.ParseTags(tags)
}
}

func init() {
// Force uint support to be enabled for testing.
models.EnableUintSupport()
Expand Down
2 changes: 1 addition & 1 deletion services/storage/series_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ RETRY:
keyb := []byte(key)
mm, _ := models.ParseName(keyb)
c.row.measurement = string(mm)
c.tags, _ = models.ParseTags(keyb)
c.tags = models.ParseTags(keyb)

c.filterset = mapValuer{"_name": c.row.measurement}
for _, tag := range c.tags {
Expand Down
46 changes: 22 additions & 24 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
}

if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}

if err := e.addToIndexFromKey(key, fieldType); err != nil {
Expand Down Expand Up @@ -1050,9 +1050,9 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
// lock contention on the index.
merged := merge(readers...)
for v := range merged {
fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ)
if err != nil {
return err
fieldType := BlockTypeToInfluxQLDataType(v.typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", v.typ)
}

if err := e.addToIndexFromKey(v.key, fieldType); err != nil {
Expand Down Expand Up @@ -1122,10 +1122,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro

// Build in-memory index, if necessary.
if e.index.Type() == inmem.IndexName {
tags, _ := models.ParseTags(seriesKey)
if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil {
return err
}
e.index.InitializeSeries(seriesKey, name)
}

return nil
Expand Down Expand Up @@ -2703,21 +2700,22 @@ func SeriesFieldKeyBytes(seriesKey, field string) []byte {
return b
}

func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
switch typ {
case BlockFloat64:
return influxql.Float, nil
case BlockInteger:
return influxql.Integer, nil
case BlockUnsigned:
return influxql.Unsigned, nil
case BlockBoolean:
return influxql.Boolean, nil
case BlockString:
return influxql.String, nil
default:
return influxql.Unknown, fmt.Errorf("unknown block type: %v", typ)
var (
blockToFieldType = []influxql.DataType{
BlockFloat64: influxql.Float,
BlockInteger: influxql.Integer,
BlockBoolean: influxql.Boolean,
BlockString: influxql.String,
BlockUnsigned: influxql.Unsigned,
}
)

func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType {
if int(typ) < len(blockToFieldType) {
return blockToFieldType[typ]
}

return influxql.Unknown
}

// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
Expand Down
19 changes: 19 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,25 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}

func makeBlockTypeSlice(n int) []byte {
r := make([]byte, n)
b := tsm1.BlockFloat64
m := tsm1.BlockUnsigned + 1
for i := 0; i < len(r); i++ {
r[i] = b % m
}
return r
}

func BenchmarkBlockTypeToInfluxQLDataType(b *testing.B) {
t := makeBlockTypeSlice(100)
for i := 0; i < b.N; i++ {
for j := 0; j < len(t); j++ {
tsm1.BlockTypeToInfluxQLDataType(t[j])
}
}
}

// This test ensures that "sync: WaitGroup is reused before previous Wait has returned" is
// is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Index interface {
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error

InitializeSeries(key, name []byte, tags models.Tags) error
InitializeSeries(key, name []byte)
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte, ts int64) error
Expand Down
Loading

0 comments on commit 5dfe3b2

Please sign in to comment.