Skip to content

Commit

Permalink
Merge pull request #6190 from influxdata/jw-race
Browse files Browse the repository at this point in the history
Fix race on measurementFields
  • Loading branch information
joelegasse committed Apr 6, 2016
2 parents 734b14b + ca8b0ca commit 84f8dd7
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 151 deletions.
5 changes: 3 additions & 2 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ type Engine interface {
Close() error

SetLogOutput(io.Writer)
LoadMetadataIndex(shard *Shard, index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error
LoadMetadataIndex(shard *Shard, index *DatabaseIndex) error

CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
WritePoints(points []models.Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error
WritePoints(points []models.Point) error
DeleteSeries(keys []string) error
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields

// Format will return the format for the engine
Format() EngineFormat
Expand Down
52 changes: 32 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
}

e := &Engine{
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
measurementFields: make(map[string]*tsdb.MeasurementFields),

WAL: w,
Cache: cache,
Expand Down Expand Up @@ -110,13 +111,23 @@ func (e *Engine) Index() *tsdb.DatabaseIndex {
}

// MeasurementFields returns the measurement fields for a measurement.
func (e *Engine) MeasurementFields(name string) *tsdb.MeasurementFields {
func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
e.mu.RLock()
m := e.measurementFields[measurement]
e.mu.RUnlock()

if m != nil {
return m
}

e.mu.Lock()
defer e.mu.Unlock()
if e.measurementFields[name] == nil {
e.measurementFields[name] = &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
m = e.measurementFields[measurement]
if m == nil {
m = tsdb.NewMeasurementFields()
e.measurementFields[measurement] = m
}
return e.measurementFields[name]
e.mu.Unlock()
return m
}

// Format returns the format type of this engine
Expand Down Expand Up @@ -187,10 +198,9 @@ func (e *Engine) Close() error {
func (e *Engine) SetLogOutput(w io.Writer) {}

// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error {
// Save reference to index for iterator creation.
e.index = index
e.measurementFields = measurementFields

start := time.Now()

Expand All @@ -200,7 +210,7 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, me
return err
}

if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil {
if err := e.addToIndexFromKey(key, fieldType, index); err != nil {
return err
}
return nil
Expand All @@ -220,7 +230,7 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, me
continue
}

if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil {
if err := e.addToIndexFromKey(key, fieldType, index); err != nil {
return err
}
}
Expand Down Expand Up @@ -297,19 +307,17 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar

// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
seriesKey, field := seriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)

m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)

mf := measurementFields[measurement]
mf := e.measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
mf = tsdb.NewMeasurementFields()
e.measurementFields[measurement] = mf
}

if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
Expand All @@ -330,7 +338,7 @@ func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, inde

// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
func (e *Engine) WritePoints(points []models.Point) error {
values := map[string][]Value{}
for _, p := range points {
for k, v := range p.Fields() {
Expand Down Expand Up @@ -397,6 +405,10 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error {

// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
e.mu.Lock()
delete(e.measurementFields, name)
e.mu.Unlock()

return e.DeleteSeries(seriesKeys)
}

Expand Down Expand Up @@ -740,7 +752,7 @@ func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList,
return influxql.Unknown
}

f := mf.Fields[field]
f := mf.Field(field)
if f == nil {
return influxql.Unknown
}
Expand Down Expand Up @@ -904,7 +916,7 @@ func (e *Engine) buildCursor(measurement, seriesKey, field string, opt influxql.
}

// Find individual field.
f := mf.Fields[field]
f := mf.Field(field)
if f == nil {
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {

// Load metadata index.
index := tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, index); err != nil {
t.Fatal(err)
}

Expand All @@ -57,7 +57,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {

// Load metadata index.
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, index); err != nil {
t.Fatal(err)
}

Expand All @@ -71,7 +71,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Write a new point and ensure we can close and load index from TSM and WAL
if err := e.WritePoints([]models.Point{
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}, nil, nil); err != nil {
}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

Expand All @@ -82,7 +82,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {

// Load metadata index.
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, index); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -152,14 +152,14 @@ func TestEngine_Backup(t *testing.T) {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}

if err := e.WritePoints([]models.Point{p1}, nil, nil); err != nil {
if err := e.WritePoints([]models.Point{p1}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}

if err := e.WritePoints([]models.Point{p2}, nil, nil); err != nil {
if err := e.WritePoints([]models.Point{p2}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

Expand Down Expand Up @@ -189,7 +189,7 @@ func TestEngine_Backup(t *testing.T) {
// so this test won't work properly unless the file is at least a second past the last one
time.Sleep(time.Second)

if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
if err := e.WritePoints([]models.Point{p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

Expand Down Expand Up @@ -521,7 +521,7 @@ func MustOpenEngine() *Engine {
if err := e.Open(); err != nil {
panic(err)
}
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db"), make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db")); err != nil {
panic(err)
}
return e
Expand Down Expand Up @@ -559,7 +559,7 @@ func (e *Engine) MustWriteSnapshot() {

// WritePointsString parses a string buffer and writes the points.
func (e *Engine) WritePointsString(buf ...string) error {
return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")), nil, nil)
return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")))
}

// MustParsePointsString parses points from a string. Panic on error.
Expand Down
33 changes: 23 additions & 10 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func NewDatabaseIndex(name string) *DatabaseIndex {
// Series returns a series by key.
func (d *DatabaseIndex) Series(key string) *Series {
d.mu.RLock()
defer d.mu.RUnlock()
return d.series[key]
s := d.series[key]
d.mu.RUnlock()
return s
}

// SeriesN returns the number of series.
Expand Down Expand Up @@ -146,7 +147,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
// and acquire the write lock
m = d.measurements[name]
if m == nil {
m = NewMeasurement(name, d)
m = NewMeasurement(name)
d.measurements[name] = m
d.statMap.Add(statDatabaseMeasurements, 1)
}
Expand Down Expand Up @@ -293,7 +294,9 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme
for _, m := range d.measurements {
// Iterate filters seeing if the measurement has a matching tag.
for _, f := range filters {
m.mu.RLock()
tagVals, ok := m.seriesByTagKeyValue[f.Key]
m.mu.RUnlock()
if !ok {
continue
}
Expand Down Expand Up @@ -352,10 +355,12 @@ func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements {

// Measurements returns a list of all measurements.
func (d *DatabaseIndex) Measurements() Measurements {
d.mu.RLock()
measurements := make(Measurements, 0, len(d.measurements))
for _, m := range d.measurements {
measurements = append(measurements, m)
}
d.mu.RUnlock()
return measurements
}

Expand Down Expand Up @@ -408,7 +413,6 @@ type Measurement struct {
mu sync.RWMutex
Name string `json:"name,omitempty"`
fieldNames map[string]struct{}
index *DatabaseIndex

// in-memory index fields
seriesByID map[uint64]*Series // lookup table for series by their id
Expand All @@ -418,11 +422,10 @@ type Measurement struct {
}

// NewMeasurement allocates and initializes a new Measurement.
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
func NewMeasurement(name string) *Measurement {
return &Measurement{
Name: name,
fieldNames: make(map[string]struct{}),
index: idx,

seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
Expand All @@ -433,7 +436,12 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
// HasField returns true if the measurement has a field by the given name
func (m *Measurement) HasField(name string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
hasField := m.hasField(name)
m.mu.RUnlock()
return hasField
}

func (m *Measurement) hasField(name string) bool {
_, hasField := m.fieldNames[name]
return hasField
}
Expand Down Expand Up @@ -606,8 +614,6 @@ func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.index.mu.RLock()
defer m.index.mu.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()

Expand Down Expand Up @@ -753,7 +759,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex

// For fields, return all series IDs from this measurement and return
// the expression passed in, as the filter.
if name.Val != "_name" && m.HasField(name.Val) {
if name.Val != "_name" && m.hasField(name.Val) {
return m.seriesIDs, n, nil
}

Expand Down Expand Up @@ -1314,6 +1320,13 @@ func (s *Series) AssignShard(shardID uint64) {
s.mu.Unlock()
}

func (s *Series) Assigned(shardID uint64) bool {
s.mu.RLock()
b := s.shardIDs[shardID]
s.mu.RUnlock()
return b
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
s.mu.RLock()
Expand Down
Loading

0 comments on commit 84f8dd7

Please sign in to comment.