diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go index 13603fcdcd5..d3b136ac96a 100644 --- a/src/cluster/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -14,7 +14,6 @@ import ( "parser" "protocol" "sync" - "sync/atomic" "time" "wal" ) @@ -76,7 +75,6 @@ type ClusterConfiguration struct { lastServerToGetShard *ClusterServer shardCreator ShardCreator shardLock sync.Mutex - lastShardId uint32 shardsById map[uint32]*ShardData shardsByIdLock sync.RWMutex LocalRaftName string @@ -818,7 +816,7 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat durationIsSplit := len(shards) > 1 for _, newShard := range shards { - id := atomic.AddUint32(&self.lastShardId, uint32(1)) + id := uint32(len(self.GetAllShards()) + 1) shard := NewShard(id, newShard.StartTime, newShard.EndTime, shardType, durationIsSplit, self.wal) servers := make([]*ClusterServer, 0) for _, serverId := range newShard.ServerIds { diff --git a/src/integration/server_test.go b/src/integration/server_test.go index c4f67f08740..274d2a429de 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -1273,3 +1273,56 @@ func (self *ServerSuite) TestContinuousQueryWithMixedGroupByOperations(c *C) { c.Assert(series.GetValueForPointAndColumn(2, "mean", c), Equals, float64(4.5)) c.Assert(series.GetValueForPointAndColumn(2, "url", c), Equals, "/register") } + +// fix for #305: https://github.com/influxdb/influxdb/issues/305 +func (self *ServerSuite) TestShardIdUniquenessAfterRestart(c *C) { + server := self.serverProcesses[0] + t := (time.Now().Unix() + 86400*720) * 1000 + data := fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_shard_id_uniqueness", "columns": ["value", "time"]}]`, t) + server.Post("/db/test_rep/series?u=paul&p=pass", data, c) + + body := server.Get("/cluster/shards?u=root&p=root", c) + res := make(map[string]interface{}) + err := json.Unmarshal(body, &res) + c.Assert(err, IsNil) + shardIds := make(map[float64]bool) + for _, s := range res["shortTerm"].([]interface{}) { + sh := s.(map[string]interface{}) + shardId := sh["id"].(float64) + hasId := shardIds[shardId] + c.Assert(hasId, Equals, false) + shardIds[shardId] = true + } + c.Assert(len(shardIds) > 0, Equals, true) + + resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + + for _, s := range self.serverProcesses { + s.Stop() + } + time.Sleep(time.Second * 2) + for _, s := range self.serverProcesses { + s.Start() + time.Sleep(time.Second) + } + + server = self.serverProcesses[0] + t = (time.Now().Unix() + 86400*720*2) * 1000 + data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_shard_id_uniqueness", "columns": ["value", "time"]}]`, t) + server.Post("/db/test_rep/series?u=paul&p=pass", data, c) + + body = server.Get("/cluster/shards?u=root&p=root", c) + res = make(map[string]interface{}) + err = json.Unmarshal(body, &res) + c.Assert(err, IsNil) + shardIds = make(map[float64]bool) + for _, s := range res["shortTerm"].([]interface{}) { + sh := s.(map[string]interface{}) + shardId := sh["id"].(float64) + hasId := shardIds[shardId] + c.Assert(hasId, Equals, false) + shardIds[shardId] = true + } + c.Assert(len(shardIds) > 0, Equals, true) +}