diff --git a/models/points.go b/models/points.go index 508d73a6bac..ad80a816bf1 100644 --- a/models/points.go +++ b/models/points.go @@ -281,8 +281,8 @@ func ParseKeyBytes(buf []byte) ([]byte, Tags) { return buf[:i], tags } -func ParseTags(buf []byte) (Tags, error) { - return parseTags(buf), nil +func ParseTags(buf []byte) Tags { + return parseTags(buf) } func ParseName(buf []byte) ([]byte, error) { @@ -1528,9 +1528,12 @@ func parseTags(buf []byte) Tags { return nil } - tags := make(Tags, 0, bytes.Count(buf, []byte(","))) + tags := make(Tags, bytes.Count(buf, []byte(","))) + p := 0 walkTags(buf, func(key, value []byte) bool { - tags = append(tags, NewTag(key, value)) + tags[p].Key = key + tags[p].Value = value + p++ return true }) return tags diff --git a/models/points_test.go b/models/points_test.go index 6234fd02032..ca1552101c3 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -2381,6 +2381,13 @@ func BenchmarkEscapeString_QuotesAndBackslashes(b *testing.B) { } } +func BenchmarkParseTags(b *testing.B) { + tags := []byte("cpu,tag0=value0,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5") + for i := 0; i < b.N; i++ { + models.ParseTags(tags) + } +} + func init() { // Force uint support to be enabled for testing. models.EnableUintSupport() diff --git a/services/storage/series_cursor.go b/services/storage/series_cursor.go index 3f4100f89b1..fc9395dce39 100644 --- a/services/storage/series_cursor.go +++ b/services/storage/series_cursor.go @@ -157,7 +157,7 @@ RETRY: keyb := []byte(key) mm, _ := models.ParseName(keyb) c.row.measurement = string(mm) - c.tags, _ = models.ParseTags(keyb) + c.tags = models.ParseTags(keyb) c.filterset = mapValuer{"_name": c.row.measurement} for _, tag := range c.tags { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 7d8d09e3f81..440c769a264 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -690,9 +690,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { } if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error { - fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) - if err != nil { - return err + fieldType := BlockTypeToInfluxQLDataType(typ) + if fieldType == influxql.Unknown { + return fmt.Errorf("unknown block type: %v", typ) } if err := e.addToIndexFromKey(key, fieldType); err != nil { @@ -1050,9 +1050,9 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { // lock contention on the index. merged := merge(readers...) for v := range merged { - fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ) - if err != nil { - return err + fieldType := BlockTypeToInfluxQLDataType(v.typ) + if fieldType == influxql.Unknown { + return fmt.Errorf("unknown block type: %v", v.typ) } if err := e.addToIndexFromKey(v.key, fieldType); err != nil { @@ -1122,10 +1122,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro // Build in-memory index, if necessary. if e.index.Type() == inmem.IndexName { - tags, _ := models.ParseTags(seriesKey) - if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil { - return err - } + e.index.InitializeSeries(seriesKey, name) } return nil @@ -2703,21 +2700,22 @@ func SeriesFieldKeyBytes(seriesKey, field string) []byte { return b } -func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) { - switch typ { - case BlockFloat64: - return influxql.Float, nil - case BlockInteger: - return influxql.Integer, nil - case BlockUnsigned: - return influxql.Unsigned, nil - case BlockBoolean: - return influxql.Boolean, nil - case BlockString: - return influxql.String, nil - default: - return influxql.Unknown, fmt.Errorf("unknown block type: %v", typ) +var ( + blockToFieldType = []influxql.DataType{ + BlockFloat64: influxql.Float, + BlockInteger: influxql.Integer, + BlockBoolean: influxql.Boolean, + BlockString: influxql.String, + BlockUnsigned: influxql.Unsigned, + } +) + +func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType { + if int(typ) < len(blockToFieldType) { + return blockToFieldType[typ] } + + return influxql.Unknown } // SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key. diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 5ecee30c93e..6a143a5ce9e 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1391,6 +1391,25 @@ func TestEngine_CreateCursor_Descending(t *testing.T) { } } +func makeBlockTypeSlice(n int) []byte { + r := make([]byte, n) + b := tsm1.BlockFloat64 + m := tsm1.BlockUnsigned + 1 + for i := 0; i < len(r); i++ { + r[i] = b % m + } + return r +} + +func BenchmarkBlockTypeToInfluxQLDataType(b *testing.B) { + t := makeBlockTypeSlice(100) + for i := 0; i < b.N; i++ { + for j := 0; j < len(t); j++ { + tsm1.BlockTypeToInfluxQLDataType(t[j]) + } + } +} + // This test ensures that "sync: WaitGroup is reused before previous Wait has returned" is // is not raised. func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) { diff --git a/tsdb/index.go b/tsdb/index.go index d07a9c0e038..922ff9d68be 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -29,7 +29,7 @@ type Index interface { DropMeasurement(name []byte) error ForEachMeasurementName(fn func(name []byte) error) error - InitializeSeries(key, name []byte, tags models.Tags) error + InitializeSeries(key, name []byte) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error DropSeries(key []byte, ts int64) error diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 94775144e57..09321b10abf 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -53,8 +53,8 @@ type Index struct { fieldset *tsdb.MeasurementFieldSet // In-memory metadata index, built on load and updated when new series come in - measurements map[string]*Measurement // measurement name to object and index - series map[string]*Series // map series key to the Series object + measurements map[string]*measurement // measurement name to object and index + series map[string]*series // map series key to the Series object seriesSketch, seriesTSSketch *hll.Plus measurementsSketch, measurementsTSSketch *hll.Plus @@ -68,8 +68,8 @@ func NewIndex(database string, sfile *tsdb.SeriesFile) *Index { index := &Index{ database: database, sfile: sfile, - measurements: make(map[string]*Measurement), - series: make(map[string]*Series), + measurements: make(map[string]*measurement), + series: make(map[string]*series), } index.seriesSketch = hll.NewDefaultPlus() @@ -92,7 +92,7 @@ func (i *Index) Database() string { } // Series returns a series by key. -func (i *Index) Series(key []byte) (*Series, error) { +func (i *Index) Series(key []byte) (*series, error) { i.mu.RLock() s := i.series[string(key)] i.mu.RUnlock() @@ -117,7 +117,7 @@ func (i *Index) SeriesN() int64 { } // Measurement returns the measurement object from the index by the name -func (i *Index) Measurement(name []byte) (*Measurement, error) { +func (i *Index) Measurement(name []byte) (*measurement, error) { i.mu.RLock() defer i.mu.RUnlock() return i.measurements[string(name)], nil @@ -138,11 +138,11 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro } // MeasurementsByName returns a list of measurements. -func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error) { +func (i *Index) MeasurementsByName(names [][]byte) ([]*measurement, error) { i.mu.RLock() defer i.mu.RUnlock() - a := make([]*Measurement, 0, len(names)) + a := make([]*measurement, 0, len(names)) for _, name := range names { if m := i.measurements[string(name)]; m != nil { a = append(a, m) @@ -202,14 +202,12 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m // set the in memory ID for query processing on this shard // The series key and tags are clone to prevent a memory leak - series := NewSeries([]byte(string(key)), tags.Clone()) - series.ID = seriesID + skey := string(key) + ss = newSeries(seriesID, m, skey, tags.Clone()) + i.series[skey] = ss - series.SetMeasurement(m) - i.series[string(key)] = series - - m.AddSeries(series) - series.AssignShard(shardID, time.Now().UnixNano()) + m.AddSeries(ss) + ss.AssignShard(shardID, time.Now().UnixNano()) // Add the series to the series sketch. i.seriesSketch.Add(key) @@ -219,7 +217,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m // CreateMeasurementIndexIfNotExists creates or retrieves an in memory index // object for the measurement -func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement { +func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *measurement { name = escape.Unescape(name) // See if the measurement exists using a read-lock @@ -239,7 +237,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement { // and acquire the write lock m = i.measurements[string(name)] if m == nil { - m = NewMeasurement(i.database, string(name)) + m = newMeasurement(i.database, string(name)) i.measurements[string(name)] = m // Add the measurement to the measurements sketch. @@ -313,19 +311,19 @@ func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, ke // possible to get the set of unique series IDs for a given measurement name // and tag key. var authorized bool - mm.SeriesByTagKeyValue(key).Range(func(_ string, seriesIDs SeriesIDs) bool { + mm.SeriesByTagKeyValue(key).Range(func(_ string, sIDs seriesIDs) bool { if auth == nil || auth == query.OpenAuthorizer { authorized = true return false } - for _, id := range seriesIDs { + for _, id := range sIDs { s := mm.SeriesByID(id) if s == nil { continue } - if auth.AuthorizeSeriesRead(i.database, mm.name, s.Tags()) { + if auth.AuthorizeSeriesRead(i.database, mm.NameBytes, s.Tags) { authorized = true return false } @@ -388,13 +386,13 @@ func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte if s == nil { continue } - if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement().name, s.Tags()) { + if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement.NameBytes, s.Tags) { continue } // Iterate the tag keys we're interested in and collect values // from this series, if they exist. - for _, t := range s.Tags() { + for _, t := range s.Tags { if idx, ok := keyIdxs[string(t.Key)]; ok { resultSet[idx].add(string(t.Value)) } else if string(t.Key) > keys[len(keys)-1] { @@ -452,7 +450,7 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) { if ss == nil { return nil, nil } - return ss.Tags(), nil + return ss.Tags, nil } // MeasurementNamesByExpr takes an expression containing only tags and returns a @@ -469,7 +467,7 @@ func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr a := make([][]byte, 0, len(i.measurements)) for _, m := range i.measurements { if m.Authorized(auth) { - a = append(a, m.name) + a = append(a, m.NameBytes) } } bytesutil.Sort(a) @@ -561,7 +559,7 @@ func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql. } if matched && m.Authorized(auth) { - names = append(names, m.name) + names = append(names, m.NameBytes) } } bytesutil.Sort(names) @@ -593,7 +591,7 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF // Check the tag values belonging to the tag key for equivalence to the // tag value being filtered on. - tagVals.Range(func(tv string, seriesIDs SeriesIDs) bool { + tagVals.Range(func(tv string, seriesIDs seriesIDs) bool { if !valEqual(tv) { return true // No match. Keep checking. } @@ -613,7 +611,7 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF continue } - if s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) { + if s != nil && auth.AuthorizeSeriesRead(i.database, m.NameBytes, s.Tags) { // The Range call can return early as a matching // tag value with an authorized series has been found. authorized = true @@ -714,13 +712,13 @@ func (i *Index) DropSeries(key []byte, ts int64) error { delete(i.series, k) // Remove the measurement's reference. - series.Measurement().DropSeries(series) + series.Measurement.DropSeries(series) // Mark the series as deleted. series.Delete(ts) // If the measurement no longer has any series, remove it as well. - if !series.Measurement().HasSeries() { - i.dropMeasurement(series.Measurement().Name) + if !series.Measurement.HasSeries() { + i.dropMeasurement(series.Measurement.Name) } return nil @@ -778,7 +776,7 @@ func (i *Index) SetFieldName(measurement []byte, name string) { // ForEachMeasurementName iterates over each measurement name. func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { i.mu.RLock() - mms := make(Measurements, 0, len(i.measurements)) + mms := make(measurements, 0, len(i.measurements)) for _, m := range i.measurements { mms = append(mms, m) } @@ -931,7 +929,7 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat defer i.mu.RUnlock() // Read and sort all measurements. - mms := make(Measurements, 0, len(i.measurements)) + mms := make(measurements, 0, len(i.measurements)) for _, mm := range i.measurements { mms = append(mms, mm) } @@ -1123,8 +1121,45 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli // InitializeSeries is called during start-up. // This works the same as CreateSeriesIfNotExists except it ignore limit errors. -func (i *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error { - return i.Index.CreateSeriesIfNotExists(i.id, key, name, tags, &i.opt, true) +func (i *ShardIndex) InitializeSeries(key, name []byte) { + i.mu.RLock() + // if there is a series for this id, it's already been added + ss := i.series[string(key)] + i.mu.RUnlock() + + if ss != nil { + ss.InitializeShard(i.id) + return + } + + // get or create the measurement index + m := i.CreateMeasurementIndexIfNotExists(name) + + i.mu.Lock() + defer i.mu.Unlock() + + // Check for the series again under a write lock + ss = i.series[string(key)] + if ss != nil { + ss.InitializeShard(i.id) + return + } + + tags := models.ParseTags(key) + + // set the in memory ID for query processing on this shard + // The series key and tags are clone to prevent a memory leak + skey := string(key) + i.lastID++ + ss = newSeries(i.lastID, m, skey, tags.Clone()) + + i.series[skey] = ss + + m.AddSeries(ss) + ss.InitializeShard(i.id) + + // Add the series to the series sketch. + i.seriesSketch.Add(key) } func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { @@ -1148,9 +1183,9 @@ func NewShardIndex(id uint64, database, path string, sfile *tsdb.SeriesFile, opt // seriesIDIterator emits series ids. type seriesIDIterator struct { database string - mms Measurements + mms measurements keys struct { - buf []*Series + buf []*series i int } opt query.IteratorOptions @@ -1179,7 +1214,7 @@ func (itr *seriesIDIterator) Next() (tsdb.SeriesIDElem, error) { series := itr.keys.buf[itr.keys.i] itr.keys.i++ - if !itr.opt.Authorizer.AuthorizeSeriesRead(itr.database, series.measurement.name, series.tags) { + if !itr.opt.Authorizer.AuthorizeSeriesRead(itr.database, series.Measurement.NameBytes, series.Tags) { continue } @@ -1224,20 +1259,20 @@ var errMaxSeriesPerDatabaseExceeded = errors.New("max series per database exceed type seriesIterator struct { keys [][]byte - elem series + elem seriesElement } -type series struct { +type seriesElement struct { tsdb.SeriesElem name []byte tags models.Tags deleted bool } -func (s series) Name() []byte { return s.name } -func (s series) Tags() models.Tags { return s.tags } -func (s series) Deleted() bool { return s.deleted } -func (s series) Expr() influxql.Expr { return nil } +func (s seriesElement) Name() []byte { return s.name } +func (s seriesElement) Tags() models.Tags { return s.tags } +func (s seriesElement) Deleted() bool { return s.deleted } +func (s seriesElement) Expr() influxql.Expr { return nil } func (itr *seriesIterator) Next() tsdb.SeriesElem { if len(itr.keys) == 0 { diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 7481097a82a..cd50939ff1c 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -20,43 +20,43 @@ import ( // contains in memory structures for indexing tags. Exported functions are // goroutine safe while un-exported functions assume the caller will use the // appropriate locks. -type Measurement struct { - database string - Name string `json:"name,omitempty"` - name []byte // cached version as []byte +type measurement struct { + Database string + Name string `json:"name,omitempty"` + NameBytes []byte // cached version as []byte mu sync.RWMutex fieldNames map[string]struct{} // in-memory index fields - seriesByID map[uint64]*Series // lookup table for series by their id - seriesByTagKeyValue map[string]*TagKeyValue // map from tag key to value to sorted set of series ids + seriesByID map[uint64]*series // lookup table for series by their id + seriesByTagKeyValue map[string]*tagKeyValue // map from tag key to value to sorted set of series ids // lazyily created sorted series IDs - sortedSeriesIDs SeriesIDs // sorted list of series IDs in this measurement + sortedSeriesIDs seriesIDs // sorted list of series IDs in this measurement // Indicates whether the seriesByTagKeyValueMap needs to be rebuilt as it contains deleted series // that waste memory. dirty bool } -// NewMeasurement allocates and initializes a new Measurement. -func NewMeasurement(database, name string) *Measurement { - return &Measurement{ - database: database, - Name: name, - name: []byte(name), - fieldNames: make(map[string]struct{}), +// newMeasurement allocates and initializes a new Measurement. +func newMeasurement(database, name string) *measurement { + return &measurement{ + Database: database, + Name: name, + NameBytes: []byte(name), - seriesByID: make(map[uint64]*Series), - seriesByTagKeyValue: make(map[string]*TagKeyValue), + fieldNames: make(map[string]struct{}), + seriesByID: make(map[uint64]*series), + seriesByTagKeyValue: make(map[string]*tagKeyValue), } } // Authorized determines if this Measurement is authorized to be read, according // to the provided Authorizer. A measurement is authorized to be read if at // least one undeleted series from the measurement is authorized to be read. -func (m *Measurement) Authorized(auth query.Authorizer) bool { +func (m *measurement) Authorized(auth query.Authorizer) bool { // Note(edd): the cost of this check scales linearly with the number of series // belonging to a measurement, which means it may become expensive when there // are large numbers of series on a measurement. @@ -68,14 +68,14 @@ func (m *Measurement) Authorized(auth query.Authorizer) bool { continue } - if auth == nil || auth.AuthorizeSeriesRead(m.database, m.name, s.tags) { + if auth == nil || auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) { return true } } return false } -func (m *Measurement) HasField(name string) bool { +func (m *measurement) HasField(name string) bool { m.mu.RLock() _, hasField := m.fieldNames[name] m.mu.RUnlock() @@ -83,24 +83,24 @@ func (m *Measurement) HasField(name string) bool { } // SeriesByID returns a series by identifier. -func (m *Measurement) SeriesByID(id uint64) *Series { +func (m *measurement) SeriesByID(id uint64) *series { m.mu.RLock() defer m.mu.RUnlock() return m.seriesByID[id] } // SeriesByIDMap returns the internal seriesByID map. -func (m *Measurement) SeriesByIDMap() map[uint64]*Series { +func (m *measurement) SeriesByIDMap() map[uint64]*series { m.mu.RLock() defer m.mu.RUnlock() return m.seriesByID } // SeriesByIDSlice returns a list of series by identifiers. -func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { +func (m *measurement) SeriesByIDSlice(ids []uint64) []*series { m.mu.RLock() defer m.mu.RUnlock() - a := make([]*Series, len(ids)) + a := make([]*series, len(ids)) for i, id := range ids { a[i] = m.seriesByID[id] } @@ -108,7 +108,7 @@ func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { } // AppendSeriesKeysByID appends keys for a list of series ids to a buffer. -func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { +func (m *measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string { m.mu.RLock() defer m.mu.RUnlock() for _, id := range ids { @@ -120,7 +120,7 @@ func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string } // SeriesKeysByID returns the a list of keys for a set of ids. -func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte { +func (m *measurement) SeriesKeysByID(ids seriesIDs) [][]byte { m.mu.RLock() defer m.mu.RUnlock() keys := make([][]byte, 0, len(ids)) @@ -140,7 +140,7 @@ func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte { } // SeriesKeys returns the keys of every series in this measurement -func (m *Measurement) SeriesKeys() [][]byte { +func (m *measurement) SeriesKeys() [][]byte { m.mu.RLock() defer m.mu.RUnlock() keys := make([][]byte, 0, len(m.seriesByID)) @@ -158,7 +158,7 @@ func (m *Measurement) SeriesKeys() [][]byte { return keys } -func (m *Measurement) SeriesIDs() SeriesIDs { +func (m *measurement) SeriesIDs() seriesIDs { m.mu.RLock() if len(m.sortedSeriesIDs) == len(m.seriesByID) { s := m.sortedSeriesIDs @@ -176,7 +176,7 @@ func (m *Measurement) SeriesIDs() SeriesIDs { m.sortedSeriesIDs = m.sortedSeriesIDs[:0] if cap(m.sortedSeriesIDs) < len(m.seriesByID) { - m.sortedSeriesIDs = make(SeriesIDs, 0, len(m.seriesByID)) + m.sortedSeriesIDs = make(seriesIDs, 0, len(m.seriesByID)) } for k, v := range m.seriesByID { @@ -192,28 +192,28 @@ func (m *Measurement) SeriesIDs() SeriesIDs { } // HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key -func (m *Measurement) HasTagKey(k string) bool { +func (m *measurement) HasTagKey(k string) bool { m.mu.RLock() defer m.mu.RUnlock() _, hasTag := m.seriesByTagKeyValue[k] return hasTag } -func (m *Measurement) HasTagKeyValue(k, v []byte) bool { +func (m *measurement) HasTagKeyValue(k, v []byte) bool { m.mu.RLock() defer m.mu.RUnlock() return m.seriesByTagKeyValue[string(k)].Contains(string(v)) } // HasSeries returns true if there is at least 1 series under this measurement. -func (m *Measurement) HasSeries() bool { +func (m *measurement) HasSeries() bool { m.mu.RLock() defer m.mu.RUnlock() return len(m.seriesByID) > 0 } // Cardinality returns the number of values associated with the given tag key. -func (m *Measurement) Cardinality(key string) int { +func (m *measurement) Cardinality(key string) int { var n int m.mu.RLock() n = m.cardinality(key) @@ -221,12 +221,12 @@ func (m *Measurement) Cardinality(key string) int { return n } -func (m *Measurement) cardinality(key string) int { +func (m *measurement) cardinality(key string) int { return m.seriesByTagKeyValue[key].Cardinality() } // CardinalityBytes returns the number of values associated with the given tag key. -func (m *Measurement) CardinalityBytes(key []byte) int { +func (m *measurement) CardinalityBytes(key []byte) int { m.mu.RLock() defer m.mu.RUnlock() return m.seriesByTagKeyValue[string(key)].Cardinality() @@ -234,7 +234,7 @@ func (m *Measurement) CardinalityBytes(key []byte) int { // AddSeries adds a series to the measurement's index. // It returns true if the series was added successfully or false if the series was already present. -func (m *Measurement) AddSeries(s *Series) bool { +func (m *measurement) AddSeries(s *series) bool { if s == nil { return false } @@ -260,10 +260,10 @@ func (m *Measurement) AddSeries(s *Series) bool { } // add this series id to the tag index on the measurement - s.ForEachTag(func(t models.Tag) { + for _, t := range s.Tags { valueMap := m.seriesByTagKeyValue[string(t.Key)] if valueMap == nil { - valueMap = NewTagKeyValue() + valueMap = newTagKeyValue() m.seriesByTagKeyValue[string(t.Key)] = valueMap } ids := valueMap.LoadByte(t.Value) @@ -274,14 +274,14 @@ func (m *Measurement) AddSeries(s *Series) bool { if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { sort.Sort(ids) } - valueMap.Store(string(t.Value), ids) - }) + valueMap.StoreByte(t.Value, ids) + } return true } // DropSeries removes a series from the measurement's index. -func (m *Measurement) DropSeries(series *Series) { +func (m *measurement) DropSeries(series *series) { seriesID := series.ID m.mu.Lock() defer m.mu.Unlock() @@ -299,7 +299,7 @@ func (m *Measurement) DropSeries(series *Series) { m.dirty = true } -func (m *Measurement) Rebuild() *Measurement { +func (m *measurement) Rebuild() *measurement { m.mu.RLock() // Nothing needs to be rebuilt. @@ -309,7 +309,7 @@ func (m *Measurement) Rebuild() *Measurement { } // Create a new measurement from the state of the existing measurement - nm := NewMeasurement(m.database, string(m.name)) + nm := newMeasurement(m.Database, string(m.NameBytes)) nm.fieldNames = m.fieldNames m.mu.RUnlock() @@ -339,7 +339,7 @@ func (m *Measurement) Rebuild() *Measurement { // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each -func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { +func (m *measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { if condition == nil { return m.SeriesIDs(), nil, nil } @@ -347,7 +347,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf } // ForEachSeriesByExpr iterates over all series filtered by condition. -func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error { +func (m *measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error { // Retrieve matching series ids. ids, _, err := m.filters(condition) if err != nil { @@ -357,7 +357,7 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags // Iterate over each series. for _, id := range ids { s := m.SeriesByID(id) - if err := fn(s.Tags()); err != nil { + if err := fn(s.Tags); err != nil { return err } } @@ -373,7 +373,7 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags // This will also populate the TagSet objects with the series IDs that match each tagset and any // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. -func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) { +func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) { // get the unique set of series ids and the filters that should be applied to each ids, filters, err := m.filters(opt.Condition) if err != nil { @@ -412,13 +412,13 @@ func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que continue } - if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.database, m.name, s.Tags()) { + if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) { continue } var tagsAsKey []byte if len(dims) > 0 { - tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags()) + tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags) } tagSet := tagSets[string(tagsAsKey)] @@ -461,7 +461,7 @@ func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que } // intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. -func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { +func intersectSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) { // We only want to allocate a slice and map of the smaller size. var ids []uint64 if len(lids) > len(rids) { @@ -515,7 +515,7 @@ func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs } // unionSeriesFilters performs a union for two sets of ids and filter expressions. -func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { +func unionSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) { ids := make([]uint64, 0, len(lids)+len(rids)) // Setup the filters with the smallest size since we will discard filters @@ -618,14 +618,14 @@ func (m *Measurement) SeriesIDsByTagValue(key, value []byte) SeriesIDs { } // IDsForExpr returns the series IDs that are candidates to match the given expression. -func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs { +func (m *measurement) IDsForExpr(n *influxql.BinaryExpr) seriesIDs { ids, _, _ := m.idsForExpr(n) return ids } // idsForExpr returns a collection of series ids and a filter expression that should // be used to filter points from those series. -func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) { +func (m *measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Expr, error) { // If this binary expression has another binary expression, then this // is some expression math and we should just pass it to the underlying query. if _, ok := n.LHS.(*influxql.BinaryExpr); ok { @@ -661,7 +661,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex // if we're looking for series with a specific tag value if str, ok := value.(*influxql.StringLiteral); ok { - var ids SeriesIDs + var ids seriesIDs // Special handling for "_name" to match measurement name. if name.Val == "_name" { @@ -677,22 +677,22 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex ids = tagVals.Load(str.Val) } else { // Make a copy of all series ids and mark the ones we need to evict. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) + sIDs := newEvictSeriesIDs(m.SeriesIDs()) // Go through each slice and mark the values we find as zero so // they can be removed later. - tagVals.RangeAll(func(_ string, a SeriesIDs) { - seriesIDs.mark(a) + tagVals.RangeAll(func(_ string, a seriesIDs) { + sIDs.mark(a) }) // Make a new slice with only the remaining ids. - ids = seriesIDs.evict() + ids = sIDs.evict() } } else if n.Op == influxql.NEQ { if str.Val != "" { ids = m.SeriesIDs().Reject(tagVals.Load(str.Val)) } else { - tagVals.RangeAll(func(_ string, a SeriesIDs) { + tagVals.RangeAll(func(_ string, a seriesIDs) { ids = append(ids, a...) }) sort.Sort(ids) @@ -703,7 +703,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex // if we're looking for series with a tag value that matches a regex if re, ok := value.(*influxql.RegexLiteral); ok { - var ids SeriesIDs + var ids seriesIDs // Special handling for "_name" to match measurement name. if name.Val == "_name" { @@ -723,24 +723,24 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex // If we should not include the empty string, include series that match our condition. if empty && n.Op == influxql.EQREGEX { // See comments above for EQ with a StringLiteral. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) - tagVals.RangeAll(func(k string, a SeriesIDs) { + sIDs := newEvictSeriesIDs(m.SeriesIDs()) + tagVals.RangeAll(func(k string, a seriesIDs) { if !re.Val.MatchString(k) { - seriesIDs.mark(a) + sIDs.mark(a) } }) - ids = seriesIDs.evict() + ids = sIDs.evict() } else if empty && n.Op == influxql.NEQREGEX { - ids = make(SeriesIDs, 0, len(m.SeriesIDs())) - tagVals.RangeAll(func(k string, a SeriesIDs) { + ids = make(seriesIDs, 0, len(m.SeriesIDs())) + tagVals.RangeAll(func(k string, a seriesIDs) { if !re.Val.MatchString(k) { ids = append(ids, a...) } }) sort.Sort(ids) } else if !empty && n.Op == influxql.EQREGEX { - ids = make(SeriesIDs, 0, len(m.SeriesIDs())) - tagVals.RangeAll(func(k string, a SeriesIDs) { + ids = make(seriesIDs, 0, len(m.SeriesIDs())) + tagVals.RangeAll(func(k string, a seriesIDs) { if re.Val.MatchString(k) { ids = append(ids, a...) } @@ -748,27 +748,27 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex sort.Sort(ids) } else if !empty && n.Op == influxql.NEQREGEX { // See comments above for EQ with a StringLiteral. - seriesIDs := newEvictSeriesIDs(m.SeriesIDs()) - tagVals.RangeAll(func(k string, a SeriesIDs) { + sIDs := newEvictSeriesIDs(m.SeriesIDs()) + tagVals.RangeAll(func(k string, a seriesIDs) { if re.Val.MatchString(k) { - seriesIDs.mark(a) + sIDs.mark(a) } }) - ids = seriesIDs.evict() + ids = sIDs.evict() } return ids, nil, nil } // compare tag values if ref, ok := value.(*influxql.VarRef); ok { - var ids SeriesIDs + var ids seriesIDs if n.Op == influxql.NEQ { ids = m.SeriesIDs() } rhsTagVals := m.seriesByTagKeyValue[ref.Val] - tagVals.RangeAll(func(k string, a SeriesIDs) { + tagVals.RangeAll(func(k string, a seriesIDs) { tags := a.Intersect(rhsTagVals.Load(k)) if n.Op == influxql.EQ { ids = ids.Union(tags) @@ -808,7 +808,7 @@ func (fe FilterExprs) Len() int { // WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and // a map from those series IDs to filter expressions that should be used to limit points returned in // the final query result. -func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { +func (m *measurement) WalkWhereForSeriesIds(expr influxql.Expr) (seriesIDs, FilterExprs, error) { switch n := expr.(type) { case *influxql.BinaryExpr: switch n.Op { @@ -872,7 +872,7 @@ func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, Filt // expandExpr returns a list of expressions expanded by all possible tag // combinations. -func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr { +func (m *measurement) expandExpr(expr influxql.Expr) []tagSetExpr { // Retrieve list of unique values for each tag. valuesByTagKey := m.uniqueTagValues(expr) @@ -930,7 +930,7 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, // SeriesIDsAllOrByExpr walks an expressions for matching series IDs // or, if no expressions is given, returns all series IDs for the measurement. -func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) { +func (m *measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error) { // If no expression given or the measurement has no series, // we can take just return the ids or nil accordingly. if expr == nil { @@ -954,7 +954,7 @@ func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error } // tagKeysByExpr extracts the tag keys wanted by the expression. -func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { +func (m *measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) { if expr == nil { set := make(map[string]struct{}) for _, key := range m.TagKeys() { @@ -1022,7 +1022,7 @@ func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, er } // tagKeysByFilter will filter the tag keys for the measurement. -func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet { +func (m *measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet { ss := newStringSet() for _, key := range m.TagKeys() { var matched bool @@ -1081,7 +1081,7 @@ func copyTagExprs(a []tagExpr) []tagExpr { } // uniqueTagValues returns a list of unique tag values used in an expression. -func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string { +func (m *measurement) uniqueTagValues(expr influxql.Expr) map[string][]string { // Track unique value per tag. tags := make(map[string]map[string]struct{}) @@ -1131,18 +1131,18 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string { } // Measurements represents a list of *Measurement. -type Measurements []*Measurement +type measurements []*measurement // Len implements sort.Interface. -func (a Measurements) Len() int { return len(a) } +func (a measurements) Len() int { return len(a) } // Less implements sort.Interface. -func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } // Swap implements sort.Interface. -func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a Measurements) Intersect(other Measurements) Measurements { +func (a measurements) Intersect(other measurements) measurements { l := a r := other @@ -1156,7 +1156,7 @@ func (a Measurements) Intersect(other Measurements) Measurements { // That is, don't run comparisons against lower values that we've already passed var i, j int - result := make(Measurements, 0, len(l)) + result := make(measurements, 0, len(l)) for i < len(l) && j < len(r) { if l[i].Name == r[j].Name { result = append(result, l[i]) @@ -1172,8 +1172,8 @@ func (a Measurements) Intersect(other Measurements) Measurements { return result } -func (a Measurements) Union(other Measurements) Measurements { - result := make(Measurements, 0, len(a)+len(other)) +func (a measurements) Union(other measurements) measurements { + result := make(measurements, 0, len(a)+len(other)) var i, j int for i < len(a) && j < len(other) { if a[i].Name == other[j].Name { @@ -1199,15 +1199,17 @@ func (a Measurements) Union(other Measurements) Measurements { return result } -// Series belong to a Measurement and represent unique time series in a database. -type Series struct { - mu sync.RWMutex - Key string - tags models.Tags +// series belong to a Measurement and represent unique time series in a database. +type series struct { + // immutable ID uint64 - measurement *Measurement - shardIDs map[uint64]struct{} // shards that have this series defined - deleted bool + Measurement *measurement + Key string + Tags models.Tags + + mu sync.RWMutex + shardIDs map[uint64]struct{} // shards that have this series defined + deleted bool // lastModified tracks the last time the series was created. If the series // already exists and a request to create is received (a no-op), lastModified @@ -1215,17 +1217,25 @@ type Series struct { lastModified int64 } -// NewSeries returns an initialized series struct -func NewSeries(key []byte, tags models.Tags) *Series { - return &Series{ - Key: string(key), - tags: tags, +// newSeries returns an initialized series struct +func newSeries(id uint64, m *measurement, key string, tags models.Tags) *series { + return &series{ + ID: id, + Measurement: m, + Key: key, + Tags: tags, shardIDs: make(map[uint64]struct{}), lastModified: time.Now().UTC().UnixNano(), } } -func (s *Series) AssignShard(shardID uint64, ts int64) { +func (s *series) InitializeShard(shardID uint64) { + s.mu.Lock() + s.shardIDs[shardID] = struct{}{} + s.mu.Unlock() +} + +func (s *series) AssignShard(shardID uint64, ts int64) { atomic.StoreInt64(&s.lastModified, ts) if s.Assigned(shardID) { return @@ -1239,7 +1249,7 @@ func (s *Series) AssignShard(shardID uint64, ts int64) { s.mu.Unlock() } -func (s *Series) UnassignShard(shardID uint64, ts int64) { +func (s *series) UnassignShard(shardID uint64, ts int64) { s.mu.Lock() if s.LastModified() < ts { delete(s.shardIDs, shardID) @@ -1247,66 +1257,26 @@ func (s *Series) UnassignShard(shardID uint64, ts int64) { s.mu.Unlock() } -func (s *Series) Assigned(shardID uint64) bool { +func (s *series) Assigned(shardID uint64) bool { s.mu.RLock() _, ok := s.shardIDs[shardID] s.mu.RUnlock() return ok } -func (s *Series) LastModified() int64 { +func (s *series) LastModified() int64 { return atomic.LoadInt64(&s.lastModified) } -func (s *Series) ShardN() int { +func (s *series) ShardN() int { s.mu.RLock() n := len(s.shardIDs) s.mu.RUnlock() return n } -// Measurement returns the measurement on the series. -func (s *Series) Measurement() *Measurement { - return s.measurement -} - -// SetMeasurement sets the measurement on the series. -func (s *Series) SetMeasurement(m *Measurement) { - s.measurement = m -} - -// ForEachTag executes fn for every tag. Iteration occurs under lock. -func (s *Series) ForEachTag(fn func(models.Tag)) { - s.mu.RLock() - defer s.mu.RUnlock() - for _, t := range s.tags { - fn(t) - } -} - -// Tags returns a copy of the tags under lock. -func (s *Series) Tags() models.Tags { - s.mu.RLock() - defer s.mu.RUnlock() - return s.tags -} - -// CopyTags clones the tags on the series in-place, -func (s *Series) CopyTags() { - s.mu.Lock() - defer s.mu.Unlock() - s.tags = s.tags.Clone() -} - -// GetTagString returns a tag value under lock. -func (s *Series) GetTagString(key string) string { - s.mu.RLock() - defer s.mu.RUnlock() - return s.tags.GetString(key) -} - // Delete marks this series as deleted. A deleted series should not be returned for queries. -func (s *Series) Delete(ts int64) { +func (s *series) Delete(ts int64) { s.mu.Lock() if s.LastModified() < ts { s.deleted = true @@ -1315,7 +1285,7 @@ func (s *Series) Delete(ts int64) { } // Deleted indicates if this was previously deleted. -func (s *Series) Deleted() bool { +func (s *series) Deleted() bool { s.mu.RLock() v := s.deleted s.mu.RUnlock() @@ -1326,18 +1296,18 @@ func (s *Series) Deleted() bool { // ids mapping to a set of tag values. // // TODO(edd): This could possibly be replaced by a sync.Map once we use Go 1.9. -type TagKeyValue struct { +type tagKeyValue struct { mu sync.RWMutex - valueIDs map[string]SeriesIDs + valueIDs map[string]seriesIDs } // NewTagKeyValue initialises a new TagKeyValue. -func NewTagKeyValue() *TagKeyValue { - return &TagKeyValue{valueIDs: make(map[string]SeriesIDs)} +func newTagKeyValue() *tagKeyValue { + return &tagKeyValue{valueIDs: make(map[string]seriesIDs)} } // Cardinality returns the number of values in the TagKeyValue. -func (t *TagKeyValue) Cardinality() int { +func (t *tagKeyValue) Cardinality() int { if t == nil { return 0 } @@ -1348,7 +1318,7 @@ func (t *TagKeyValue) Cardinality() int { } // Contains returns true if the TagKeyValue contains value. -func (t *TagKeyValue) Contains(value string) bool { +func (t *tagKeyValue) Contains(value string) bool { if t == nil { return false } @@ -1360,33 +1330,35 @@ func (t *TagKeyValue) Contains(value string) bool { } // Load returns the SeriesIDs for the provided tag value. -func (t *TagKeyValue) Load(value string) SeriesIDs { +func (t *tagKeyValue) Load(value string) seriesIDs { if t == nil { return nil } t.mu.RLock() - defer t.mu.RUnlock() - return t.valueIDs[value] + sIDs := t.valueIDs[value] + t.mu.RUnlock() + return sIDs } // LoadByte returns the SeriesIDs for the provided tag value. It makes use of // Go's compiler optimisation for avoiding a copy when accessing maps with a []byte. -func (t *TagKeyValue) LoadByte(value []byte) SeriesIDs { +func (t *tagKeyValue) LoadByte(value []byte) seriesIDs { if t == nil { return nil } t.mu.RLock() - defer t.mu.RUnlock() - return t.valueIDs[string(value)] + sIDs := t.valueIDs[string(value)] + t.mu.RUnlock() + return sIDs } // Range calls f sequentially on each key and value. A call to Range on a nil // TagKeyValue is a no-op. // // If f returns false then iteration over any remaining keys or values will cease. -func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) { +func (t *tagKeyValue) Range(f func(tagValue string, a seriesIDs) bool) { if t == nil { return } @@ -1402,35 +1374,42 @@ func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) { // RangeAll calls f sequentially on each key and value. A call to RangeAll on a // nil TagKeyValue is a no-op. -func (t *TagKeyValue) RangeAll(f func(k string, a SeriesIDs)) { - t.Range(func(k string, a SeriesIDs) bool { +func (t *tagKeyValue) RangeAll(f func(k string, a seriesIDs)) { + t.Range(func(k string, a seriesIDs) bool { f(k, a) return true }) } // Store stores ids under the value key. -func (t *TagKeyValue) Store(value string, ids SeriesIDs) { +func (t *tagKeyValue) Store(value string, ids seriesIDs) { t.mu.Lock() - defer t.mu.Unlock() t.valueIDs[value] = ids + t.mu.Unlock() +} + +// StoreByte stores ids under the value key. +func (t *tagKeyValue) StoreByte(value []byte, ids seriesIDs) { + t.mu.Lock() + t.valueIDs[string(value)] = ids + t.mu.Unlock() } // SeriesIDs is a convenience type for sorting, checking equality, and doing // union and intersection of collections of series ids. -type SeriesIDs []uint64 +type seriesIDs []uint64 // Len implements sort.Interface. -func (a SeriesIDs) Len() int { return len(a) } +func (a seriesIDs) Len() int { return len(a) } // Less implements sort.Interface. -func (a SeriesIDs) Less(i, j int) bool { return a[i] < a[j] } +func (a seriesIDs) Less(i, j int) bool { return a[i] < a[j] } // Swap implements sort.Interface. -func (a SeriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a seriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // Equals assumes that both are sorted. -func (a SeriesIDs) Equals(other SeriesIDs) bool { +func (a seriesIDs) Equals(other seriesIDs) bool { if len(a) != len(other) { return false } @@ -1444,7 +1423,7 @@ func (a SeriesIDs) Equals(other SeriesIDs) bool { // Intersect returns a new collection of series ids in sorted order that is the intersection of the two. // The two collections must already be sorted. -func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs { +func (a seriesIDs) Intersect(other seriesIDs) seriesIDs { l := a r := other @@ -1471,12 +1450,12 @@ func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs { } } - return SeriesIDs(ids) + return seriesIDs(ids) } // Union returns a new collection of series ids in sorted order that is the union of the two. // The two collections must already be sorted. -func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs { +func (a seriesIDs) Union(other seriesIDs) seriesIDs { l := a r := other ids := make([]uint64, 0, len(l)+len(r)) @@ -1507,7 +1486,7 @@ func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs { // Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. // This is useful for the NOT operator. The two collections must already be sorted. -func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs { +func (a seriesIDs) Reject(other seriesIDs) seriesIDs { l := a r := other var i, j int @@ -1530,7 +1509,7 @@ func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs { ids = append(ids, l[i:]...) } - return SeriesIDs(ids) + return seriesIDs(ids) } // seriesID is a series id that may or may not have been evicted from the @@ -1563,9 +1542,9 @@ func newEvictSeriesIDs(ids []uint64) evictSeriesIDs { // mark marks all of the ids in the sorted slice to be evicted from the list of // series ids. If an id to be evicted does not exist, it just gets ignored. func (a *evictSeriesIDs) mark(ids []uint64) { - seriesIDs := a.ids + sIDs := a.ids for _, id := range ids { - if len(seriesIDs) == 0 { + if len(sIDs) == 0 { break } @@ -1573,29 +1552,29 @@ func (a *evictSeriesIDs) mark(ids []uint64) { // the first element does not match the value we're // looking for. i := 0 - if seriesIDs[0].val < id { - i = sort.Search(len(seriesIDs), func(i int) bool { - return seriesIDs[i].val >= id + if sIDs[0].val < id { + i = sort.Search(len(sIDs), func(i int) bool { + return sIDs[i].val >= id }) } - if i >= len(seriesIDs) { + if i >= len(sIDs) { break - } else if seriesIDs[i].val == id { - if !seriesIDs[i].evict { - seriesIDs[i].evict = true + } else if sIDs[i].val == id { + if !sIDs[i].evict { + sIDs[i].evict = true a.sz-- } // Skip over this series since it has been evicted and won't be // encountered again. i++ } - seriesIDs = seriesIDs[i:] + sIDs = sIDs[i:] } } // evict creates a new slice with only the series that have not been evicted. -func (a *evictSeriesIDs) evict() (ids SeriesIDs) { +func (a *evictSeriesIDs) evict() (ids seriesIDs) { if a.sz == 0 { return ids } @@ -1621,7 +1600,7 @@ type TagFilter struct { // WalkTagKeys calls fn for each tag key associated with m. The order of the // keys is undefined. -func (m *Measurement) WalkTagKeys(fn func(k string)) { +func (m *measurement) WalkTagKeys(fn func(k string)) { m.mu.RLock() defer m.mu.RUnlock() @@ -1631,7 +1610,7 @@ func (m *Measurement) WalkTagKeys(fn func(k string)) { } // TagKeys returns a list of the measurement's tag names, in sorted order. -func (m *Measurement) TagKeys() []string { +func (m *measurement) TagKeys() []string { m.mu.RLock() keys := make([]string, 0, len(m.seriesByTagKeyValue)) for k := range m.seriesByTagKeyValue { @@ -1643,12 +1622,12 @@ func (m *Measurement) TagKeys() []string { } // TagValues returns all the values for the given tag key, in an arbitrary order. -func (m *Measurement) TagValues(auth query.Authorizer, key string) []string { +func (m *measurement) TagValues(auth query.Authorizer, key string) []string { m.mu.RLock() defer m.mu.RUnlock() values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality()) - m.seriesByTagKeyValue[key].RangeAll(func(k string, a SeriesIDs) { + m.seriesByTagKeyValue[key].RangeAll(func(k string, a seriesIDs) { if auth == nil { values = append(values, k) } else { @@ -1657,7 +1636,7 @@ func (m *Measurement) TagValues(auth query.Authorizer, key string) []string { if s == nil { continue } - if auth.AuthorizeSeriesRead(m.database, m.name, s.Tags()) { + if auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) { values = append(values, k) return } @@ -1668,7 +1647,7 @@ func (m *Measurement) TagValues(auth query.Authorizer, key string) []string { } // SetFieldName adds the field name to the measurement. -func (m *Measurement) SetFieldName(name string) { +func (m *measurement) SetFieldName(name string) { m.mu.RLock() _, ok := m.fieldNames[name] m.mu.RUnlock() @@ -1683,7 +1662,7 @@ func (m *Measurement) SetFieldName(name string) { } // FieldNames returns a list of the measurement's field names, in an arbitrary order. -func (m *Measurement) FieldNames() []string { +func (m *measurement) FieldNames() []string { m.mu.RLock() defer m.mu.RUnlock() @@ -1695,7 +1674,7 @@ func (m *Measurement) FieldNames() []string { } // SeriesByTagKeyValue returns the TagKeyValue for the provided tag key. -func (m *Measurement) SeriesByTagKeyValue(key string) *TagKeyValue { +func (m *measurement) SeriesByTagKeyValue(key string) *tagKeyValue { m.mu.RLock() defer m.mu.RUnlock() return m.seriesByTagKeyValue[key] diff --git a/tsdb/index/inmem/meta_test.go b/tsdb/index/inmem/meta_test.go index 69714ed85dc..910d4b16a74 100644 --- a/tsdb/index/inmem/meta_test.go +++ b/tsdb/index/inmem/meta_test.go @@ -1,4 +1,4 @@ -package inmem_test +package inmem import ( "fmt" @@ -8,15 +8,14 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" - "github.com/influxdata/influxdb/tsdb/index/inmem" "github.com/influxdata/influxql" ) // Test comparing SeriesIDs for equality. func TestSeriesIDs_Equals(t *testing.T) { - ids1 := inmem.SeriesIDs([]uint64{1, 2, 3}) - ids2 := inmem.SeriesIDs([]uint64{1, 2, 3}) - ids3 := inmem.SeriesIDs([]uint64{4, 5, 6}) + ids1 := seriesIDs([]uint64{1, 2, 3}) + ids2 := seriesIDs([]uint64{1, 2, 3}) + ids3 := seriesIDs([]uint64{4, 5, 6}) if !ids1.Equals(ids2) { t.Fatal("expected ids1 == ids2") @@ -27,10 +26,10 @@ func TestSeriesIDs_Equals(t *testing.T) { // Test intersecting sets of SeriesIDs. func TestSeriesIDs_Intersect(t *testing.T) { - // Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)' - ids1 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - ids2 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) - exp := inmem.SeriesIDs([]uint64{1, 3}) + // Test swapping l & r, all branches of if-else, and exit loop when 'j < len(r)' + ids1 := seriesIDs([]uint64{1, 3, 4, 5, 6}) + ids2 := seriesIDs([]uint64{1, 2, 3, 7}) + exp := seriesIDs([]uint64{1, 3}) got := ids1.Intersect(ids2) if !exp.Equals(got) { @@ -38,9 +37,9 @@ func TestSeriesIDs_Intersect(t *testing.T) { } // Test exit for loop when 'i < len(l)' - ids1 = inmem.SeriesIDs([]uint64{1}) - ids2 = inmem.SeriesIDs([]uint64{1, 2}) - exp = inmem.SeriesIDs([]uint64{1}) + ids1 = seriesIDs([]uint64{1}) + ids2 = seriesIDs([]uint64{1, 2}) + exp = seriesIDs([]uint64{1}) got = ids1.Intersect(ids2) if !exp.Equals(got) { @@ -51,9 +50,9 @@ func TestSeriesIDs_Intersect(t *testing.T) { // Test union sets of SeriesIDs. func TestSeriesIDs_Union(t *testing.T) { // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. - ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) - ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - exp := inmem.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7}) + ids1 := seriesIDs([]uint64{1, 2, 3, 7}) + ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6}) + exp := seriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7}) got := ids1.Union(ids2) if !exp.Equals(got) { @@ -61,9 +60,9 @@ func TestSeriesIDs_Union(t *testing.T) { } // Test exit because of 'i < len(l)' and append remainder from right. - ids1 = inmem.SeriesIDs([]uint64{1}) - ids2 = inmem.SeriesIDs([]uint64{1, 2}) - exp = inmem.SeriesIDs([]uint64{1, 2}) + ids1 = seriesIDs([]uint64{1}) + ids2 = seriesIDs([]uint64{1, 2}) + exp = seriesIDs([]uint64{1, 2}) got = ids1.Union(ids2) if !exp.Equals(got) { @@ -74,9 +73,9 @@ func TestSeriesIDs_Union(t *testing.T) { // Test removing one set of SeriesIDs from another. func TestSeriesIDs_Reject(t *testing.T) { // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. - ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7}) - ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6}) - exp := inmem.SeriesIDs([]uint64{2, 7}) + ids1 := seriesIDs([]uint64{1, 2, 3, 7}) + ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6}) + exp := seriesIDs([]uint64{2, 7}) got := ids1.Reject(ids2) if !exp.Equals(got) { @@ -84,9 +83,9 @@ func TestSeriesIDs_Reject(t *testing.T) { } // Test exit because of 'i < len(l)'. - ids1 = inmem.SeriesIDs([]uint64{1}) - ids2 = inmem.SeriesIDs([]uint64{1, 2}) - exp = inmem.SeriesIDs{} + ids1 = seriesIDs([]uint64{1}) + ids2 = seriesIDs([]uint64{1, 2}) + exp = seriesIDs{} got = ids1.Reject(ids2) if !exp.Equals(got) { @@ -95,14 +94,14 @@ func TestSeriesIDs_Reject(t *testing.T) { } func TestMeasurement_AddSeries_Nil(t *testing.T) { - m := inmem.NewMeasurement("foo", "cpu") + m := newMeasurement("foo", "cpu") if m.AddSeries(nil) { t.Fatalf("AddSeries mismatch: exp false, got true") } } func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { - m := inmem.NewMeasurement("foo", "cpu") + m := newMeasurement("foo", "cpu") var dst []string dst = m.AppendSeriesKeysByID(dst, []uint64{1}) if exp, got := 0, len(dst); exp != got { @@ -111,9 +110,8 @@ func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { } func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) { - m := inmem.NewMeasurement("foo", "cpu") - s := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) - s.ID = 1 + m := newMeasurement("foo", "cpu") + s := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) m.AddSeries(s) var dst []string @@ -128,13 +126,11 @@ func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) { } func TestMeasurement_TagsSet_Deadlock(t *testing.T) { - m := inmem.NewMeasurement("foo", "cpu") - s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) - s1.ID = 1 + m := newMeasurement("foo", "cpu") + s1 := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) m.AddSeries(s1) - s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) - s2.ID = 2 + s2 := newSeries(2, m, "cpu,host=bar", models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) m.AddSeries(s2) m.DropSeries(s1) @@ -147,12 +143,11 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) { } func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) { - m := inmem.NewMeasurement("foo", "cpu") + m := newMeasurement("foo", "cpu") for i := 0; i < 100000; i++ { - s := inmem.NewSeries([]byte("cpu"), models.Tags{models.NewTag( + s := newSeries(uint64(i), m, "cpu", models.Tags{models.NewTag( []byte("host"), []byte(fmt.Sprintf("host%d", i)))}) - s.ID = uint64(i) m.AddSeries(s) } @@ -178,12 +173,11 @@ func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) { } func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { - m := inmem.NewMeasurement("foo", "cpu") + m := newMeasurement("foo", "cpu") for i := 0; i < 100000; i++ { - s := inmem.NewSeries([]byte("cpu"), models.Tags{models.Tag{ + s := newSeries(uint64(i), m, "cpu", models.Tags{models.Tag{ Key: []byte("host"), Value: []byte(fmt.Sprintf("host%d", i))}}) - s.ID = uint64(i) m.AddSeries(s) } @@ -210,11 +204,10 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { } func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) { - m := inmem.NewMeasurement("foo", "m") + m := newMeasurement("foo", "m") for i := 0; i < n; i++ { tags := map[string]string{"tag1": "value1", "tag2": "value2"} - s := inmem.NewSeries([]byte(fmt.Sprintf("m,tag1=value1,tag2=value2")), models.NewTags(tags)) - s.ID = uint64(i) + s := newSeries(uint64(i), m, fmt.Sprintf("m,tag1=value1,tag2=value2"), models.NewTags(tags)) s.AssignShard(0, time.Now().UnixNano()) m.AddSeries(s) } diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index fff9fda46c7..72f83432174 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" - "github.com/influxdata/influxdb/tsdb/index/inmem" ) // Ensure tags can be marshaled into a byte slice. @@ -142,18 +141,20 @@ func benchmarkMakeTagsKey(b *testing.B, keyN int) { type TestSeries struct { Measurement string - Series *inmem.Series + Key string + Tags models.Tags } func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { measurements := genStrList("measurement", mCnt) tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets() - series := []*TestSeries{} + var series []*TestSeries for _, m := range measurements { for _, ts := range tagSets { series = append(series, &TestSeries{ Measurement: m, - Series: inmem.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)), + Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), + Tags: models.NewTags(ts), }) } } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 69e5ae5a8e3..226a87f1a83 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1732,7 +1732,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { points := []models.Point{} for _, s := range series { for val := 0.0; val < float64(pntCnt); val++ { - p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now()) + p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now()) points = append(points, p) } } @@ -1774,7 +1774,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt points := []models.Point{} for _, s := range series { for val := 0.0; val < float64(pntCnt); val++ { - p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now()) + p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now()) points = append(points, p) } } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 58202b969f9..1cc0a42e44f 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -540,7 +540,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { points := make([]models.Point, 0, len(series)) for _, s := range series { - points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now())) + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) } // Create requested number of shards in the store & write points across @@ -623,7 +623,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) { points := make([]models.Point, 0, len(series)) for _, s := range series { - points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now())) + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) } // Create requested number of shards in the store & write points across @@ -694,7 +694,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) { points := make([]models.Point, 0, len(series)) for _, s := range series { - points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now())) + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) } // Create requested number of shards in the store & write points. @@ -778,7 +778,7 @@ func testStoreCardinalityCompactions(store *Store) error { points := make([]models.Point, 0, len(series)) for _, s := range series { - points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now())) + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) } // Create requested number of shards in the store & write points across @@ -1331,7 +1331,7 @@ func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) points := []models.Point{} for _, s := range series { for val := 0.0; val < float64(pntCnt); val++ { - p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now()) + p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now()) points = append(points, p) } }