Skip to content

Commit

Permalink
Remove all join URLs from config
Browse files Browse the repository at this point in the history
This removes all join URLs from the config.  To join a node to a
cluster, the URL of another member of the cluster should be passed
on the command line w/ the -join flag.  The join URLs can now point to
any node regardless of whether the node is a broker-only or data-only
node.  At join time, the receiving node will redirect the
request to a valid broker or data node if it cannot handle the request
itself.
  • Loading branch information
jwilder committed Apr 6, 2015
1 parent 9af362b commit 8b5307f
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 72 deletions.
16 changes: 0 additions & 16 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ const (
// DefaultSnapshotPort is the default port to serve snapshots from.
DefaultSnapshotPort = 8087

// DefaultJoinURLs represents the default URLs for joining a cluster.
DefaultJoinURLs = ""

// DefaultRetentionCreatePeriod represents how often the server will check to see if new
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute
Expand Down Expand Up @@ -116,7 +113,6 @@ type Data struct {
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod Duration `toml:"retention-check-period"`
RetentionCreatePeriod Duration `toml:"retention-create-period"`
JoinURLs string `toml:"join-urls"`
}

// Config represents the configuration format for the influxd binary.
Expand All @@ -128,10 +124,6 @@ type Config struct {
Version string `toml:"-"`
InfluxDBVersion string `toml:"-"`

Initialization struct {
JoinURLs string `toml:"join-urls"`
} `toml:"initialization"`

Authentication struct {
Enabled bool `toml:"enabled"`
} `toml:"authentication"`
Expand Down Expand Up @@ -321,14 +313,6 @@ func (c *Config) DataDir() string {
return p
}

// JoinURLs returns the join URLs set in the [initialization] section of the
// config. When that value is empty it falls back to DefaultJoinURLs.
func (c *Config) JoinURLs() string {
	if urls := c.Initialization.JoinURLs; urls != "" {
		return urls
	}
	return DefaultJoinURLs
}

// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups.
// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration {
Expand Down
13 changes: 0 additions & 13 deletions cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ const testFile = `
hostname = "myserver.com"
port = 8086
# Controls certain parameters that only take effect until an initial successful
# start-up has occurred.
[initialization]
join-urls = "http://127.0.0.1:8086"
# Control authentication
[authentication]
Expand Down Expand Up @@ -110,7 +106,6 @@ retention-auto-create = false
retention-check-enabled = true
retention-check-period = "5m"
enabled = false
join-urls = "http://127.0.0.1:8087"
[continuous_queries]
disabled = true
Expand Down Expand Up @@ -157,10 +152,6 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp)
}

if c.JoinURLs() != "http://127.0.0.1:8086" {
t.Fatalf("JoinURLs mistmatch: %v", c.JoinURLs())
}

if !c.Authentication.Enabled {
t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled)
}
Expand Down Expand Up @@ -267,10 +258,6 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("data disabled mismatch: %v, got: %v", false, c.Data.Enabled)
}

if exp := "http://127.0.0.1:8087"; c.Data.JoinURLs != exp {
t.Fatalf("data join urls mismatch: %v, got: %v", exp, c.Data.JoinURLs)
}

if c.Monitoring.WriteInterval.String() != "1m0s" {
t.Fatalf("Monitoring.WriteInterval mismatch: %v", c.Monitoring.WriteInterval)
}
Expand Down
10 changes: 8 additions & 2 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
h.redirect(h.Server.BrokerURLs(), w, r)
}

// serverMetadata responds to broker requests
// serveMetadata responds to broker requests
func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints")
Expand Down Expand Up @@ -128,6 +128,12 @@ func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
return
}

t := h.Broker.Topic(influxdb.BroadcastTopicID)
if t == nil {
http.Error(w, "not found", http.StatusNotFound)
return
}

// Redirect to a valid data URL to handle the request
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
}
Expand All @@ -145,5 +151,5 @@ func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request)
// this is happening frequently, the clients are using a suboptimal endpoint

// Redirect the client to a valid data node that can handle the request
http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusSeeOther)
http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusTemporaryRedirect)
}
30 changes: 13 additions & 17 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,11 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
log.Printf("influxdb started, version %s, commit %s", version, commit)

// Parse join urls from the --join flag.
var brokerURLs []url.URL
if join == "" {
brokerURLs = parseURLs(cmd.config.JoinURLs())
} else {
brokerURLs = parseURLs(join)
}

dataURLs := parseURLs(cmd.config.Data.JoinURLs)
joinURLs := parseURLs(join)

// Open broker & raft log, initialize or join as necessary.
if cmd.config.Broker.Enabled {
cmd.openBroker(brokerURLs)
cmd.openBroker(joinURLs)
}

// Start the broker handler.
Expand Down Expand Up @@ -176,7 +169,7 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
if cmd.config.Data.Enabled {

//FIXME: Need to also pass in dataURLs to bootstrap a data node
s = cmd.openServer(brokerURLs, dataURLs)
s = cmd.openServer(joinURLs)
s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled)

// Enable retention policy enforcement if requested.
Expand Down Expand Up @@ -445,13 +438,13 @@ func joinLog(l *raft.Log, brokerURLs []url.URL) {
}

// creates and initializes a server.
func (cmd *RunCommand) openServer(brokerURLs []url.URL, dataURLs []url.URL) *influxdb.Server {
func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {

// Create messaging client to the brokers.
c := influxdb.NewMessagingClient(cmd.config.ClusterURL())
// If join URLs were passed in then use them to override the client's URLs.
if len(brokerURLs) > 0 {
c.SetURLs(brokerURLs)
if len(joinURLs) > 0 {
c.SetURLs(joinURLs)
} else if cmd.server.broker != nil {
c.SetURLs([]url.URL{cmd.server.broker.URL()})
}
Expand Down Expand Up @@ -484,13 +477,12 @@ func (cmd *RunCommand) openServer(brokerURLs []url.URL, dataURLs []url.URL) *inf
}
log.Printf("data server opened at %s", cmd.config.Data.Dir)

if len(dataURLs) > 0 {
joinServer(s, cmd.config.ClusterURL(), dataURLs)
return s
if len(joinURLs) > 0 {
joinServer(s, cmd.config.ClusterURL(), joinURLs)
}

dataNodeIndex := s.Index()
if dataNodeIndex == 0 && len(dataURLs) == 0 {
if dataNodeIndex == 0 {
if err := s.Initialize(cmd.config.ClusterURL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
Expand All @@ -509,6 +501,10 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(&u, &joinURL); err != nil {
// No data nodes could be found to join. We're the first.
if err == influxdb.ErrDataNodeNotFound {
return
}
log.Printf("join: failed to connect data node: %s: %s", u, err)
} else {
log.Printf("join: connected data node to %s", u)
Expand Down
15 changes: 7 additions & 8 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1868,10 +1868,9 @@ func TestSeparateBrokerDataNode(t *testing.T) {
}

u := b.URL()
dataConfig.Initialization.JoinURLs = (&u).String()
dataCmd := main.NewRunCommand()

_, s, _ := dataCmd.Open(dataConfig, "")
_, s, _ := dataCmd.Open(dataConfig, (&u).String())
if s == nil {
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig.Port)
}
Expand Down Expand Up @@ -1920,27 +1919,27 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) {
dataConfig1.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig1.Port))
dataConfig1.ReportingDisabled = true

dataConfig1.Initialization.JoinURLs = brokerURL
dataCmd1 := main.NewRunCommand()

_, s1, _ := dataCmd1.Open(dataConfig1, "")
_, s1, _ := dataCmd1.Open(dataConfig1, brokerURL)
if s1 == nil {
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig1.Port)
}

// FIXME: This is needed for now because cmd.Open() will return before the server
// is actually ready to handle requests.
time.Sleep(1 * time.Second)

// Join data node 2 to single broker and first data node
dataConfig2 := main.NewConfig()
dataConfig2.Port = 9012
dataConfig2.Data.Enabled = true
dataConfig2.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig2.Port))
dataConfig2.ReportingDisabled = true

dataNode1Url := s1.URL()
dataConfig2.Data.JoinURLs = (&dataNode1Url).String()
dataConfig2.Initialization.JoinURLs = brokerURL
dataCmd2 := main.NewRunCommand()

_, s2, _ := dataCmd2.Open(dataConfig2, "")
_, s2, _ := dataCmd2.Open(dataConfig2, brokerURL)
if s2 == nil {
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, dataConfig2.Port)
}
Expand Down
3 changes: 2 additions & 1 deletion influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (
// ErrDataNodeExists is returned when creating a duplicate data node.
ErrDataNodeExists = errors.New("data node exists")

// ErrDataNodeNotFound is returned when dropping a non-existent data node.
// ErrDataNodeNotFound is returned when dropping a non-existent data node or
// attempting to join another data node when no data nodes exist yet
ErrDataNodeNotFound = errors.New("data node not found")

// ErrDataNodeRequired is returned when using a blank data node id.
Expand Down
68 changes: 53 additions & 15 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,18 @@ func (s *Server) syncBroadcast(index uint64) error {
}
}

// Initialize creates a new data node and initializes the server's id to 1.
// Initialize creates a new data node and initializes the server's id to the latest.
func (s *Server) Initialize(u url.URL) error {
// Create a new data node.
if err := s.CreateDataNode(&u); err != nil {
return err
}

// Ensure the data node returns with an ID of 1.
// Ensure the data node returns with an ID.
// If it doesn't then something went really wrong. We have to panic because
// the messaging client relies on the first server being assigned ID 1.
n := s.DataNodeByURL(&u)
assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID)
assert(n != nil, "node not created: %s", u.String())

// Set the ID on the metastore.
if err := s.meta.mustUpdate(0, func(tx *metatx) error {
Expand All @@ -632,7 +632,7 @@ func (s *Server) Initialize(u url.URL) error {
}

// Set the ID on the server.
s.id = 1
s.id = n.ID

return nil
}
Expand All @@ -657,20 +657,58 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
s.mu.Lock()
defer s.mu.Unlock()

// Encode data node request.
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&dataNodeJSON{URL: u.String()}); err != nil {
return err
}

// Send request.
// Create the initial request. We might get a redirect, though, depending on
// the node's role.
joinURL = copyURL(joinURL)
joinURL.Path = "/data_nodes"
resp, err := http.Post(joinURL.String(), "application/octet-stream", &buf)
if err != nil {
return err

var retries int
var resp *http.Response
var err error

// When POSTing to the join endpoint, we manually follow redirects
// rather than relying on the Go http client redirect policy. The Go http client will convert
// POSTs to GETs when following redirects, which is not what we want when joining
// (i.e. we want to join a node, not list the nodes). If we receive a redirect response,
// the Location header is where we should resend the POST. We also need to re-encode
// the body since buf was already read.
for {

// Should never get here, but bail to avoid an infinite redirect loop to be safe
if retries >= 3 {
return ErrUnableToJoin
}

// Encode data node request.
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&dataNodeJSON{URL: u.String()}); err != nil {
return err
}

resp, err = http.Post(joinURL.String(), "application/octet-stream", &buf)
if err != nil {
return err
}
defer resp.Body.Close()

// We likely tried to join onto a broker which cannot handle this request. It
// has given us the address of a known data node to join instead.
if resp.StatusCode == http.StatusTemporaryRedirect {
joinURL, err = url.Parse(resp.Header.Get("Location"))
if err != nil {
return err
}
retries += 1
resp.Body.Close()
continue
}

// If we are first data node, we can't join anyone and need to initialize
if resp.StatusCode == http.StatusNotFound {
return ErrDataNodeNotFound
}
break
}
defer resp.Body.Close()

// Check if created.
if resp.StatusCode != http.StatusCreated {
Expand Down

9 comments on commit 8b5307f

@cboggs
Copy link

@cboggs cboggs commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a significant gain in removing this from the config file? My only concern (admittedly very nitpick-y) is that automated deployment of InfluxDB nodes will now require the user's config management code to lay down a custom init script or upstart config as opposed to just laying down a custom Influx config file and expecting the stock packaged startup mechanisms to work. Not onerous, but out of the norm it seems.

Certainly not a huge problem, but seems like a papercut that could be avoided - unless I'm missing some benefit to doing it this way. :-)

@pauldix
Copy link
Member

@pauldix pauldix commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cboggs we were doing it for cleanliness since it's something that is only paid attention to on the first startup. And it's easier in our code to know that you're configuring for the first time if that flag is passed in.

If it makes config management much easier to have it in the config file, perhaps we can have a section in there titled run_once or something. What do you think, @jwilder or @corylanou?

@otoolep
Copy link
Contributor

@otoolep otoolep commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cboggs makes an excellent point. Forcing our users to have a custom init script is a non-starter because every time InfluxDB is upgraded, the new package unconditionally overwrites any existing script in /etc/opt/init.d. This is important because sometimes bug fixes require changes to the init scripts. If user-specific changes are in the init script, they will be lost on upgrade.

We have two choices as far as I can see -- back to an "initialization" section in the config file, which we had before but previously removed (this may have been a mistake), or allow the join-urls to be specified in /etc/influxdb/defaults, which is sourced by the init.d script on startup. I prefer option 1.

@otoolep
Copy link
Contributor

@otoolep otoolep commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, if end-users launch influxd using their own tools which specify all options, then perhaps they shouldn't be using the init.d script i.e. service influxdb start from those tools. In that case our init.d script is only useful for single-node systems, which I don't think is desirable.

@jwilder
Copy link
Contributor Author

@jwilder jwilder commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this isn't ideal at the moment. I think we should:

  1. Bring back [Initialization].join-urls
  2. Update the init script to allow passing options via /etc/influxdb/defaults. Should never need to maintain a custom init script.
  3. Document when and how join URLs are used (e.g. only on the first boot, -join flag takes precedence over config values and existing cluster state takes precedence over any -join flag or join-urls config option).
  4. Add info logging in the server around what join URLs are being used so it's clear what's going on. Currently in there but could be improved.

@pauldix
Copy link
Member

@pauldix pauldix commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to everything @jwilder said

@otoolep
Copy link
Contributor

@otoolep otoolep commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of options 1 and 2, we only need to implement 1, right? What use is 2 if we have 1? I might be missing something, of course.

I prefer 1, FWIW.

@jwilder
Copy link
Contributor Author

@jwilder jwilder commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, both need to be implemented. The preferred way to set join URLs when using config management would be through the config file. 2 is needed because the init script is broken if you can't pass command-line options w/o editing it.

@cboggs
Copy link

@cboggs cboggs commented on 8b5307f Apr 8, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
I like @jwilder's summary, I think those are all pretty necessary.
Thanks guys!

Please sign in to comment.