Skip to content

Commit

Permalink
Remove shards from Store; encapsulate functionality into shardLocation
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Feb 16, 2016
1 parent f57e2a0 commit 9e73c57
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ type Store struct {

databaseIndexes map[string]*DatabaseIndex

// shards is a map of shard IDs to Shards for *ALL DATABASES*.
shards map[uint64]*Shard

// shardLocations is a map of shard IDs to meta information about
// where the shard is located on disk.
// shardLocations is a map of shard IDs to both the associated
// Shard, and meta information about where the shard is located on
// disk.
//
// shardLocations stores mappings for all shards on all databases.
shardLocations map[uint64]*shardLocation

EngineOptions EngineOptions
Expand Down Expand Up @@ -74,7 +74,6 @@ func (s *Store) Open() error {

s.closing = make(chan struct{})

s.shards = map[uint64]*Shard{}
s.shardLocations = map[uint64]*shardLocation{}
s.databaseIndexes = map[string]*DatabaseIndex{}

Expand Down Expand Up @@ -151,7 +150,6 @@ func (s *Store) loadShards() error {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
}

s.shards[shardID] = shard
s.shardLocations[shardID] = &shardLocation{Database: db, RetentionPolicy: rp.Name(), Shard: shard}
}
}
Expand All @@ -171,13 +169,12 @@ func (s *Store) Close() error {
}
s.wg.Wait()

for _, sh := range s.shards {
if err := sh.Close(); err != nil {
for _, sl := range s.shardLocations {
if err := sl.Shard.Close(); err != nil {
return err
}
}
s.opened = false
s.shards = nil
s.shardLocations = nil
s.databaseIndexes = nil

Expand All @@ -195,7 +192,11 @@ func (s *Store) DatabaseIndexN() int {
func (s *Store) Shard(id uint64) *Shard {
s.mu.RLock()
defer s.mu.RUnlock()
return s.shards[id]
sl, ok := s.shardLocations[id]
if !ok {
return nil
}
return sl.Shard
}

// Shards returns a list of shards by id.
Expand All @@ -204,11 +205,11 @@ func (s *Store) Shards(ids []uint64) []*Shard {
defer s.mu.RUnlock()
a := make([]*Shard, 0, len(ids))
for _, id := range ids {
sh := s.shards[id]
if sh == nil {
sl, ok := s.shardLocations[id]
if !ok || sl.Shard == nil {
continue
}
a = append(a, sh)
a = append(a, sl.Shard)
}
return a
}
Expand All @@ -217,7 +218,7 @@ func (s *Store) Shards(ids []uint64) []*Shard {
func (s *Store) ShardN() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.shards)
return len(s.shardLocations)
}

// CreateShard creates a shard with the given id and retention policy on a database.
Expand All @@ -232,7 +233,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
}

// shard already exists
if _, ok := s.shards[shardID]; ok {
if sl, ok := s.shardLocations[shardID]; ok && sl.Shard != nil {
return nil
}

Expand Down Expand Up @@ -260,7 +261,6 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
return err
}

s.shards[shardID] = shard
s.shardLocations[shardID] = &shardLocation{Database: database, RetentionPolicy: retentionPolicy, Shard: shard}

return nil
Expand All @@ -277,26 +277,24 @@ func (s *Store) DeleteShard(shardID uint64) error {
// to handle locks appropriately.
func (s *Store) deleteShard(shardID uint64) error {
// ensure shard exists
sh, ok := s.shards[shardID]
if !ok {
sl, ok := s.shardLocations[shardID]
if !ok || sl.Shard == nil {
return nil
}

if err := sh.Close(); err != nil {
if err := sl.Shard.Close(); err != nil {
return err
}

if err := os.RemoveAll(sh.path); err != nil {
if err := os.RemoveAll(sl.Shard.path); err != nil {
return err
}

if err := os.RemoveAll(sh.walPath); err != nil {
if err := os.RemoveAll(sl.Shard.walPath); err != nil {
return err
}

delete(s.shards, shardID)
delete(s.shardLocations, shardID)

return nil
}

Expand Down Expand Up @@ -375,11 +373,12 @@ func (s *Store) DeleteMeasurement(database, name string) error {
db.DropMeasurement(m.Name)

// Remove underlying data.
for _, sh := range s.shards {
if sh.index != db {
for _, sl := range s.shardLocations {
if !sl.IsDatabase(database) || sl.Shard == nil {
continue
}
if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {

if err := sl.Shard.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
return err
}
}
Expand All @@ -395,18 +394,18 @@ func (s *Store) ShardIDs() []uint64 {
}

func (s *Store) shardIDs() []uint64 {
a := make([]uint64, 0, len(s.shards))
for shardID := range s.shards {
a := make([]uint64, 0, len(s.shardLocations))
for shardID := range s.shardLocations {
a = append(a, shardID)
}
return a
}

// shardsSlice returns an ordered list of shards.
func (s *Store) shardsSlice() []*Shard {
a := make([]*Shard, 0, len(s.shards))
for _, sh := range s.shards {
a = append(a, sh)
a := make([]*Shard, 0, len(s.shardLocations))
for _, sl := range s.shardLocations {
a = append(a, sl.Shard)
}
sort.Sort(Shards(a))
return a
Expand Down Expand Up @@ -486,16 +485,16 @@ func (s *Store) DeleteSeries(database string, seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()

db, ok := s.databaseIndexes[database]
if !ok {
if _, ok := s.databaseIndexes[database]; !ok {
return ErrDatabaseNotFound(database)
}

for _, sh := range s.shards {
if sh.index != db {
for _, sl := range s.shardLocations {
if !sl.IsDatabase(database) || sl.Shard == nil {
continue
}
if err := sh.DeleteSeries(seriesKeys); err != nil {

if err := sl.Shard.DeleteSeries(seriesKeys); err != nil {
return err
}
}
Expand All @@ -517,14 +516,14 @@ func (s *Store) periodicMaintenance() {
}
}

// performMaintenance will loop through the shars and tell them
// to perform any maintenance tasks. Those tasks should kick off
// their own goroutines if it's anything that could take time.
// performMaintenance loops through shards and executes any maintenance
// tasks. Those tasks should run in their own goroutines if they will
// take significant time.
func (s *Store) performMaintenance() {
s.mu.Lock()
defer s.mu.Unlock()
for _, sh := range s.shards {
s.performMaintenanceOnShard(sh)
for _, sl := range s.shardLocations {
s.performMaintenanceOnShard(sl.Shard)
}
}

Expand Down Expand Up @@ -604,12 +603,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
default:
}

sh, ok := s.shards[shardID]
if !ok {
sl, ok := s.shardLocations[shardID]
if !ok || sl.Shard == nil {
return ErrShardNotFound
}

return sh.WritePoints(points)
return sl.Shard.WritePoints(points)
}

// shardLocation is a wrapper around a shard that provides extra
Expand Down

0 comments on commit 9e73c57

Please sign in to comment.