Skip to content

Commit

Permalink
Fix #305. Ensure that the shard id is set based on the number of curr…
Browse files Browse the repository at this point in the history
…ent shards + 1. 305 was only occuring if a restart after a Raft log compaction happened
  • Loading branch information
pauldix committed Mar 6, 2014
1 parent 58cdd6f commit a3c83b6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
4 changes: 1 addition & 3 deletions src/cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"parser"
"protocol"
"sync"
"sync/atomic"
"time"
"wal"
)
Expand Down Expand Up @@ -76,7 +75,6 @@ type ClusterConfiguration struct {
lastServerToGetShard *ClusterServer
shardCreator ShardCreator
shardLock sync.Mutex
lastShardId uint32
shardsById map[uint32]*ShardData
shardsByIdLock sync.RWMutex
LocalRaftName string
Expand Down Expand Up @@ -818,7 +816,7 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat

durationIsSplit := len(shards) > 1
for _, newShard := range shards {
id := atomic.AddUint32(&self.lastShardId, uint32(1))
id := uint32(len(self.GetAllShards()) + 1)
shard := NewShard(id, newShard.StartTime, newShard.EndTime, shardType, durationIsSplit, self.wal)
servers := make([]*ClusterServer, 0)
for _, serverId := range newShard.ServerIds {
Expand Down
53 changes: 53 additions & 0 deletions src/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,3 +1273,56 @@ func (self *ServerSuite) TestContinuousQueryWithMixedGroupByOperations(c *C) {
c.Assert(series.GetValueForPointAndColumn(2, "mean", c), Equals, float64(4.5))
c.Assert(series.GetValueForPointAndColumn(2, "url", c), Equals, "/register")
}

// fix for #305: https://github.com/influxdb/influxdb/issues/305
func (self *ServerSuite) TestShardIdUniquenessAfterRestart(c *C) {
server := self.serverProcesses[0]
t := (time.Now().Unix() + 86400*720) * 1000
data := fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_shard_id_uniqueness", "columns": ["value", "time"]}]`, t)
server.Post("/db/test_rep/series?u=paul&p=pass", data, c)

body := server.Get("/cluster/shards?u=root&p=root", c)
res := make(map[string]interface{})
err := json.Unmarshal(body, &res)
c.Assert(err, IsNil)
shardIds := make(map[float64]bool)
for _, s := range res["shortTerm"].([]interface{}) {
sh := s.(map[string]interface{})
shardId := sh["id"].(float64)
hasId := shardIds[shardId]
c.Assert(hasId, Equals, false)
shardIds[shardId] = true
}
c.Assert(len(shardIds) > 0, Equals, true)

resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusOK)

for _, s := range self.serverProcesses {
s.Stop()
}
time.Sleep(time.Second * 2)
for _, s := range self.serverProcesses {
s.Start()
time.Sleep(time.Second)
}

server = self.serverProcesses[0]
t = (time.Now().Unix() + 86400*720*2) * 1000
data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_shard_id_uniqueness", "columns": ["value", "time"]}]`, t)
server.Post("/db/test_rep/series?u=paul&p=pass", data, c)

body = server.Get("/cluster/shards?u=root&p=root", c)
res = make(map[string]interface{})
err = json.Unmarshal(body, &res)
c.Assert(err, IsNil)
shardIds = make(map[float64]bool)
for _, s := range res["shortTerm"].([]interface{}) {
sh := s.(map[string]interface{})
shardId := sh["id"].(float64)
hasId := shardIds[shardId]
c.Assert(hasId, Equals, false)
shardIds[shardId] = true
}
c.Assert(len(shardIds) > 0, Equals, true)
}

0 comments on commit a3c83b6

Please sign in to comment.