diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0beee5d825..d65c6e4b9e7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 - [#5419](https://github.com/influxdata/influxdb/pull/5419): Graphite: Support matching tags multiple times Thanks @m4ce
 - [#5598](https://github.com/influxdata/influxdb/pull/5598): Client: Add Ping to v2 client @PSUdaemon
 - [#4125](https://github.com/influxdata/influxdb/pull/4125): Admin UI: Fetch and display server version on connect. Thanks @alexiri!
+- [#5602](https://github.com/influxdata/influxdb/pull/5602): Simplify cluster startup for scripting and deployment

 ### Bugfixes

diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go
index f00b9291ed7..47e9bf6659a 100644
--- a/cmd/influxd/run/command.go
+++ b/cmd/influxd/run/command.go
@@ -14,7 +14,6 @@ import (
 	"time"

 	"github.com/BurntSushi/toml"
-	"github.com/influxdata/influxdb"
 )

 const logo = `
@@ -95,13 +94,8 @@ func (cmd *Command) Run(args ...string) error {
 		return fmt.Errorf("apply env config: %v", err)
 	}

-	// If we have a node ID, ignore the join argument
-	// We are not using the reference to this node var, just checking
-	// to see if we have a node ID on disk
-	if node, _ := influxdb.LoadNode(config.Meta.Dir, []string{config.Meta.HTTPBindAddress}); node == nil || node.ID == 0 {
-		if options.Join != "" {
-			config.Meta.JoinPeers = strings.Split(options.Join, ",")
-		}
+	if options.Join != "" {
+		config.Meta.JoinPeers = strings.Split(options.Join, ",")
 	}

 	// Validate the configuration.
diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 40d80ddd852..59aca76a1b1 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -128,6 +128,14 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
 		return nil, err
 	}

+	// Create the root directory if it doesn't already exist.
+	if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
+		return nil, fmt.Errorf("mkdir all: %s", err)
+	}
+
+	// In 0.11 we no longer use peers.json. Remove the file if one exists on disk.
+	os.RemoveAll(filepath.Join(c.Meta.Dir, "peers.json"))
+
 	// load the node information
 	metaAddresses := []string{nodeAddr}
 	if !c.Meta.Enabled {
@@ -627,6 +635,8 @@ func (s *Server) initializeMetaClient() error {

 	go s.updateMetaNodeInformation()

+	s.MetaClient.WaitForDataChanged()
+
 	return nil
 }

@@ -645,14 +655,14 @@ func (s *Server) initializeMetaClient() error {
 	if err := s.MetaClient.Open(); err != nil {
 		return err
 	}
-
-	if s.TSDBStore != nil {
-		n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
-		if err != nil {
-			return err
-		}
-		s.Node.ID = n.ID
+	n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
+	for err != nil {
+		log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
+		time.Sleep(time.Second)
+		n, err = s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
 	}
+	s.Node.ID = n.ID
+
 	metaNodes, err := s.MetaClient.MetaNodes()
 	if err != nil {
 		return err
diff --git a/services/meta/client.go b/services/meta/client.go
index 25970d516d1..ea5807ab5f2 100644
--- a/services/meta/client.go
+++ b/services/meta/client.go
@@ -827,12 +827,21 @@ func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error {
 			continue
 		}
 		resp.Body.Close()
+
+		// Successfully joined
+		if resp.StatusCode == http.StatusOK {
+			return nil
+		}
+
+		// We tried to join a meta node that was not the leader, retry at the node
+		// they think is the leader.
if resp.StatusCode == http.StatusTemporaryRedirect { redirectServer = resp.Header.Get("Location") continue } - return nil + // Something failed, try the next node + currentServer++ } } @@ -1109,6 +1118,36 @@ func (c *Client) getSnapshot(server string, index uint64) (*Data, error) { return data, nil } +// peers returns the TCPHost addresses of all the metaservers +func (c *Client) peers() []string { + + var peers Peers + // query each server and keep track of who their peers are + for _, server := range c.metaServers { + url := c.url(server) + "/peers" + resp, err := http.Get(url) + if err != nil { + continue + } + defer resp.Body.Close() + + // This meta-server might not be ready to answer, continue on + if resp.StatusCode != http.StatusOK { + continue + } + + dec := json.NewDecoder(resp.Body) + var p []string + if err := dec.Decode(&p); err != nil { + continue + } + peers = peers.Append(p...) + } + + // Return the unique set of peer addresses + return []string(peers.Unique()) +} + func (c *Client) url(server string) string { url := fmt.Sprintf("://%s", server) @@ -1170,6 +1209,36 @@ func (c *Client) updateAuthCache() { c.authCache = newCache } +type Peers []string + +func (peers Peers) Append(p ...string) Peers { + peers = append(peers, p...) + + return peers.Unique() +} + +func (peers Peers) Unique() Peers { + distinct := map[string]struct{}{} + for _, p := range peers { + distinct[p] = struct{}{} + } + + var u Peers + for k := range distinct { + u = append(u, k) + } + return u +} + +func (peers Peers) Contains(peer string) bool { + for _, p := range peers { + if p == peer { + return true + } + } + return false +} + type errRedirect struct { host string } diff --git a/services/meta/handler.go b/services/meta/handler.go index 863ba8becf8..af3eb6e2631 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -10,6 +10,7 @@ import ( "log" "net/http" "os" + "runtime" "strconv" "strings" "sync" @@ -38,6 +39,7 @@ type handler struct { apply(b []byte) error join(n *NodeInfo) error otherMetaServersHTTP() []string + peers() []string } s *Service @@ -84,6 +86,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.WrapHandler("ping", h.servePing).ServeHTTP(w, r) case "/lease": h.WrapHandler("lease", h.serveLease).ServeHTTP(w, r) + case "/peers": + h.WrapHandler("peers", h.servePeers).ServeHTTP(w, r) default: h.WrapHandler("snapshot", h.serveSnapshot).ServeHTTP(w, r) } @@ -120,7 +124,7 @@ func (h *handler) isClosed() bool { // serveExec executes the requested command. 
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { if h.isClosed() { - h.httpError(fmt.Errorf("server closed"), w, http.StatusInternalServerError) + h.httpError(fmt.Errorf("server closed"), w, http.StatusServiceUnavailable) return } @@ -137,6 +141,7 @@ func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { h.httpError(err, w, http.StatusInternalServerError) return } + err := h.store.join(n) if err == raft.ErrNotLeader { l := h.store.leaderHTTP() @@ -307,6 +312,13 @@ func (h *handler) servePing(w http.ResponseWriter, r *http.Request) { h.httpError(fmt.Errorf("one or more metaservers not up"), w, http.StatusInternalServerError) } +func (h *handler) servePeers(w http.ResponseWriter, r *http.Request) { + enc := json.NewEncoder(w) + if err := enc.Encode(h.store.peers()); err != nil { + h.httpError(err, w, http.StatusInternalServerError) + } +} + // serveLease func (h *handler) serveLease(w http.ResponseWriter, r *http.Request) { var name, nodeIDStr string @@ -442,8 +454,10 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler defer func() { if err := recover(); err != nil { + b := make([]byte, 1024) + runtime.Stack(b, false) logLine := buildLogLine(l, r, start) - logLine = fmt.Sprintf(`%s [panic:%s]`, logLine, err) + logLine = fmt.Sprintf("%s [panic:%s]\n%s", logLine, err, string(b)) weblog.Println(logLine) } }() diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index b409d8c18ee..720555c793c 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -1,7 +1,6 @@ package meta import ( - "errors" "fmt" "io/ioutil" "log" @@ -73,7 +72,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput) // Create peer storage. - r.peerStore = raft.NewJSONPeers(r.path, r.transport) + r.peerStore = &peerStore{} // This server is joining the raft cluster for the first time if initializePeers are passed in if len(initializePeers) > 0 { @@ -88,17 +87,20 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er } // If no peers are set in the config or there is one and we are it, then start as a single server. - if len(peers) <= 1 { + if len(initializePeers) <= 1 { config.EnableSingleNode = true // Ensure we can always become the leader config.DisableBootstrapAfterElect = false - // For single-node clusters, we can update the raft peers before we start the cluster - // just in case the hostname has changed. - if err := r.peerStore.SetPeers([]string{r.addr}); err != nil { - return err + // Make sure our peer address is here. This happens with either a single node cluster + // or a node joining the cluster, as no one else has that information yet. + if !raft.PeerContained(peers, r.addr) { + if err := r.peerStore.SetPeers([]string{r.addr}); err != nil { + return err + } } + peers = []string{r.addr} } @@ -107,7 +109,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er // is difficult to resolve automatically because we need to have all the raft peers agree on the current members // of the cluster before we can change them. if len(peers) > 0 && !raft.PeerContained(peers, r.addr) { - r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.addr, r.path) + r.logger.Printf("%s is not in the list of raft peers. 
Please ensure all nodes have the same meta nodes configured", r.addr, r.path) return fmt.Errorf("peers out of sync: %v not in %v", r.addr, peers) } @@ -156,6 +158,9 @@ func (r *raftState) logLeaderChanges() { } func (r *raftState) close() error { + if r == nil { + return nil + } if r.closing != nil { close(r.closing) } @@ -214,15 +219,6 @@ func (r *raftState) snapshot() error { // addPeer adds addr to the list of peers in the cluster. func (r *raftState) addPeer(addr string) error { - // peers, err := r.peerStore.Peers() - // if err != nil { - // return err - // } - // peers = append(peers, addr) - // if fut := r.raft.SetPeers(peers); fut.Error() != nil { - // return fut.Error() - // } - peers, err := r.peerStore.Peers() if err != nil { return err @@ -244,7 +240,7 @@ func (r *raftState) addPeer(addr string) error { func (r *raftState) removePeer(addr string) error { // Only do this on the leader if !r.isLeader() { - return errors.New("not the leader") + return raft.ErrNotLeader } if fut := r.raft.RemovePeer(addr); fut.Error() != nil { return fut.Error() @@ -326,3 +322,22 @@ func (l *raftLayer) Accept() (net.Conn, error) { return l.ln.Accept() } // Close closes the layer. func (l *raftLayer) Close() error { return l.ln.Close() } + +// peerStore is an in-memory implementation of raft.PeerStore +type peerStore struct { + mu sync.RWMutex + peers []string +} + +func (m *peerStore) Peers() ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.peers, nil +} + +func (m *peerStore) SetPeers(peers []string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.peers = peers + return nil +} diff --git a/services/meta/service.go b/services/meta/service.go index fd923705be8..923d73a98fb 100644 --- a/services/meta/service.go +++ b/services/meta/service.go @@ -110,9 +110,6 @@ func (s *Service) Open() error { // Open the store s.store = newStore(s.config, s.httpAddr, s.raftAddr) - if err := s.store.open(s.RaftListener); err != nil { - return err - } handler := newHandler(s.config, s) handler.logger = s.Logger @@ -121,6 +118,11 @@ func (s *Service) Open() error { // Begin listening for requests in a separate goroutine. go s.serve() + + if err := s.store.open(s.RaftListener); err != nil { + return err + } + return nil } diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 829260b8d99..7f1892306f5 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -2,6 +2,7 @@ package meta_test import ( "encoding/json" + "fmt" "io/ioutil" "net" "os" @@ -117,21 +118,21 @@ func TestMetaService_Databases(t *testing.T) { // Create two databases. 
db, err := c.CreateDatabase("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } db, err = c.CreateDatabase("db1") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db1" { t.Fatalf("db name wrong: %s", db.Name) } dbs, err := c.Databases() if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } if len(dbs) != 2 { t.Fatalf("expected 2 databases but got %d", len(dbs)) @@ -186,7 +187,7 @@ func TestMetaService_CreateRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -239,7 +240,7 @@ func TestMetaService_SetDefaultRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -275,7 +276,7 @@ func TestMetaService_DropRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -668,7 +669,7 @@ func TestMetaService_Subscriptions_Drop(t *testing.T) { // subscription is unknown. res := c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION foo ON db0."default"`)) if got, exp := res.Err, meta.ErrSubscriptionNotFound; got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // Create a subscription. @@ -680,20 +681,20 @@ func TestMetaService_Subscriptions_Drop(t *testing.T) { // the database is unknown. res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON foo."default"`)) if got, exp := res.Err, influxdb.ErrDatabaseNotFound("foo"); got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // DROP SUBSCRIPTION returns an influxdb.ErrRetentionPolicyNotFound // when the retention policy is unknown. res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."foo_policy"`)) if got, exp := res.Err, influxdb.ErrRetentionPolicyNotFound("foo_policy"); got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // DROP SUBSCRIPTION drops the subsciption if it can find it. 
res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."default"`)) if got := res.Err; got != nil { - t.Fatalf("got: %s, exp: %v", got.Error(), nil) + t.Fatalf("got: %s, exp: %v", got, nil) } if res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`)); res.Err != nil { @@ -718,7 +719,7 @@ func TestMetaService_Shards(t *testing.T) { } if _, err := c.CreateDataNode(exp.Host, exp.TCPHost); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if _, err := c.CreateDatabase("db0"); err != nil { @@ -772,51 +773,69 @@ func TestMetaService_Shards(t *testing.T) { func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Parallel() + joinPeers := freePorts(4) + raftPeers := freePorts(4) + cfg1 := newConfig() + cfg1.HTTPBindAddress = joinPeers[0] + cfg1.BindAddress = raftPeers[0] defer os.RemoveAll(cfg1.Dir) cfg2 := newConfig() + cfg2.HTTPBindAddress = joinPeers[1] + cfg2.BindAddress = raftPeers[1] defer os.RemoveAll(cfg2.Dir) - cfg3 := newConfig() - defer os.RemoveAll(cfg3.Dir) - cfg4 := newConfig() - defer os.RemoveAll(cfg4.Dir) + var wg sync.WaitGroup + wg.Add(2) + cfg1.JoinPeers = joinPeers[0:2] s1 := newService(cfg1) - if err := s1.Open(); err != nil { - t.Fatalf(err.Error()) - } + go func() { + defer wg.Done() + if err := s1.Open(); err != nil { + t.Fatal(err) + } + }() defer s1.Close() - cfg2.JoinPeers = []string{s1.HTTPAddr()} + cfg2.JoinPeers = joinPeers[0:2] s2 := newService(cfg2) - if err := s2.Open(); err != nil { - t.Fatal(err.Error()) - } + go func() { + defer wg.Done() + if err := s2.Open(); err != nil { + t.Fatal(err) + } + }() defer s2.Close() + wg.Wait() - func() { - cfg3.JoinPeers = []string{s2.HTTPAddr()} - s3 := newService(cfg3) - if err := s3.Open(); err != nil { - t.Fatal(err.Error()) - } - defer s3.Close() + cfg3 := newConfig() + joinPeers[2] = freePort() + cfg3.HTTPBindAddress = joinPeers[2] + raftPeers[2] = freePort() + cfg3.BindAddress = raftPeers[2] + defer os.RemoveAll(cfg3.Dir) - c1 := meta.NewClient([]string{s1.HTTPAddr()}, false) - if err := c1.Open(); err != nil { - t.Fatal(err.Error()) - } - defer c1.Close() + cfg3.JoinPeers = joinPeers[0:3] + s3 := newService(cfg3) + if err := s3.Open(); err != nil { + t.Fatal(err) + } + defer s3.Close() - metaNodes, _ := c1.MetaNodes() - if len(metaNodes) != 3 { - t.Fatalf("meta nodes wrong: %v", metaNodes) - } - }() + c1 := meta.NewClient(joinPeers[0:3], false) + if err := c1.Open(); err != nil { + t.Fatal(err) + } + defer c1.Close() + + metaNodes, _ := c1.MetaNodes() + if len(metaNodes) != 3 { + t.Fatalf("meta nodes wrong: %v", metaNodes) + } c := meta.NewClient([]string{s1.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -824,19 +843,29 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Fatal(res.Err) } - metaNodes, _ := c.MetaNodes() + metaNodes, _ = c.MetaNodes() if len(metaNodes) != 2 { t.Fatalf("meta nodes wrong: %v", metaNodes) } - cfg4.JoinPeers = []string{s1.HTTPAddr()} + cfg4 := newConfig() + cfg4.HTTPBindAddress = freePort() + cfg4.BindAddress = freePort() + cfg4.JoinPeers = []string{joinPeers[0], joinPeers[1], cfg4.HTTPBindAddress} + defer os.RemoveAll(cfg4.Dir) s4 := newService(cfg4) if err := s4.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer s4.Close() - metaNodes, _ = c.MetaNodes() + c2 := meta.NewClient(cfg4.JoinPeers, false) + if err := c2.Open(); err != nil { + t.Fatal(err) + } + defer c2.Close() + + metaNodes, _ = c2.MetaNodes() if len(metaNodes) != 3 { t.Fatalf("meta nodes wrong: %v", 
metaNodes) } @@ -850,39 +879,48 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + var wg sync.WaitGroup + wg.Add(len(cfgs)) + + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } + go func(srv *testService) { + defer wg.Done() + if err := srv.Open(); err != nil { + t.Fatal(err) + } + }(srvs[i]) defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + wg.Wait() - c := meta.NewClient([]string{srvs[2].HTTPAddr()}, false) - if err := c.Open(); err != nil { - t.Fatal(err.Error()) - } - defer c.Close() + for i := range cfgs { + c := meta.NewClient([]string{joinPeers[i]}, false) + if err := c.Open(); err != nil { + t.Fatal(err) + } + defer c.Close() - metaNodes, _ := c.MetaNodes() - if len(metaNodes) != 3 { - t.Fatalf("meta nodes wrong: %v", metaNodes) - } + metaNodes, _ := c.MetaNodes() + if len(metaNodes) != 3 { + t.Fatalf("node %d - meta nodes wrong: %v", i, metaNodes) + } - if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err) - } + if _, err := c.CreateDatabase(fmt.Sprintf("foo%d", i)); err != nil { + t.Fatalf("node %d: %s", i, err) + } - if db, err := c.Database("foo"); db == nil || err != nil { - t.Fatalf("database foo wasn't created: %s", err.Error()) + if db, err := c.Database(fmt.Sprintf("foo%d", i)); db == nil || err != nil { + t.Fatalf("node %d: database foo wasn't created: %s", i, err) + } } } @@ -893,28 +931,35 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + raftPeers := freePorts(len(cfgs)) + var swg sync.WaitGroup + swg.Add(len(cfgs)) + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.BindAddress = raftPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } - c.HTTPBindAddress = srvs[i].HTTPAddr() - c.BindAddress = srvs[i].RaftAddr() - c.JoinPeers = nil + go func(i int, srv *testService) { + defer swg.Done() + if err := srv.Open(); err != nil { + t.Logf("opening server %d", i) + t.Fatal(err) + } + }(i, srvs[i]) + defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + swg.Wait() - c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false) + c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -929,11 +974,11 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c.Database("foo"); db == nil || err != nil { - t.Fatalf("database foo wasn't created: %s", err.Error()) + t.Fatalf("database foo wasn't created: %s", err) } if err := srvs[0].Close(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if _, err := c.CreateDatabase("bar"); err != nil { @@ -941,25 +986,24 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c.Database("bar"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } if err := srvs[1].Close(); err != nil { - 
t.Fatal(err.Error()) + t.Fatal(err) } if err := srvs[2].Close(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } // give them a second to shut down time.Sleep(time.Second) - // when we start back up they need to happen simultaneously, otherwise - // a leader won't get elected + // need to start them all at once so they can discover the bind addresses for raft var wg sync.WaitGroup + wg.Add(len(cfgs)) for i, cfg := range cfgs { srvs[i] = newService(cfg) - wg.Add(1) go func(srv *testService) { if err := srv.Open(); err != nil { panic(err) @@ -971,7 +1015,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { wg.Wait() time.Sleep(time.Second) - c2 := meta.NewClient([]string{srvs[0].HTTPAddr()}, false) + c2 := meta.NewClient(joinPeers, false) if err := c2.Open(); err != nil { t.Fatal(err) } @@ -983,7 +1027,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c2.Database("bar"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } if _, err := c2.CreateDatabase("asdf"); err != nil { @@ -991,7 +1035,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c2.Database("asdf"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } } @@ -1014,12 +1058,12 @@ func TestMetaService_NameChangeSingleNode(t *testing.T) { c := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } s.Close() @@ -1029,24 +1073,24 @@ func TestMetaService_NameChangeSingleNode(t *testing.T) { cfg.HTTPBindAddress = "asdf" + ":" + strings.Split(s.HTTPAddr(), ":")[1] s = newService(cfg) if err := s.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer s.Close() c2 := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c2.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c2.Close() db, err := c2.Database("foo") if db == nil || err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } nodes, err := c2.MetaNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } exp := []meta.NodeInfo{{ID: 1, Host: cfg.HTTPBindAddress, TCPHost: cfg.BindAddress}} @@ -1072,7 +1116,7 @@ func TestMetaService_CreateDataNode(t *testing.T) { n, err := c.CreateDataNode(exp.Host, exp.TCPHost) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(n, exp) { @@ -1081,7 +1125,7 @@ func TestMetaService_CreateDataNode(t *testing.T) { nodes, err := c.DataNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(nodes, []meta.NodeInfo{*exp}) { @@ -1105,7 +1149,7 @@ func TestMetaService_DropDataNode(t *testing.T) { n, err := c.CreateDataNode(exp.Host, exp.TCPHost) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(n, exp) { @@ -1114,7 +1158,7 @@ func TestMetaService_DropDataNode(t *testing.T) { nodes, err := c.DataNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(nodes, []meta.NodeInfo{*exp}) { @@ -1122,11 +1166,11 @@ func TestMetaService_DropDataNode(t *testing.T) { } if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } sg, err := c.CreateShardGroup("foo", "default", time.Now()) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(sg.Shards[0].Owners, 
[]meta.ShardOwner{{1}}) { @@ -1134,7 +1178,7 @@ func TestMetaService_DropDataNode(t *testing.T) { } if res := c.ExecuteStatement(mustParseStatement("DROP DATA SERVER 1")); res.Err != nil { - t.Fatal(res.Err.Error()) + t.Fatal(res.Err) } rp, _ := c.RetentionPolicy("foo", "default") @@ -1156,7 +1200,7 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { c := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } id := c.ClusterID() if id == 0 { @@ -1166,12 +1210,12 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { s.Close() s = newService(cfg) if err := s.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } c = meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -1186,42 +1230,48 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { func TestMetaService_Ping(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + var swg sync.WaitGroup + swg.Add(len(cfgs)) + + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } - c.HTTPBindAddress = srvs[i].HTTPAddr() - c.BindAddress = srvs[i].RaftAddr() - c.JoinPeers = nil + go func(i int, srv *testService) { + defer swg.Done() + if err := srv.Open(); err != nil { + t.Fatalf("error opening server %d: %s", i, err) + } + }(i, srvs[i]) defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + swg.Wait() - c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false) + c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() if err := c.Ping(false); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false all failed: %s", err) } if err := c.Ping(true); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false true failed: %s", err) } srvs[1].Close() + // give the server time to close + time.Sleep(time.Second) if err := c.Ping(false); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false some failed: %s", err) } if err := c.Ping(true); err == nil { @@ -1241,12 +1291,12 @@ func TestMetaService_AcquireLease(t *testing.T) { n1, err := c1.CreateDataNode("foo1:8180", "bar1:8281") if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } n2, err := c2.CreateDataNode("foo2:8180", "bar2:8281") if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } // Client 1 acquires a lease. Should succeed. 
@@ -1375,3 +1425,17 @@ func mustMarshalJSON(v interface{}) string { } return string(b) } + +func freePort() string { + l, _ := net.Listen("tcp", "127.0.0.1:0") + defer l.Close() + return l.Addr().String() +} + +func freePorts(i int) []string { + var ports []string + for j := 0; j < i; j++ { + ports = append(ports, freePort()) + } + return ports +} diff --git a/services/meta/store.go b/services/meta/store.go index 6c2405d55bc..c9f511e29c3 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -74,44 +74,50 @@ func newStore(c *Config, httpAddr, raftAddr string) *store { func (s *store) open(raftln net.Listener) error { s.logger.Printf("Using data dir: %v", s.path) - // See if this server needs to join the raft consensus group - var initializePeers []string - if len(s.config.JoinPeers) > 0 { - c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled) - data := c.retryUntilSnapshot(0) - for _, n := range data.MetaNodes { - initializePeers = append(initializePeers, n.TCPHost) - } - initializePeers = append(initializePeers, s.raftAddr) + joinPeers, err := s.filterAddr(s.config.JoinPeers, s.httpAddr) + if err != nil { + return err } + joinPeers = s.config.JoinPeers - if err := func() error { - s.mu.Lock() - defer s.mu.Unlock() + var initializePeers []string + if len(joinPeers) > 0 { + c := NewClient(joinPeers, s.config.HTTPSEnabled) + for { + peers := c.peers() + if !Peers(peers).Contains(s.raftAddr) { + peers = append(peers, s.raftAddr) + } + if len(s.config.JoinPeers)-len(peers) == 0 { + initializePeers = peers + break + } - // Check if store has already been opened. - if s.opened { - return ErrStoreOpen + if len(peers) > len(s.config.JoinPeers) { + s.logger.Printf("waiting for join peers to match config specified. found %v, config specified %v", peers, s.config.JoinPeers) + } else { + s.logger.Printf("Waiting for %d join peers. Have %v. Asking nodes: %v", len(s.config.JoinPeers)-len(peers), peers, joinPeers) + } + time.Sleep(time.Second) } - s.opened = true + } - // Create the root directory if it doesn't already exist. - if err := os.MkdirAll(s.path, 0777); err != nil { - return fmt.Errorf("mkdir all: %s", err) - } + if err := s.setOpen(); err != nil { + return err + } - // Open the raft store. - if err := s.openRaft(initializePeers, raftln); err != nil { - return fmt.Errorf("raft: %s", err) - } + // Create the root directory if it doesn't already exist. + if err := os.MkdirAll(s.path, 0777); err != nil { + return fmt.Errorf("mkdir all: %s", err) + } - return nil - }(); err != nil { - return err + // Open the raft store. + if err := s.openRaft(initializePeers, raftln); err != nil { + return fmt.Errorf("raft: %s", err) } - if len(s.config.JoinPeers) > 0 { - c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled) + if len(joinPeers) > 0 { + c := NewClient(joinPeers, s.config.HTTPSEnabled) if err := c.Open(); err != nil { return err } @@ -149,7 +155,66 @@ func (s *store) open(raftln net.Listener) error { return nil } +func (s *store) setOpen() error { + s.mu.Lock() + defer s.mu.Unlock() + // Check if store has already been opened. 
+ if s.opened { + return ErrStoreOpen + } + s.opened = true + return nil +} + +// peers returns the raft peers known to this store +func (s *store) peers() []string { + s.mu.RLock() + defer s.mu.RUnlock() + if s.raftState == nil { + return []string{s.raftAddr} + } + peers, err := s.raftState.peers() + if err != nil { + return []string{s.raftAddr} + } + return peers +} + +func (s *store) filterAddr(addrs []string, filter string) ([]string, error) { + host, port, err := net.SplitHostPort(filter) + if err != nil { + return nil, err + } + + ip, err := net.ResolveIPAddr("ip", host) + if err != nil { + return nil, err + } + + var joinPeers []string + for _, addr := range addrs { + joinHost, joinPort, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + joinIp, err := net.ResolveIPAddr("ip", joinHost) + if err != nil { + return nil, err + } + + // Don't allow joining ourselves + if ip.String() == joinIp.String() && port == joinPort { + continue + } + joinPeers = append(joinPeers, addr) + } + return joinPeers, nil +} + func (s *store) openRaft(initializePeers []string, raftln net.Listener) error { + s.mu.Lock() + defer s.mu.Unlock() rs := newRaftState(s.config, s.raftAddr) rs.logger = s.logger rs.path = s.path @@ -239,7 +304,7 @@ func (s *store) isLeader() bool { func (s *store) leader() string { s.mu.RLock() defer s.mu.RUnlock() - if s.raftState == nil { + if s.raftState == nil || s.raftState.raft == nil { return "" } return s.raftState.raft.Leader() @@ -288,14 +353,24 @@ func (s *store) index() uint64 { // apply applies a command to raft. func (s *store) apply(b []byte) error { + if s.raftState == nil { + return fmt.Errorf("store not open") + } return s.raftState.apply(b) } // join adds a new server to the metaservice and raft func (s *store) join(n *NodeInfo) error { + s.mu.RLock() + if s.raftState == nil { + s.mu.RUnlock() + return fmt.Errorf("store not open") + } if err := s.raftState.addPeer(n.TCPHost); err != nil { + s.mu.RUnlock() return err } + s.mu.RUnlock() return s.createMetaNode(n.Host, n.TCPHost) } diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index 2a409b7ce9d..3d9d412594b 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -65,11 +65,20 @@ func TestService_Telnet(t *testing.T) { if err := conn.Close(); err != nil { t.Fatal(err) } - time.Sleep(10 * time.Millisecond) - // Verify that the writer was called. - if atomic.LoadInt32(&called) == 0 { - t.Fatal("points writer not called") + tick := time.Tick(10 * time.Millisecond) + timeout := time.After(10 * time.Second) + + for { + select { + case <-tick: + // Verify that the writer was called. + if atomic.LoadInt32(&called) > 0 { + return + } + case <-timeout: + t.Fatal("points writer not called") + } } }