From e1effa660158d43a8c4cc0f6272a95cb534752bb Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Sun, 24 Jan 2016 21:56:06 -0700 Subject: [PATCH 01/15] fix build after rebase on master --- cmd/influxd/run/server.go | 16 +++++-- services/meta/client.go | 49 +++++++++++++++++++- services/meta/handler.go | 18 +++++++- services/meta/raft_state.go | 20 +++------ services/meta/service.go | 8 ++-- services/meta/store.go | 90 ++++++++++++++++++++++++++++++------- 6 files changed, 164 insertions(+), 37 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 40d80ddd852..649ea9556bd 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -128,6 +128,11 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { return nil, err } + // Create the root directory if it doesn't already exist. + if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil { + return nil, fmt.Errorf("mkdir all: %s", err) + } + // load the node information metaAddresses := []string{nodeAddr} if !c.Meta.Enabled { @@ -627,6 +632,8 @@ func (s *Server) initializeMetaClient() error { go s.updateMetaNodeInformation() + s.MetaClient.WaitForDataChanged() + return nil } @@ -645,13 +652,16 @@ func (s *Server) initializeMetaClient() error { if err := s.MetaClient.Open(); err != nil { return err } - - if s.TSDBStore != nil { + for { n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr) if err != nil { - return err + println("Unable to create data node. retry...", err.Error()) + time.Sleep(time.Second) + continue } s.Node.ID = n.ID + + break } metaNodes, err := s.MetaClient.MetaNodes() if err != nil { diff --git a/services/meta/client.go b/services/meta/client.go index 25970d516d1..03595c93213 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -827,13 +827,23 @@ func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error { continue } resp.Body.Close() + + // Successfully joined + if resp.StatusCode == http.StatusOK { + return nil + } + + // We tried to join a meta node that was not the leader, rety at the node + // they think is the leader. 
if resp.StatusCode == http.StatusTemporaryRedirect { redirectServer = resp.Header.Get("Location") continue } - return nil + // Something failed, try the next node + currentServer++ } + return nil } func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*NodeInfo, error) { @@ -1109,6 +1119,43 @@ func (c *Client) getSnapshot(server string, index uint64) (*Data, error) { return data, nil } +// peers returns the TCPHost addresses of all the metaservers +func (c *Client) peers() []string { + distinct := map[string]struct{}{} + + // query each server and keep track of who their peers are + for _, server := range c.metaServers { + url := c.url(server) + "/peers" + resp, err := http.Get(url) + if err != nil { + continue + } + defer resp.Body.Close() + + // This meta-server might not be ready to answer, continue on + if resp.StatusCode != http.StatusOK { + continue + } + + dec := json.NewDecoder(resp.Body) + var peers []string + if err := dec.Decode(&peers); err != nil { + continue + } + + for _, p := range peers { + distinct[p] = struct{}{} + } + } + + // Return the unique set of peer addresses + var peers []string + for k := range distinct { + peers = append(peers, k) + } + return peers +} + func (c *Client) url(server string) string { url := fmt.Sprintf("://%s", server) diff --git a/services/meta/handler.go b/services/meta/handler.go index 863ba8becf8..af3eb6e2631 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -10,6 +10,7 @@ import ( "log" "net/http" "os" + "runtime" "strconv" "strings" "sync" @@ -38,6 +39,7 @@ type handler struct { apply(b []byte) error join(n *NodeInfo) error otherMetaServersHTTP() []string + peers() []string } s *Service @@ -84,6 +86,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.WrapHandler("ping", h.servePing).ServeHTTP(w, r) case "/lease": h.WrapHandler("lease", h.serveLease).ServeHTTP(w, r) + case "/peers": + h.WrapHandler("peers", h.servePeers).ServeHTTP(w, r) default: h.WrapHandler("snapshot", h.serveSnapshot).ServeHTTP(w, r) } @@ -120,7 +124,7 @@ func (h *handler) isClosed() bool { // serveExec executes the requested command. 
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { if h.isClosed() { - h.httpError(fmt.Errorf("server closed"), w, http.StatusInternalServerError) + h.httpError(fmt.Errorf("server closed"), w, http.StatusServiceUnavailable) return } @@ -137,6 +141,7 @@ func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { h.httpError(err, w, http.StatusInternalServerError) return } + err := h.store.join(n) if err == raft.ErrNotLeader { l := h.store.leaderHTTP() @@ -307,6 +312,13 @@ func (h *handler) servePing(w http.ResponseWriter, r *http.Request) { h.httpError(fmt.Errorf("one or more metaservers not up"), w, http.StatusInternalServerError) } +func (h *handler) servePeers(w http.ResponseWriter, r *http.Request) { + enc := json.NewEncoder(w) + if err := enc.Encode(h.store.peers()); err != nil { + h.httpError(err, w, http.StatusInternalServerError) + } +} + // serveLease func (h *handler) serveLease(w http.ResponseWriter, r *http.Request) { var name, nodeIDStr string @@ -442,8 +454,10 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler defer func() { if err := recover(); err != nil { + b := make([]byte, 1024) + runtime.Stack(b, false) logLine := buildLogLine(l, r, start) - logLine = fmt.Sprintf(`%s [panic:%s]`, logLine, err) + logLine = fmt.Sprintf("%s [panic:%s]\n%s", logLine, err, string(b)) weblog.Println(logLine) } }() diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index b409d8c18ee..fd2be2a58e7 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -88,7 +88,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er } // If no peers are set in the config or there is one and we are it, then start as a single server. - if len(peers) <= 1 { + if len(initializePeers) <= 1 { config.EnableSingleNode = true // Ensure we can always become the leader @@ -96,9 +96,12 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er // For single-node clusters, we can update the raft peers before we start the cluster // just in case the hostname has changed. - if err := r.peerStore.SetPeers([]string{r.addr}); err != nil { - return err + if !raft.PeerContained(peers, r.addr) { + if err := r.peerStore.SetPeers([]string{r.addr}); err != nil { + return err + } } + peers = []string{r.addr} } @@ -184,7 +187,7 @@ func (r *raftState) close() error { // apply applies a serialized command to the raft log. func (r *raftState) apply(b []byte) error { - // Apply to raft log. + // Apply to raft log.` f := r.raft.Apply(b, 0) if err := f.Error(); err != nil { return err @@ -214,15 +217,6 @@ func (r *raftState) snapshot() error { // addPeer adds addr to the list of peers in the cluster. 
func (r *raftState) addPeer(addr string) error { - // peers, err := r.peerStore.Peers() - // if err != nil { - // return err - // } - // peers = append(peers, addr) - // if fut := r.raft.SetPeers(peers); fut.Error() != nil { - // return fut.Error() - // } - peers, err := r.peerStore.Peers() if err != nil { return err diff --git a/services/meta/service.go b/services/meta/service.go index fd923705be8..923d73a98fb 100644 --- a/services/meta/service.go +++ b/services/meta/service.go @@ -110,9 +110,6 @@ func (s *Service) Open() error { // Open the store s.store = newStore(s.config, s.httpAddr, s.raftAddr) - if err := s.store.open(s.RaftListener); err != nil { - return err - } handler := newHandler(s.config, s) handler.logger = s.Logger @@ -121,6 +118,11 @@ func (s *Service) Open() error { // Begin listening for requests in a separate goroutine. go s.serve() + + if err := s.store.open(s.RaftListener); err != nil { + return err + } + return nil } diff --git a/services/meta/store.go b/services/meta/store.go index 6c2405d55bc..20d032cfdea 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/services/meta/internal" "github.com/gogo/protobuf/proto" @@ -74,16 +75,28 @@ func newStore(c *Config, httpAddr, raftAddr string) *store { func (s *store) open(raftln net.Listener) error { s.logger.Printf("Using data dir: %v", s.path) - // See if this server needs to join the raft consensus group + joinPeers, err := s.filterAddr(s.config.JoinPeers, s.httpAddr) + if err != nil { + return err + } + joinPeers = s.config.JoinPeers + var initializePeers []string - if len(s.config.JoinPeers) > 0 { - c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled) - data := c.retryUntilSnapshot(0) - for _, n := range data.MetaNodes { - initializePeers = append(initializePeers, n.TCPHost) + if len(joinPeers) > 0 { + c := NewClient(joinPeers, s.config.HTTPSEnabled) + for { + peers := c.peers() + spew.Dump(peers) + if len(s.config.JoinPeers)-len(peers) == 0 { + initializePeers = peers + break + } + + s.logger.Printf("Waiting for %d join peers", len(s.config.JoinPeers)-len(peers)) + time.Sleep(time.Second) } - initializePeers = append(initializePeers, s.raftAddr) } + initializePeers = append(initializePeers, s.raftAddr) if err := func() error { s.mu.Lock() @@ -110,8 +123,14 @@ func (s *store) open(raftln net.Listener) error { return err } - if len(s.config.JoinPeers) > 0 { - c := NewClient(s.config.JoinPeers, s.config.HTTPSEnabled) + // Wait for a leader to be elected so we know the raft log is loaded + // and up to date + if err := s.waitForLeader(0); err != nil { + return err + } + + if len(joinPeers) > 0 { + c := NewClient(joinPeers, s.config.HTTPSEnabled) if err := c.Open(); err != nil { return err } @@ -122,12 +141,6 @@ func (s *store) open(raftln net.Listener) error { } } - // Wait for a leader to be elected so we know the raft log is loaded - // and up to date - if err := s.waitForLeader(0); err != nil { - return err - } - // Make sure this server is in the list of metanodes peers, err := s.raftState.peers() if err != nil { @@ -149,6 +162,50 @@ func (s *store) open(raftln net.Listener) error { return nil } +// peers returns the raft peers known to this store +func (s *store) peers() []string { + if s.raftState == nil { + return []string{s.raftAddr} + } + peers, err := s.raftState.peers() + if err != nil { + return []string{s.raftAddr} + } + return peers +} + +func (s *store) filterAddr(addrs []string, 
filter string) ([]string, error) { + host, port, err := net.SplitHostPort(filter) + if err != nil { + return nil, err + } + + ip, err := net.ResolveIPAddr("ip", host) + if err != nil { + return nil, err + } + + var joinPeers []string + for _, addr := range addrs { + joinHost, joinPort, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + joinIp, err := net.ResolveIPAddr("ip", joinHost) + if err != nil { + return nil, err + } + + // Don't allow joining ourselves + if ip.String() == joinIp.String() && port == joinPort { + continue + } + joinPeers = append(joinPeers, addr) + } + return joinPeers, nil +} + func (s *store) openRaft(initializePeers []string, raftln net.Listener) error { rs := newRaftState(s.config, s.raftAddr) rs.logger = s.logger @@ -288,6 +345,9 @@ func (s *store) index() uint64 { // apply applies a command to raft. func (s *store) apply(b []byte) error { + if s.raftState == nil { + return fmt.Errorf("store not open") + } return s.raftState.apply(b) } From d9f1df0ecfd0b6fcc4a0c890422769e7e837a8c4 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 9 Feb 2016 14:42:30 -0600 Subject: [PATCH 02/15] sane cluster starting with join args --- cmd/influxd/run/command.go | 10 ++------ services/meta/client.go | 1 - services/meta/store.go | 47 +++++++++++++++++++------------------- 3 files changed, 25 insertions(+), 33 deletions(-) diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index f00b9291ed7..47e9bf6659a 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -14,7 +14,6 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/influxdata/influxdb" ) const logo = ` @@ -95,13 +94,8 @@ func (cmd *Command) Run(args ...string) error { return fmt.Errorf("apply env config: %v", err) } - // If we have a node ID, ignore the join argument - // We are not using the reference to this node var, just checking - // to see if we have a node ID on disk - if node, _ := influxdb.LoadNode(config.Meta.Dir, []string{config.Meta.HTTPBindAddress}); node == nil || node.ID == 0 { - if options.Join != "" { - config.Meta.JoinPeers = strings.Split(options.Join, ",") - } + if options.Join != "" { + config.Meta.JoinPeers = strings.Split(options.Join, ",") } // Validate the configuration. diff --git a/services/meta/client.go b/services/meta/client.go index 03595c93213..25132d48912 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -843,7 +843,6 @@ func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error { // Something failed, try the next node currentServer++ } - return nil } func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*NodeInfo, error) { diff --git a/services/meta/store.go b/services/meta/store.go index 20d032cfdea..e88bbc5473f 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/services/meta/internal" "github.com/gogo/protobuf/proto" @@ -86,7 +85,6 @@ func (s *store) open(raftln net.Listener) error { c := NewClient(joinPeers, s.config.HTTPSEnabled) for { peers := c.peers() - spew.Dump(peers) if len(s.config.JoinPeers)-len(peers) == 0 { initializePeers = peers break @@ -96,31 +94,19 @@ func (s *store) open(raftln net.Listener) error { time.Sleep(time.Second) } } - initializePeers = append(initializePeers, s.raftAddr) - if err := func() error { - s.mu.Lock() - defer s.mu.Unlock() - - // Check if store has already been opened. 
- if s.opened { - return ErrStoreOpen - } - s.opened = true - - // Create the root directory if it doesn't already exist. - if err := os.MkdirAll(s.path, 0777); err != nil { - return fmt.Errorf("mkdir all: %s", err) - } + if err := s.setOpen(); err != nil { + return err + } - // Open the raft store. - if err := s.openRaft(initializePeers, raftln); err != nil { - return fmt.Errorf("raft: %s", err) - } + // Create the root directory if it doesn't already exist. + if err := os.MkdirAll(s.path, 0777); err != nil { + return fmt.Errorf("mkdir all: %s", err) + } - return nil - }(); err != nil { - return err + // Open the raft store. + if err := s.openRaft(initializePeers, raftln); err != nil { + return fmt.Errorf("raft: %s", err) } // Wait for a leader to be elected so we know the raft log is loaded @@ -162,6 +148,17 @@ func (s *store) open(raftln net.Listener) error { return nil } +func (s *store) setOpen() error { + s.mu.Lock() + defer s.mu.Unlock() + // Check if store has already been opened. + if s.opened { + return ErrStoreOpen + } + s.opened = true + return nil +} + // peers returns the raft peers known to this store func (s *store) peers() []string { if s.raftState == nil { @@ -207,6 +204,8 @@ func (s *store) filterAddr(addrs []string, filter string) ([]string, error) { } func (s *store) openRaft(initializePeers []string, raftln net.Listener) error { + s.mu.Lock() + defer s.mu.Unlock() rs := newRaftState(s.config, s.raftAddr) rs.logger = s.logger rs.path = s.path From 807354f1950a1df703acde82d3ac20abcd9d5bec Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 09:46:17 -0600 Subject: [PATCH 03/15] passing test suite... hopefully --- services/meta/raft_state.go | 6 +- services/meta/service_test.go | 212 +++++++++++++++++++----------- services/meta/store.go | 20 +-- services/opentsdb/service_test.go | 17 ++- 4 files changed, 161 insertions(+), 94 deletions(-) diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index fd2be2a58e7..73e1084e89c 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -1,7 +1,6 @@ package meta import ( - "errors" "fmt" "io/ioutil" "log" @@ -159,6 +158,9 @@ func (r *raftState) logLeaderChanges() { } func (r *raftState) close() error { + if r == nil { + return nil + } if r.closing != nil { close(r.closing) } @@ -238,7 +240,7 @@ func (r *raftState) addPeer(addr string) error { func (r *raftState) removePeer(addr string) error { // Only do this on the leader if !r.isLeader() { - return errors.New("not the leader") + return raft.ErrNotLeader } if fut := r.raft.RemovePeer(addr); fut.Error() != nil { return fut.Error() diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 829260b8d99..27d28843238 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -2,6 +2,7 @@ package meta_test import ( "encoding/json" + "fmt" "io/ioutil" "net" "os" @@ -772,47 +773,61 @@ func TestMetaService_Shards(t *testing.T) { func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Parallel() + joinPeers := freePorts(4) + cfg1 := newConfig() + cfg1.HTTPBindAddress = joinPeers[0] defer os.RemoveAll(cfg1.Dir) cfg2 := newConfig() + cfg2.HTTPBindAddress = joinPeers[1] defer os.RemoveAll(cfg2.Dir) cfg3 := newConfig() + cfg3.HTTPBindAddress = joinPeers[2] defer os.RemoveAll(cfg3.Dir) cfg4 := newConfig() + cfg4.HTTPBindAddress = joinPeers[3] defer os.RemoveAll(cfg4.Dir) + var wg sync.WaitGroup + wg.Add(2) + cfg1.JoinPeers = joinPeers[0:2] s1 := newService(cfg1) - if err := s1.Open(); err != nil 
{ - t.Fatalf(err.Error()) - } + go func() { + defer wg.Done() + if err := s1.Open(); err != nil { + t.Fatalf(err.Error()) + } + }() defer s1.Close() - cfg2.JoinPeers = []string{s1.HTTPAddr()} + cfg2.JoinPeers = joinPeers[0:2] s2 := newService(cfg2) - if err := s2.Open(); err != nil { - t.Fatal(err.Error()) - } - defer s2.Close() - - func() { - cfg3.JoinPeers = []string{s2.HTTPAddr()} - s3 := newService(cfg3) - if err := s3.Open(); err != nil { + go func() { + defer wg.Done() + if err := s2.Open(); err != nil { t.Fatal(err.Error()) } - defer s3.Close() + }() + defer s2.Close() + wg.Wait() - c1 := meta.NewClient([]string{s1.HTTPAddr()}, false) - if err := c1.Open(); err != nil { - t.Fatal(err.Error()) - } - defer c1.Close() + cfg3.JoinPeers = joinPeers[0:3] + s3 := newService(cfg3) + if err := s3.Open(); err != nil { + t.Fatal(err.Error()) + } + defer s3.Close() - metaNodes, _ := c1.MetaNodes() - if len(metaNodes) != 3 { - t.Fatalf("meta nodes wrong: %v", metaNodes) - } - }() + c1 := meta.NewClient(joinPeers[0:3], false) + if err := c1.Open(); err != nil { + t.Fatal(err.Error()) + } + defer c1.Close() + + metaNodes, _ := c1.MetaNodes() + if len(metaNodes) != 3 { + t.Fatalf("meta nodes wrong: %v", metaNodes) + } c := meta.NewClient([]string{s1.HTTPAddr()}, false) if err := c.Open(); err != nil { @@ -824,19 +839,25 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Fatal(res.Err) } - metaNodes, _ := c.MetaNodes() + metaNodes, _ = c.MetaNodes() if len(metaNodes) != 2 { t.Fatalf("meta nodes wrong: %v", metaNodes) } - cfg4.JoinPeers = []string{s1.HTTPAddr()} + cfg4.JoinPeers = []string{joinPeers[0], joinPeers[1], joinPeers[3]} s4 := newService(cfg4) if err := s4.Open(); err != nil { t.Fatal(err.Error()) } defer s4.Close() - metaNodes, _ = c.MetaNodes() + c2 := meta.NewClient(cfg4.JoinPeers, false) + if err := c2.Open(); err != nil { + t.Fatal(err.Error()) + } + defer c2.Close() + + metaNodes, _ = c2.MetaNodes() if len(metaNodes) != 3 { t.Fatalf("meta nodes wrong: %v", metaNodes) } @@ -850,39 +871,48 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + + var wg sync.WaitGroup + wg.Add(len(cfgs)) + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } + go func(srv *testService) { + defer wg.Done() + if err := srv.Open(); err != nil { + t.Fatal(err.Error()) + } + }(srvs[i]) defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + wg.Wait() - c := meta.NewClient([]string{srvs[2].HTTPAddr()}, false) - if err := c.Open(); err != nil { - t.Fatal(err.Error()) - } - defer c.Close() + for i := range cfgs { + c := meta.NewClient([]string{joinPeers[i]}, false) + if err := c.Open(); err != nil { + t.Fatal(err.Error()) + } + defer c.Close() - metaNodes, _ := c.MetaNodes() - if len(metaNodes) != 3 { - t.Fatalf("meta nodes wrong: %v", metaNodes) - } + metaNodes, _ := c.MetaNodes() + if len(metaNodes) != 3 { + t.Fatalf("node %d - meta nodes wrong: %v", i, metaNodes) + } - if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err) - } + if _, err := c.CreateDatabase(fmt.Sprintf("foo%d", i)); err != nil { + t.Fatalf("node %d: %s", i, err) + } - if db, err := c.Database("foo"); db == nil || err != nil { - t.Fatalf("database foo wasn't 
created: %s", err.Error()) + if db, err := c.Database(fmt.Sprintf("foo%d", i)); db == nil || err != nil { + t.Fatalf("node %d: database foo wasn't created: %s", i, err.Error()) + } } } @@ -893,26 +923,31 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + var swg sync.WaitGroup + swg.Add(len(cfgs)) + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } - c.HTTPBindAddress = srvs[i].HTTPAddr() - c.BindAddress = srvs[i].RaftAddr() - c.JoinPeers = nil + go func(i int, srv *testService) { + defer swg.Done() + if err := srv.Open(); err != nil { + t.Logf("opening server %d", i) + t.Fatal(err.Error()) + } + }(i, srvs[i]) + defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + swg.Wait() - c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false) + c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { t.Fatal(err.Error()) } @@ -954,12 +989,11 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { // give them a second to shut down time.Sleep(time.Second) - // when we start back up they need to happen simultaneously, otherwise - // a leader won't get elected + // need to start them all at once so they can discover the bind addresses for raft var wg sync.WaitGroup + wg.Add(len(cfgs)) for i, cfg := range cfgs { srvs[i] = newService(cfg) - wg.Add(1) go func(srv *testService) { if err := srv.Open(); err != nil { panic(err) @@ -971,7 +1005,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { wg.Wait() time.Sleep(time.Second) - c2 := meta.NewClient([]string{srvs[0].HTTPAddr()}, false) + c2 := meta.NewClient(joinPeers, false) if err := c2.Open(); err != nil { t.Fatal(err) } @@ -1186,42 +1220,46 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { func TestMetaService_Ping(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) - for i := range cfgs { - c := newConfig() + joinPeers := freePorts(len(cfgs)) + + var swg sync.WaitGroup + swg.Add(len(cfgs)) + for i, _ := range cfgs { + c := newConfig() + c.HTTPBindAddress = joinPeers[i] + c.JoinPeers = joinPeers cfgs[i] = c - if i > 0 { - c.JoinPeers = []string{srvs[0].HTTPAddr()} - } srvs[i] = newService(c) - if err := srvs[i].Open(); err != nil { - t.Fatal(err.Error()) - } - c.HTTPBindAddress = srvs[i].HTTPAddr() - c.BindAddress = srvs[i].RaftAddr() - c.JoinPeers = nil + go func(i int, srv *testService) { + defer swg.Done() + if err := srv.Open(); err != nil { + t.Fatalf("error opening server %d: %s", i, err.Error()) + } + }(i, srvs[i]) defer srvs[i].Close() defer os.RemoveAll(c.Dir) } + swg.Wait() - c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false) + c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { t.Fatal(err.Error()) } defer c.Close() if err := c.Ping(false); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false all failed: %s", err.Error()) } if err := c.Ping(true); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false true failed: %s", err.Error()) } srvs[1].Close() if err := c.Ping(false); err != nil { - t.Fatal(err.Error()) + t.Fatalf("ping false some failed: %s", err.Error()) } if err := c.Ping(true); err == nil { @@ -1375,3 
+1413,17 @@ func mustMarshalJSON(v interface{}) string { } return string(b) } + +func freePort() string { + l, _ := net.Listen("tcp", "127.0.0.1:0") + defer l.Close() + return l.Addr().String() +} + +func freePorts(i int) []string { + var ports []string + for j := 0; j < i; j++ { + ports = append(ports, freePort()) + } + return ports +} diff --git a/services/meta/store.go b/services/meta/store.go index e88bbc5473f..c6b7c6a59be 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -90,7 +90,11 @@ func (s *store) open(raftln net.Listener) error { break } - s.logger.Printf("Waiting for %d join peers", len(s.config.JoinPeers)-len(peers)) + if len(peers) > len(s.config.JoinPeers) { + s.logger.Printf("waiting for join peers to match config specified. found %v, config specified %v", peers, s.config.JoinPeers) + } else { + s.logger.Printf("Waiting for %d join peers. Have %v. Asking nodes: %v", len(s.config.JoinPeers)-len(peers), peers, joinPeers) + } time.Sleep(time.Second) } } @@ -109,12 +113,6 @@ func (s *store) open(raftln net.Listener) error { return fmt.Errorf("raft: %s", err) } - // Wait for a leader to be elected so we know the raft log is loaded - // and up to date - if err := s.waitForLeader(0); err != nil { - return err - } - if len(joinPeers) > 0 { c := NewClient(joinPeers, s.config.HTTPSEnabled) if err := c.Open(); err != nil { @@ -127,6 +125,12 @@ func (s *store) open(raftln net.Listener) error { } } + // Wait for a leader to be elected so we know the raft log is loaded + // and up to date + if err := s.waitForLeader(0); err != nil { + return err + } + // Make sure this server is in the list of metanodes peers, err := s.raftState.peers() if err != nil { @@ -295,7 +299,7 @@ func (s *store) isLeader() bool { func (s *store) leader() string { s.mu.RLock() defer s.mu.RUnlock() - if s.raftState == nil { + if s.raftState == nil || s.raftState.raft == nil { return "" } return s.raftState.raft.Leader() diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index 2a409b7ce9d..3d9d412594b 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -65,11 +65,20 @@ func TestService_Telnet(t *testing.T) { if err := conn.Close(); err != nil { t.Fatal(err) } - time.Sleep(10 * time.Millisecond) - // Verify that the writer was called. - if atomic.LoadInt32(&called) == 0 { - t.Fatal("points writer not called") + tick := time.Tick(10 * time.Millisecond) + timeout := time.After(10 * time.Second) + + for { + select { + case <-tick: + // Verify that the writer was called. 
+ if atomic.LoadInt32(&called) > 0 { + return + } + case <-timeout: + t.Fatal("points writer not called") + } } } From f861d5811ebc262a32cf84c8e3b34d26ad81495e Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 12:16:54 -0600 Subject: [PATCH 04/15] fix adhoc joining of cluster --- services/meta/client.go | 47 ++++++++++++++++++++++++++++++----------- services/meta/store.go | 6 ++++++ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/services/meta/client.go b/services/meta/client.go index 25132d48912..ea5807ab5f2 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -1120,8 +1120,8 @@ func (c *Client) getSnapshot(server string, index uint64) (*Data, error) { // peers returns the TCPHost addresses of all the metaservers func (c *Client) peers() []string { - distinct := map[string]struct{}{} + var peers Peers // query each server and keep track of who their peers are for _, server := range c.metaServers { url := c.url(server) + "/peers" @@ -1137,22 +1137,15 @@ func (c *Client) peers() []string { } dec := json.NewDecoder(resp.Body) - var peers []string - if err := dec.Decode(&peers); err != nil { + var p []string + if err := dec.Decode(&p); err != nil { continue } - - for _, p := range peers { - distinct[p] = struct{}{} - } + peers = peers.Append(p...) } // Return the unique set of peer addresses - var peers []string - for k := range distinct { - peers = append(peers, k) - } - return peers + return []string(peers.Unique()) } func (c *Client) url(server string) string { @@ -1216,6 +1209,36 @@ func (c *Client) updateAuthCache() { c.authCache = newCache } +type Peers []string + +func (peers Peers) Append(p ...string) Peers { + peers = append(peers, p...) + + return peers.Unique() +} + +func (peers Peers) Unique() Peers { + distinct := map[string]struct{}{} + for _, p := range peers { + distinct[p] = struct{}{} + } + + var u Peers + for k := range distinct { + u = append(u, k) + } + return u +} + +func (peers Peers) Contains(peer string) bool { + for _, p := range peers { + if p == peer { + return true + } + } + return false +} + type errRedirect struct { host string } diff --git a/services/meta/store.go b/services/meta/store.go index c6b7c6a59be..35c8a31b08c 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -85,6 +85,9 @@ func (s *store) open(raftln net.Listener) error { c := NewClient(joinPeers, s.config.HTTPSEnabled) for { peers := c.peers() + if !Peers(peers).Contains(s.raftAddr) { + peers = append(peers, s.raftAddr) + } if len(s.config.JoinPeers)-len(peers) == 0 { initializePeers = peers break @@ -356,6 +359,9 @@ func (s *store) apply(b []byte) error { // join adds a new server to the metaservice and raft func (s *store) join(n *NodeInfo) error { + if s.raftState == nil { + return fmt.Errorf("store not open") + } if err := s.raftState.addPeer(n.TCPHost); err != nil { return err } From b17293f75c1506c6f32f080786c29caf49028e46 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 12:45:52 -0600 Subject: [PATCH 05/15] fix data race --- services/meta/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/meta/store.go b/services/meta/store.go index 35c8a31b08c..ac33987e6eb 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -359,12 +359,16 @@ func (s *store) apply(b []byte) error { // join adds a new server to the metaservice and raft func (s *store) join(n *NodeInfo) error { + s.mu.RLock() if s.raftState == nil { + s.mu.RUnlock() return fmt.Errorf("store not open") } if err := 
s.raftState.addPeer(n.TCPHost); err != nil { + s.mu.RUnlock() return err } + s.mu.RUnlock() return s.createMetaNode(n.Host, n.TCPHost) } From 7e62201793fc41f914ba316fe8256aede4c9aa71 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 14:15:53 -0600 Subject: [PATCH 06/15] specify bind address meta test --- services/meta/service_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 27d28843238..34556aeb472 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -930,6 +930,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { for i, _ := range cfgs { c := newConfig() c.HTTPBindAddress = joinPeers[i] + c.BindAddress = "127.0.0.1:0" c.JoinPeers = joinPeers cfgs[i] = c From 92e8516660167fc4aeb07acf4a400b37f9a5f9a5 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 15:43:34 -0600 Subject: [PATCH 07/15] specify raft bind address with real random ports --- services/meta/service_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 34556aeb472..75278aa6b12 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -924,13 +924,14 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { cfgs := make([]*meta.Config, 3) srvs := make([]*testService, 3) joinPeers := freePorts(len(cfgs)) + raftPeers := freePorts(len(cfgs)) var swg sync.WaitGroup swg.Add(len(cfgs)) for i, _ := range cfgs { c := newConfig() c.HTTPBindAddress = joinPeers[i] - c.BindAddress = "127.0.0.1:0" + c.BindAddress = raftPeers[i] c.JoinPeers = joinPeers cfgs[i] = c From 360f40561edadd6008e6ba831223dc9caecc71d6 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 20:19:24 -0600 Subject: [PATCH 08/15] misc fixes and changelog --- CHANGELOG.md | 1 + cmd/influxd/run/server.go | 2 +- services/meta/raft_state.go | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0beee5d825..d65c6e4b9e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#5419](https://github.com/influxdata/influxdb/pull/5419): Graphite: Support matching tags multiple times Thanks @m4ce - [#5598](https://github.com/influxdata/influxdb/pull/5598): Client: Add Ping to v2 client @PSUdaemon - [#4125](https://github.com/influxdata/influxdb/pull/4125): Admin UI: Fetch and display server version on connect. Thanks @alexiri! +- [#5602](https://github.com/influxdata/influxdb/pull/5602): Simplify cluster startup for scripting and deployment ### Bugfixes diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 649ea9556bd..2221b69db4f 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -655,7 +655,7 @@ func (s *Server) initializeMetaClient() error { for { n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr) if err != nil { - println("Unable to create data node. retry...", err.Error()) + log.Printf("Unable to create data node. 
retry in 1s: %s", err.Error()) time.Sleep(time.Second) continue } diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index 73e1084e89c..7180aecbb88 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -93,8 +93,8 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er // Ensure we can always become the leader config.DisableBootstrapAfterElect = false - // For single-node clusters, we can update the raft peers before we start the cluster - // just in case the hostname has changed. + // Make sure our peer address is here. This happens with either a single node cluster + // or a node joining the cluster, as no one else has that information yet. if !raft.PeerContained(peers, r.addr) { if err := r.peerStore.SetPeers([]string{r.addr}); err != nil { return err @@ -189,7 +189,7 @@ func (r *raftState) close() error { // apply applies a serialized command to the raft log. func (r *raftState) apply(b []byte) error { - // Apply to raft log.` + // Apply to raft log. f := r.raft.Apply(b, 0) if err := f.Error(); err != nil { return err From df5d587105308bd33717ce8c2912f683915a5d8c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 20:39:07 -0600 Subject: [PATCH 09/15] make meta test suite less racy --- services/meta/service_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 75278aa6b12..b606c679bd1 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -774,18 +774,23 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Parallel() joinPeers := freePorts(4) + raftPeers := freePorts(4) cfg1 := newConfig() cfg1.HTTPBindAddress = joinPeers[0] + cfg1.BindAddress = raftPeers[0] defer os.RemoveAll(cfg1.Dir) cfg2 := newConfig() cfg2.HTTPBindAddress = joinPeers[1] + cfg2.BindAddress = raftPeers[1] defer os.RemoveAll(cfg2.Dir) cfg3 := newConfig() cfg3.HTTPBindAddress = joinPeers[2] + cfg3.BindAddress = raftPeers[2] defer os.RemoveAll(cfg3.Dir) cfg4 := newConfig() cfg4.HTTPBindAddress = joinPeers[3] + cfg4.BindAddress = raftPeers[3] defer os.RemoveAll(cfg4.Dir) var wg sync.WaitGroup @@ -1259,6 +1264,8 @@ func TestMetaService_Ping(t *testing.T) { } srvs[1].Close() + // give the server time to close + time.Sleep(time.Second) if err := c.Ping(false); err != nil { t.Fatalf("ping false some failed: %s", err.Error()) From e9a2c33556683ee96584a1b81a3fd6699731d170 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 20:59:05 -0600 Subject: [PATCH 10/15] give less time to lose lease on random port for test --- services/meta/service_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/meta/service_test.go b/services/meta/service_test.go index b606c679bd1..3b328787631 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -788,10 +788,6 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { cfg3.HTTPBindAddress = joinPeers[2] cfg3.BindAddress = raftPeers[2] defer os.RemoveAll(cfg3.Dir) - cfg4 := newConfig() - cfg4.HTTPBindAddress = joinPeers[3] - cfg4.BindAddress = raftPeers[3] - defer os.RemoveAll(cfg4.Dir) var wg sync.WaitGroup wg.Add(2) @@ -849,7 +845,11 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Fatalf("meta nodes wrong: %v", metaNodes) } - cfg4.JoinPeers = []string{joinPeers[0], joinPeers[1], joinPeers[3]} + cfg4 := newConfig() + cfg4.HTTPBindAddress = freePort() + cfg4.BindAddress = freePort() + cfg4.JoinPeers = 
[]string{joinPeers[0], joinPeers[1], cfg4.HTTPBindAddress} + defer os.RemoveAll(cfg4.Dir) s4 := newService(cfg4) if err := s4.Open(); err != nil { t.Fatal(err.Error()) From 52077b2dfc678375c27d061f07cb821a353374da Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 21:27:07 -0600 Subject: [PATCH 11/15] fix race condition --- services/meta/store.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/meta/store.go b/services/meta/store.go index ac33987e6eb..c9f511e29c3 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -168,6 +168,8 @@ func (s *store) setOpen() error { // peers returns the raft peers known to this store func (s *store) peers() []string { + s.mu.RLock() + defer s.mu.RUnlock() if s.raftState == nil { return []string{s.raftAddr} } From 1b25c0cb80dad8ff34b14cdbd7ba1e36ca8c1249 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 10 Feb 2016 22:13:16 -0600 Subject: [PATCH 12/15] ask for a free port immediatly before using to prevent it being returned to the available pool --- services/meta/service_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 3b328787631..7c0870cf908 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -784,10 +784,6 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { cfg2.HTTPBindAddress = joinPeers[1] cfg2.BindAddress = raftPeers[1] defer os.RemoveAll(cfg2.Dir) - cfg3 := newConfig() - cfg3.HTTPBindAddress = joinPeers[2] - cfg3.BindAddress = raftPeers[2] - defer os.RemoveAll(cfg3.Dir) var wg sync.WaitGroup wg.Add(2) @@ -812,6 +808,13 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { defer s2.Close() wg.Wait() + cfg3 := newConfig() + joinPeers[2] = freePort() + cfg3.HTTPBindAddress = joinPeers[2] + raftPeers[2] = freePort() + cfg3.BindAddress = raftPeers[2] + defer os.RemoveAll(cfg3.Dir) + cfg3.JoinPeers = joinPeers[0:3] s3 := newService(cfg3) if err := s3.Open(); err != nil { From ddcfac7e8e83f81a39ee22de7f21e0b0b3f78c13 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 12 Feb 2016 10:15:22 -0700 Subject: [PATCH 13/15] Remove peers.json No longer needed now that peers are pull from the meta nodes. --- cmd/influxd/run/server.go | 3 +++ services/meta/raft_state.go | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 2221b69db4f..a249ec017aa 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -133,6 +133,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { return nil, fmt.Errorf("mkdir all: %s", err) } + // 0.11 we no longer use peers.json. Remove the file if we have one on disk. + os.RemoveAll(filepath.Join(c.Meta.Dir, "peers.json")) + // load the node information metaAddresses := []string{nodeAddr} if !c.Meta.Enabled { diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index 7180aecbb88..6d7140bb3c9 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -72,7 +72,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput) // Create peer storage. 
- r.peerStore = raft.NewJSONPeers(r.path, r.transport) + r.peerStore = &peerStore{} // This server is joining the raft cluster for the first time if initializePeers are passed in if len(initializePeers) > 0 { @@ -109,7 +109,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er // is difficult to resolve automatically because we need to have all the raft peers agree on the current members // of the cluster before we can change them. if len(peers) > 0 && !raft.PeerContained(peers, r.addr) { - r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.addr, r.path) + r.logger.Printf("%s is not in the list of raft peers. Please ensure all nodes have the same meta nodes configured", r.addr, r.path) return fmt.Errorf("peers out of sync: %v not in %v", r.addr, peers) } @@ -322,3 +322,17 @@ func (l *raftLayer) Accept() (net.Conn, error) { return l.ln.Accept() } // Close closes the layer. func (l *raftLayer) Close() error { return l.ln.Close() } + +// peerStore is an in-memory implementation of raft.PeerStore +type peerStore struct { + peers []string +} + +func (m *peerStore) Peers() ([]string, error) { + return m.peers, nil +} + +func (m *peerStore) SetPeers(peers []string) error { + m.peers = peers + return nil +} From 7ad31fa6ab996a12569f91f4dc06bed9771ad741 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 12 Feb 2016 08:06:05 -0600 Subject: [PATCH 14/15] address pr feedback --- cmd/influxd/run/server.go | 17 +++--- services/meta/service_test.go | 106 +++++++++++++++++----------------- 2 files changed, 60 insertions(+), 63 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index a249ec017aa..59aca76a1b1 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -655,17 +655,14 @@ func (s *Server) initializeMetaClient() error { if err := s.MetaClient.Open(); err != nil { return err } - for { - n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr) - if err != nil { - log.Printf("Unable to create data node. retry in 1s: %s", err.Error()) - time.Sleep(time.Second) - continue - } - s.Node.ID = n.ID - - break + n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr) + for err != nil { + log.Printf("Unable to create data node. retry in 1s: %s", err.Error()) + time.Sleep(time.Second) + n, err = s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr) } + s.Node.ID = n.ID + metaNodes, err := s.MetaClient.MetaNodes() if err != nil { return err diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 7c0870cf908..7f1892306f5 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -118,21 +118,21 @@ func TestMetaService_Databases(t *testing.T) { // Create two databases. 
db, err := c.CreateDatabase("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } db, err = c.CreateDatabase("db1") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db1" { t.Fatalf("db name wrong: %s", db.Name) } dbs, err := c.Databases() if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } if len(dbs) != 2 { t.Fatalf("expected 2 databases but got %d", len(dbs)) @@ -187,7 +187,7 @@ func TestMetaService_CreateRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -240,7 +240,7 @@ func TestMetaService_SetDefaultRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -276,7 +276,7 @@ func TestMetaService_DropRetentionPolicy(t *testing.T) { db, err := c.Database("db0") if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } else if db.Name != "db0" { t.Fatalf("db name wrong: %s", db.Name) } @@ -669,7 +669,7 @@ func TestMetaService_Subscriptions_Drop(t *testing.T) { // subscription is unknown. res := c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION foo ON db0."default"`)) if got, exp := res.Err, meta.ErrSubscriptionNotFound; got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // Create a subscription. @@ -681,20 +681,20 @@ func TestMetaService_Subscriptions_Drop(t *testing.T) { // the database is unknown. res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON foo."default"`)) if got, exp := res.Err, influxdb.ErrDatabaseNotFound("foo"); got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // DROP SUBSCRIPTION returns an influxdb.ErrRetentionPolicyNotFound // when the retention policy is unknown. res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."foo_policy"`)) if got, exp := res.Err, influxdb.ErrRetentionPolicyNotFound("foo_policy"); got.Error() != exp.Error() { - t.Fatalf("got: %s, exp: %s", got.Error(), exp) + t.Fatalf("got: %s, exp: %s", got, exp) } // DROP SUBSCRIPTION drops the subsciption if it can find it. 
res = c.ExecuteStatement(mustParseStatement(`DROP SUBSCRIPTION sub0 ON db0."default"`)) if got := res.Err; got != nil { - t.Fatalf("got: %s, exp: %v", got.Error(), nil) + t.Fatalf("got: %s, exp: %v", got, nil) } if res = c.ExecuteStatement(mustParseStatement(`SHOW SUBSCRIPTIONS`)); res.Err != nil { @@ -719,7 +719,7 @@ func TestMetaService_Shards(t *testing.T) { } if _, err := c.CreateDataNode(exp.Host, exp.TCPHost); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if _, err := c.CreateDatabase("db0"); err != nil { @@ -792,7 +792,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { go func() { defer wg.Done() if err := s1.Open(); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } }() defer s1.Close() @@ -802,7 +802,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { go func() { defer wg.Done() if err := s2.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } }() defer s2.Close() @@ -818,13 +818,13 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { cfg3.JoinPeers = joinPeers[0:3] s3 := newService(cfg3) if err := s3.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer s3.Close() c1 := meta.NewClient(joinPeers[0:3], false) if err := c1.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c1.Close() @@ -835,7 +835,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { c := meta.NewClient([]string{s1.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -855,13 +855,13 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { defer os.RemoveAll(cfg4.Dir) s4 := newService(cfg4) if err := s4.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer s4.Close() c2 := meta.NewClient(cfg4.JoinPeers, false) if err := c2.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c2.Close() @@ -894,7 +894,7 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { go func(srv *testService) { defer wg.Done() if err := srv.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } }(srvs[i]) defer srvs[i].Close() @@ -905,7 +905,7 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { for i := range cfgs { c := meta.NewClient([]string{joinPeers[i]}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -919,7 +919,7 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { } if db, err := c.Database(fmt.Sprintf("foo%d", i)); db == nil || err != nil { - t.Fatalf("node %d: database foo wasn't created: %s", i, err.Error()) + t.Fatalf("node %d: database foo wasn't created: %s", i, err) } } } @@ -948,7 +948,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { defer swg.Done() if err := srv.Open(); err != nil { t.Logf("opening server %d", i) - t.Fatal(err.Error()) + t.Fatal(err) } }(i, srvs[i]) @@ -959,7 +959,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -974,11 +974,11 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c.Database("foo"); db == nil || err != nil { - t.Fatalf("database foo wasn't created: %s", err.Error()) + t.Fatalf("database foo wasn't created: %s", err) } if err := srvs[0].Close(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if _, err := c.CreateDatabase("bar"); err != nil { @@ -986,14 +986,14 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err 
:= c.Database("bar"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } if err := srvs[1].Close(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if err := srvs[2].Close(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } // give them a second to shut down @@ -1027,7 +1027,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c2.Database("bar"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } if _, err := c2.CreateDatabase("asdf"); err != nil { @@ -1035,7 +1035,7 @@ func TestMetaService_FailureAndRestartCluster(t *testing.T) { } if db, err := c2.Database("asdf"); db == nil || err != nil { - t.Fatalf("database bar wasn't created: %s", err.Error()) + t.Fatalf("database bar wasn't created: %s", err) } } @@ -1058,12 +1058,12 @@ func TestMetaService_NameChangeSingleNode(t *testing.T) { c := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } s.Close() @@ -1073,24 +1073,24 @@ func TestMetaService_NameChangeSingleNode(t *testing.T) { cfg.HTTPBindAddress = "asdf" + ":" + strings.Split(s.HTTPAddr(), ":")[1] s = newService(cfg) if err := s.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer s.Close() c2 := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c2.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c2.Close() db, err := c2.Database("foo") if db == nil || err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } nodes, err := c2.MetaNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } exp := []meta.NodeInfo{{ID: 1, Host: cfg.HTTPBindAddress, TCPHost: cfg.BindAddress}} @@ -1116,7 +1116,7 @@ func TestMetaService_CreateDataNode(t *testing.T) { n, err := c.CreateDataNode(exp.Host, exp.TCPHost) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(n, exp) { @@ -1125,7 +1125,7 @@ func TestMetaService_CreateDataNode(t *testing.T) { nodes, err := c.DataNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(nodes, []meta.NodeInfo{*exp}) { @@ -1149,7 +1149,7 @@ func TestMetaService_DropDataNode(t *testing.T) { n, err := c.CreateDataNode(exp.Host, exp.TCPHost) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(n, exp) { @@ -1158,7 +1158,7 @@ func TestMetaService_DropDataNode(t *testing.T) { nodes, err := c.DataNodes() if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(nodes, []meta.NodeInfo{*exp}) { @@ -1166,11 +1166,11 @@ func TestMetaService_DropDataNode(t *testing.T) { } if _, err := c.CreateDatabase("foo"); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } sg, err := c.CreateShardGroup("foo", "default", time.Now()) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } if !reflect.DeepEqual(sg.Shards[0].Owners, []meta.ShardOwner{{1}}) { @@ -1178,7 +1178,7 @@ func TestMetaService_DropDataNode(t *testing.T) { } if res := c.ExecuteStatement(mustParseStatement("DROP DATA SERVER 1")); res.Err != nil { - t.Fatal(res.Err.Error()) + t.Fatal(res.Err) } rp, _ := c.RetentionPolicy("foo", "default") @@ -1200,7 +1200,7 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { c := meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - 
t.Fatal(err.Error()) + t.Fatal(err) } id := c.ClusterID() if id == 0 { @@ -1210,12 +1210,12 @@ func TestMetaService_PersistClusterIDAfterRestart(t *testing.T) { s.Close() s = newService(cfg) if err := s.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } c = meta.NewClient([]string{s.HTTPAddr()}, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() @@ -1245,7 +1245,7 @@ func TestMetaService_Ping(t *testing.T) { go func(i int, srv *testService) { defer swg.Done() if err := srv.Open(); err != nil { - t.Fatalf("error opening server %d: %s", i, err.Error()) + t.Fatalf("error opening server %d: %s", i, err) } }(i, srvs[i]) defer srvs[i].Close() @@ -1255,15 +1255,15 @@ func TestMetaService_Ping(t *testing.T) { c := meta.NewClient(joinPeers, false) if err := c.Open(); err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } defer c.Close() if err := c.Ping(false); err != nil { - t.Fatalf("ping false all failed: %s", err.Error()) + t.Fatalf("ping false all failed: %s", err) } if err := c.Ping(true); err != nil { - t.Fatalf("ping false true failed: %s", err.Error()) + t.Fatalf("ping false true failed: %s", err) } srvs[1].Close() @@ -1271,7 +1271,7 @@ func TestMetaService_Ping(t *testing.T) { time.Sleep(time.Second) if err := c.Ping(false); err != nil { - t.Fatalf("ping false some failed: %s", err.Error()) + t.Fatalf("ping false some failed: %s", err) } if err := c.Ping(true); err == nil { @@ -1291,12 +1291,12 @@ func TestMetaService_AcquireLease(t *testing.T) { n1, err := c1.CreateDataNode("foo1:8180", "bar1:8281") if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } n2, err := c2.CreateDataNode("foo2:8180", "bar2:8281") if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } // Client 1 acquires a lease. Should succeed. From cd568548fda669f268c6a2e299e9a9fefb7eb720 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 12 Feb 2016 12:06:23 -0700 Subject: [PATCH 15/15] Fix race in peerStore --- services/meta/raft_state.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/meta/raft_state.go b/services/meta/raft_state.go index 6d7140bb3c9..720555c793c 100644 --- a/services/meta/raft_state.go +++ b/services/meta/raft_state.go @@ -325,14 +325,19 @@ func (l *raftLayer) Close() error { return l.ln.Close() } // peerStore is an in-memory implementation of raft.PeerStore type peerStore struct { + mu sync.RWMutex peers []string } func (m *peerStore) Peers() ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() return m.peers, nil } func (m *peerStore) SetPeers(peers []string) error { + m.mu.Lock() + defer m.mu.Unlock() m.peers = peers return nil }
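
A minimal standalone sketch of the locking pattern the final commit introduces, showing why peerStore needs the RWMutex: the /peers HTTP handler reads the peer list while raft join handling rewrites it. The port numbers and goroutine counts below are illustrative assumptions, not values taken from the patches.

package main

import (
	"fmt"
	"sync"
)

// peerStore mirrors the in-memory raft.PeerStore from the patch above.
// Without mu, concurrent Peers/SetPeers calls are a data race that
// race-detector runs would flag.
type peerStore struct {
	mu    sync.RWMutex
	peers []string
}

func (m *peerStore) Peers() ([]string, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.peers, nil
}

func (m *peerStore) SetPeers(peers []string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.peers = peers
	return nil
}

func main() {
	ps := &peerStore{}
	var wg sync.WaitGroup

	// Writers: simulate raft updating the peer set as nodes join.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ps.SetPeers([]string{fmt.Sprintf("127.0.0.1:%d", 8088+i)})
		}(i)
	}

	// Readers: simulate the /peers handler serving concurrent requests.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			peers, _ := ps.Peers()
			_ = peers
		}()
	}

	wg.Wait()
	peers, _ := ps.Peers()
	fmt.Println("final peers:", peers)
}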