Skip to content

Commit

Permalink
Merge pull request #6168 from influxdata/jw-locks
Browse files Browse the repository at this point in the history
Fix write path lock contention
  • Loading branch information
jwilder committed Mar 31, 2016
2 parents 86385f9 + 07e3215 commit 319a2d9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
- [#6121](https://github.com/influxdata/influxdb/issues/6121): Fix panic: slice index out of bounds in TSM index
- [#6140](https://github.com/influxdata/influxdb/issues/6140): Ensure Shard engine not accessed when closed.
- [#6110](https://github.com/influxdata/influxdb/issues/6110): Fix for 0.9 upgrade path when using RPM
- [#6131](https://github.com/influxdata/influxdb/issues/6061): Fix write throughput regression with large number of measurments

## v0.11.0 [2016-03-22]

Expand Down
48 changes: 14 additions & 34 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,15 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
ss := d.Series(k)
if ss != nil {
d.mu.Lock()
ss.AssignShard(shardID)
d.mu.Unlock()
}
}

// TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
d.mu.RLock()
defer d.mu.RUnlock()

ss := d.series[key]
if ss == nil {
return nil
Expand Down Expand Up @@ -375,8 +374,6 @@ func (d *DatabaseIndex) DropMeasurement(name string) {
delete(d.series, s.Key)
}

m.drop()

d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID)))
d.statMap.Add(statDatabaseMeasurements, -1)
}
Expand Down Expand Up @@ -418,8 +415,6 @@ type Measurement struct {
measurement *Measurement
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
seriesIDs SeriesIDs // sorted list of series IDs in this measurement

statMap *expvar.Map
}

// NewMeasurement allocates and initializes a new Measurement.
Expand All @@ -432,12 +427,6 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
seriesIDs: make(SeriesIDs, 0),

statMap: influxdb.NewStatistics(
fmt.Sprintf("measurement:%s.%s", name, idx.name),
"measurement",
map[string]string{"database": idx.name, "measurement": name},
),
}
}

Expand Down Expand Up @@ -530,7 +519,6 @@ func (m *Measurement) AddSeries(s *Series) bool {
valueMap[v] = ids
}

m.statMap.Add(statMeasurementSeries, 1)
return true
}

Expand Down Expand Up @@ -578,17 +566,9 @@ func (m *Measurement) DropSeries(seriesID uint64) {
}
}

m.statMap.Add(statMeasurementSeries, -1)

return
}

// drop handles any cleanup for when a measurement is dropped.
// Currently only cleans up stats.
func (m *Measurement) drop() {
m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs)))
}

// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) {
Expand Down Expand Up @@ -1311,9 +1291,9 @@ func (a Measurements) union(other Measurements) Measurements {

// Series belong to a Measurement and represent unique time series in a database
type Series struct {
Key string
Tags map[string]string

mu sync.RWMutex
Key string
Tags map[string]string
id uint64
measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined
Expand All @@ -1329,11 +1309,16 @@ func NewSeries(key string, tags map[string]string) *Series {
}

func (s *Series) AssignShard(shardID uint64) {
s.mu.Lock()
s.shardIDs[shardID] = true
s.mu.Unlock()
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()

var pb internal.Series
pb.Key = &s.Key
for k, v := range s.Tags {
Expand All @@ -1346,6 +1331,9 @@ func (s *Series) MarshalBinary() ([]byte, error) {

// UnmarshalBinary decodes the object from a binary format.
func (s *Series) UnmarshalBinary(buf []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

var pb internal.Series
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
Expand All @@ -1360,17 +1348,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {

// InitializeShards initializes the list of shards.
func (s *Series) InitializeShards() {
s.mu.Lock()
s.shardIDs = make(map[uint64]bool)
}

// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {
if s.Tags[k] != v {
return false
}
}
return true
s.mu.Unlock()
}

// SeriesIDs is a convenience type for sorting, checking equality, and doing
Expand Down

0 comments on commit 319a2d9

Please sign in to comment.