From 8b5307f6e8bd9077c255e3b7e0729359c22f42ce Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 3 Apr 2015 13:49:54 -0600 Subject: [PATCH] Remove all join URLs from config This removes all join URLs from the config. To join a node to a cluster, the URL of another member of the cluster should be passed on the command line w/ the -join flag. The join URLs can now be any node regardless of whether the node is a broker only or data only node. At join time, the receiving node will redirect the request to a valid broker or data node if it cannot handle the request itself. --- cmd/influxd/config.go | 16 ------ cmd/influxd/config_test.go | 13 ----- cmd/influxd/handler.go | 10 +++- cmd/influxd/run.go | 30 +++++------- cmd/influxd/server_integration_test.go | 15 +++--- influxdb.go | 3 +- server.go | 68 ++++++++++++++++++++------ 7 files changed, 83 insertions(+), 72 deletions(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 3106f8bc0b7..8a3f6a4551c 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -42,9 +42,6 @@ const ( // DefaultSnapshotPort is the default port to serve snapshots from. DefaultSnapshotPort = 8087 - // DefaultJoinURLs represents the default URLs for joining a cluster. - DefaultJoinURLs = "" - // DefaultRetentionCreatePeriod represents how often the server will check to see if new // shard groups need to be created in advance for writing DefaultRetentionCreatePeriod = 45 * time.Minute @@ -116,7 +113,6 @@ type Data struct { RetentionCheckEnabled bool `toml:"retention-check-enabled"` RetentionCheckPeriod Duration `toml:"retention-check-period"` RetentionCreatePeriod Duration `toml:"retention-create-period"` - JoinURLs string `toml:"join-urls"` } // Config represents the configuration format for the influxd binary. 
@@ -128,10 +124,6 @@ type Config struct { Version string `toml:"-"` InfluxDBVersion string `toml:"-"` - Initialization struct { - JoinURLs string `toml:"join-urls"` - } `toml:"initialization"` - Authentication struct { Enabled bool `toml:"enabled"` } `toml:"authentication"` @@ -321,14 +313,6 @@ func (c *Config) DataDir() string { return p } -func (c *Config) JoinURLs() string { - if c.Initialization.JoinURLs == "" { - return DefaultJoinURLs - } else { - return c.Initialization.JoinURLs - } -} - // ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups. // If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration { diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 159497d430f..5e70680e388 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -21,10 +21,6 @@ const testFile = ` hostname = "myserver.com" port = 8086 -# Controls certain parameters that only take effect until an initial successful -# start-up has occurred. -[initialization] -join-urls = "http://127.0.0.1:8086" # Control authentication [authentication] @@ -110,7 +106,6 @@ retention-auto-create = false retention-check-enabled = true retention-check-period = "5m" enabled = false -join-urls = "http://127.0.0.1:8087" [continuous_queries] disabled = true @@ -157,10 +152,6 @@ func TestParseConfig(t *testing.T) { t.Fatalf("port mismatch. 
got %v, exp %v", c.Port, exp) } - if c.JoinURLs() != "http://127.0.0.1:8086" { - t.Fatalf("JoinURLs mistmatch: %v", c.JoinURLs()) - } - if !c.Authentication.Enabled { t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled) } @@ -267,10 +258,6 @@ func TestParseConfig(t *testing.T) { t.Fatalf("data disabled mismatch: %v, got: %v", false, c.Data.Enabled) } - if exp := "http://127.0.0.1:8087"; c.Data.JoinURLs != exp { - t.Fatalf("data join urls mismatch: %v, got: %v", exp, c.Data.JoinURLs) - } - if c.Monitoring.WriteInterval.String() != "1m0s" { t.Fatalf("Monitoring.WriteInterval mismatch: %v", c.Monitoring.WriteInterval) } diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go index 2b7f68fff39..89b656905a3 100644 --- a/cmd/influxd/handler.go +++ b/cmd/influxd/handler.go @@ -70,7 +70,7 @@ func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) { h.redirect(h.Server.BrokerURLs(), w, r) } -// serverMetadata responds to broker requests +// serveMetadata responds to broker requests func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) { if h.Broker == nil && h.Server == nil { log.Println("no broker or server configured to handle messaging endpoints") @@ -128,6 +128,12 @@ func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) { return } + t := h.Broker.Topic(influxdb.BroadcastTopicID) + if t == nil { + http.Error(w, "not found", http.StatusNotFound) + return + } + // Redirect to a valid data URL to handle the request h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r) } @@ -145,5 +151,5 @@ func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) // this is happening frequently, the clients are using a suboptimal endpoint // Redirect the client to a valid data node that can handle the request - http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusSeeOther) + http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusTemporaryRedirect) } diff --git 
a/cmd/influxd/run.go b/cmd/influxd/run.go index 8abdd79a2a2..0082883ae9f 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -130,18 +130,11 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in log.Printf("influxdb started, version %s, commit %s", version, commit) // Parse join urls from the --join flag. - var brokerURLs []url.URL - if join == "" { - brokerURLs = parseURLs(cmd.config.JoinURLs()) - } else { - brokerURLs = parseURLs(join) - } - - dataURLs := parseURLs(cmd.config.Data.JoinURLs) + joinURLs := parseURLs(join) // Open broker & raft log, initialize or join as necessary. if cmd.config.Broker.Enabled { - cmd.openBroker(brokerURLs) + cmd.openBroker(joinURLs) } // Start the broker handler. @@ -176,7 +169,7 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in if cmd.config.Data.Enabled { //FIXME: Need to also pass in dataURLs to bootstrap a data node - s = cmd.openServer(brokerURLs, dataURLs) + s = cmd.openServer(joinURLs) s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled) // Enable retention policy enforcement if requested. @@ -445,13 +438,13 @@ func joinLog(l *raft.Log, brokerURLs []url.URL) { } // creates and initializes a server. -func (cmd *RunCommand) openServer(brokerURLs []url.URL, dataURLs []url.URL) *influxdb.Server { +func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { // Create messaging client to the brokers. c := influxdb.NewMessagingClient(cmd.config.ClusterURL()) // If join URLs were passed in then use them to override the client's URLs. 
- if len(brokerURLs) > 0 { - c.SetURLs(brokerURLs) + if len(joinURLs) > 0 { + c.SetURLs(joinURLs) } else if cmd.server.broker != nil { c.SetURLs([]url.URL{cmd.server.broker.URL()}) } @@ -484,13 +477,12 @@ func (cmd *RunCommand) openServer(brokerURLs []url.URL, dataURLs []url.URL) *inf } log.Printf("data server opened at %s", cmd.config.Data.Dir) - if len(dataURLs) > 0 { - joinServer(s, cmd.config.ClusterURL(), dataURLs) - return s + if len(joinURLs) > 0 { + joinServer(s, cmd.config.ClusterURL(), joinURLs) } dataNodeIndex := s.Index() - if dataNodeIndex == 0 && len(dataURLs) == 0 { + if dataNodeIndex == 0 { if err := s.Initialize(cmd.config.ClusterURL()); err != nil { log.Fatalf("server initialization error: %s", err) } @@ -509,6 +501,10 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) { // Create data node on an existing data node. for _, joinURL := range joinURLs { if err := s.Join(&u, &joinURL); err != nil { + // No data nodes could be found to join. We're the first. 
+ if err == influxdb.ErrDataNodeNotFound { + return + } log.Printf("join: failed to connect data node: %s: %s", u, err) } else { log.Printf("join: connected data node to %s", u) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 029ca339cc6..758d41e8a3b 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1868,10 +1868,9 @@ func TestSeparateBrokerDataNode(t *testing.T) { } u := b.URL() - dataConfig.Initialization.JoinURLs = (&u).String() dataCmd := main.NewRunCommand() - _, s, _ := dataCmd.Open(dataConfig, "") + _, s, _ := dataCmd.Open(dataConfig, (&u).String()) if s == nil { t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig.Port) } @@ -1920,14 +1919,17 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) { dataConfig1.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig1.Port)) dataConfig1.ReportingDisabled = true - dataConfig1.Initialization.JoinURLs = brokerURL dataCmd1 := main.NewRunCommand() - _, s1, _ := dataCmd1.Open(dataConfig1, "") + _, s1, _ := dataCmd1.Open(dataConfig1, brokerURL) if s1 == nil { t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig1.Port) } + // FIXME: This is needed for now because cmd.Open() will return before the server + // is actually ready to handle requests. 
+ time.Sleep(1 * time.Second) + // Join data node 2 to single broker and first data node dataConfig2 := main.NewConfig() dataConfig2.Port = 9012 @@ -1935,12 +1937,9 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) { dataConfig2.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig2.Port)) dataConfig2.ReportingDisabled = true - dataNode1Url := s1.URL() - dataConfig2.Data.JoinURLs = (&dataNode1Url).String() - dataConfig2.Initialization.JoinURLs = brokerURL dataCmd2 := main.NewRunCommand() - _, s2, _ := dataCmd2.Open(dataConfig2, "") + _, s2, _ := dataCmd2.Open(dataConfig2, brokerURL) if s2 == nil { t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig2.Port) } diff --git a/influxdb.go b/influxdb.go index 0ea79615d3b..21136d18aef 100644 --- a/influxdb.go +++ b/influxdb.go @@ -35,7 +35,8 @@ var ( // ErrDataNodeExists is returned when creating a duplicate data node. ErrDataNodeExists = errors.New("data node exists") - // ErrDataNodeNotFound is returned when dropping a non-existent data node. + // ErrDataNodeNotFound is returned when dropping a non-existent data node or + // attempting to join another data node when no data nodes exist yet ErrDataNodeNotFound = errors.New("data node not found") // ErrDataNodeRequired is returned when using a blank data node id. diff --git a/server.go b/server.go index b3fa7ff453d..09885d20bb2 100644 --- a/server.go +++ b/server.go @@ -611,18 +611,18 @@ func (s *Server) syncBroadcast(index uint64) error { } } -// Initialize creates a new data node and initializes the server's id to 1. +// Initialize creates a new data node and initializes the server's id to the latest. func (s *Server) Initialize(u url.URL) error { // Create a new data node. if err := s.CreateDataNode(&u); err != nil { return err } - // Ensure the data node returns with an ID of 1. + // Ensure the data node returns with an ID. // If it doesn't then something went really wrong. 
We have to panic because // the messaging client relies on the first server being assigned ID 1. n := s.DataNodeByURL(&u) - assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID) + assert(n != nil, "node not created: %s", u.String()) // Set the ID on the metastore. if err := s.meta.mustUpdate(0, func(tx *metatx) error { @@ -632,7 +632,7 @@ func (s *Server) Initialize(u url.URL) error { } // Set the ID on the server. - s.id = 1 + s.id = n.ID return nil } @@ -657,20 +657,58 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error { s.mu.Lock() defer s.mu.Unlock() - // Encode data node request. - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(&dataNodeJSON{URL: u.String()}); err != nil { - return err - } - - // Send request. + // Create the initial request. Might get a redirect though depending on + // the node's role joinURL = copyURL(joinURL) joinURL.Path = "/data_nodes" - resp, err := http.Post(joinURL.String(), "application/octet-stream", &buf) - if err != nil { - return err + + var retries int + var resp *http.Response + var err error + + // When POSTing to the join endpoint, we are manually following redirects + // and not relying on the Go http client redirect policy. The Go http client will convert + // POSTs to GETs when following redirects which is not what we want when joining. + // (i.e. we want to join a node, not list the nodes) If we receive a redirect response, + // the Location header is where we should resend the POST. We also need to re-encode the + // body since the buf was already read. + for { + + // Should never get here but bail to avoid an infinite redirect loop to be safe + if retries >= 3 { + return ErrUnableToJoin + } + + // Encode data node request. 
+ var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(&dataNodeJSON{URL: u.String()}); err != nil { + return err + } + + resp, err = http.Post(joinURL.String(), "application/octet-stream", &buf) + if err != nil { + return err + } + defer resp.Body.Close() + + // We likely tried to join onto a broker which cannot handle this request. It + // has given us the address of a known data node to join instead. + if resp.StatusCode == http.StatusTemporaryRedirect { + joinURL, err = url.Parse(resp.Header.Get("Location")) + if err != nil { + return err + } + retries += 1 + resp.Body.Close() + continue + } + + // If we are first data node, we can't join anyone and need to initialize + if resp.StatusCode == http.StatusNotFound { + return ErrDataNodeNotFound + } + break } - defer resp.Body.Close() // Check if created. if resp.StatusCode != http.StatusCreated {