Skip to content

Commit

Permalink
Merge pull request #5602 from influxdata/cluster-startup
Browse files Browse the repository at this point in the history
Simplify cluster startup for scripting and deployment
  • Loading branch information
jwilder committed Feb 12, 2016
2 parents 30a31d3 + cd56854 commit ef571fc
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 197 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [#5419](https://github.com/influxdata/influxdb/pull/5419): Graphite: Support matching tags multiple times Thanks @m4ce
- [#5598](https://github.com/influxdata/influxdb/pull/5598): Client: Add Ping to v2 client @PSUdaemon
- [#4125](https://github.com/influxdata/influxdb/pull/4125): Admin UI: Fetch and display server version on connect. Thanks @alexiri!
- [#5602](https://github.com/influxdata/influxdb/pull/5602): Simplify cluster startup for scripting and deployment

### Bugfixes

Expand Down
10 changes: 2 additions & 8 deletions cmd/influxd/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb"
)

const logo = `
Expand Down Expand Up @@ -95,13 +94,8 @@ func (cmd *Command) Run(args ...string) error {
return fmt.Errorf("apply env config: %v", err)
}

// If we have a node ID, ignore the join argument
// We are not using the reference to this node var, just checking
// to see if we have a node ID on disk
if node, _ := influxdb.LoadNode(config.Meta.Dir, []string{config.Meta.HTTPBindAddress}); node == nil || node.ID == 0 {
if options.Join != "" {
config.Meta.JoinPeers = strings.Split(options.Join, ",")
}
if options.Join != "" {
config.Meta.JoinPeers = strings.Split(options.Join, ",")
}

// Validate the configuration.
Expand Down
24 changes: 17 additions & 7 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
return nil, err
}

// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
return nil, fmt.Errorf("mkdir all: %s", err)
}

// 0.11 we no longer use peers.json. Remove the file if we have one on disk.
os.RemoveAll(filepath.Join(c.Meta.Dir, "peers.json"))

// load the node information
metaAddresses := []string{nodeAddr}
if !c.Meta.Enabled {
Expand Down Expand Up @@ -627,6 +635,8 @@ func (s *Server) initializeMetaClient() error {

go s.updateMetaNodeInformation()

s.MetaClient.WaitForDataChanged()

return nil
}

Expand All @@ -645,14 +655,14 @@ func (s *Server) initializeMetaClient() error {
if err := s.MetaClient.Open(); err != nil {
return err
}

if s.TSDBStore != nil {
n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
if err != nil {
return err
}
s.Node.ID = n.ID
n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
for err != nil {
log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
time.Sleep(time.Second)
n, err = s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
}
s.Node.ID = n.ID

metaNodes, err := s.MetaClient.MetaNodes()
if err != nil {
return err
Expand Down
71 changes: 70 additions & 1 deletion services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,12 +827,21 @@ func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error {
continue
}
resp.Body.Close()

// Successfully joined
if resp.StatusCode == http.StatusOK {
return nil
}

// We tried to join a meta node that was not the leader, rety at the node
// they think is the leader.
if resp.StatusCode == http.StatusTemporaryRedirect {
redirectServer = resp.Header.Get("Location")
continue
}

return nil
// Something failed, try the next node
currentServer++
}
}

Expand Down Expand Up @@ -1109,6 +1118,36 @@ func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {
return data, nil
}

// peers returns the TCPHost addresses of all the metaservers
func (c *Client) peers() []string {

var peers Peers
// query each server and keep track of who their peers are
for _, server := range c.metaServers {
url := c.url(server) + "/peers"
resp, err := http.Get(url)
if err != nil {
continue
}
defer resp.Body.Close()

// This meta-server might not be ready to answer, continue on
if resp.StatusCode != http.StatusOK {
continue
}

dec := json.NewDecoder(resp.Body)
var p []string
if err := dec.Decode(&p); err != nil {
continue
}
peers = peers.Append(p...)
}

// Return the unique set of peer addresses
return []string(peers.Unique())
}

func (c *Client) url(server string) string {
url := fmt.Sprintf("://%s", server)

Expand Down Expand Up @@ -1170,6 +1209,36 @@ func (c *Client) updateAuthCache() {
c.authCache = newCache
}

type Peers []string

func (peers Peers) Append(p ...string) Peers {
peers = append(peers, p...)

return peers.Unique()
}

func (peers Peers) Unique() Peers {
distinct := map[string]struct{}{}
for _, p := range peers {
distinct[p] = struct{}{}
}

var u Peers
for k := range distinct {
u = append(u, k)
}
return u
}

func (peers Peers) Contains(peer string) bool {
for _, p := range peers {
if p == peer {
return true
}
}
return false
}

type errRedirect struct {
host string
}
Expand Down
18 changes: 16 additions & 2 deletions services/meta/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -38,6 +39,7 @@ type handler struct {
apply(b []byte) error
join(n *NodeInfo) error
otherMetaServersHTTP() []string
peers() []string
}
s *Service

Expand Down Expand Up @@ -84,6 +86,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.WrapHandler("ping", h.servePing).ServeHTTP(w, r)
case "/lease":
h.WrapHandler("lease", h.serveLease).ServeHTTP(w, r)
case "/peers":
h.WrapHandler("peers", h.servePeers).ServeHTTP(w, r)
default:
h.WrapHandler("snapshot", h.serveSnapshot).ServeHTTP(w, r)
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (h *handler) isClosed() bool {
// serveExec executes the requested command.
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
if h.isClosed() {
h.httpError(fmt.Errorf("server closed"), w, http.StatusInternalServerError)
h.httpError(fmt.Errorf("server closed"), w, http.StatusServiceUnavailable)
return
}

Expand All @@ -137,6 +141,7 @@ func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
h.httpError(err, w, http.StatusInternalServerError)
return
}

err := h.store.join(n)
if err == raft.ErrNotLeader {
l := h.store.leaderHTTP()
Expand Down Expand Up @@ -307,6 +312,13 @@ func (h *handler) servePing(w http.ResponseWriter, r *http.Request) {
h.httpError(fmt.Errorf("one or more metaservers not up"), w, http.StatusInternalServerError)
}

func (h *handler) servePeers(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)
if err := enc.Encode(h.store.peers()); err != nil {
h.httpError(err, w, http.StatusInternalServerError)
}
}

// serveLease
func (h *handler) serveLease(w http.ResponseWriter, r *http.Request) {
var name, nodeIDStr string
Expand Down Expand Up @@ -442,8 +454,10 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler

defer func() {
if err := recover(); err != nil {
b := make([]byte, 1024)
runtime.Stack(b, false)
logLine := buildLogLine(l, r, start)
logLine = fmt.Sprintf(`%s [panic:%s]`, logLine, err)
logLine = fmt.Sprintf("%s [panic:%s]\n%s", logLine, err, string(b))
weblog.Println(logLine)
}
}()
Expand Down
51 changes: 33 additions & 18 deletions services/meta/raft_state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package meta

import (
"errors"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -73,7 +72,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)

// Create peer storage.
r.peerStore = raft.NewJSONPeers(r.path, r.transport)
r.peerStore = &peerStore{}

// This server is joining the raft cluster for the first time if initializePeers are passed in
if len(initializePeers) > 0 {
Expand All @@ -88,17 +87,20 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er
}

// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(peers) <= 1 {
if len(initializePeers) <= 1 {
config.EnableSingleNode = true

// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false

// For single-node clusters, we can update the raft peers before we start the cluster
// just in case the hostname has changed.
if err := r.peerStore.SetPeers([]string{r.addr}); err != nil {
return err
// Make sure our peer address is here. This happens with either a single node cluster
// or a node joining the cluster, as no one else has that information yet.
if !raft.PeerContained(peers, r.addr) {
if err := r.peerStore.SetPeers([]string{r.addr}); err != nil {
return err
}
}

peers = []string{r.addr}
}

Expand All @@ -107,7 +109,7 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, r.addr) {
r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.addr, r.path)
r.logger.Printf("%s is not in the list of raft peers. Please ensure all nodes have the same meta nodes configured", r.addr, r.path)
return fmt.Errorf("peers out of sync: %v not in %v", r.addr, peers)
}

Expand Down Expand Up @@ -156,6 +158,9 @@ func (r *raftState) logLeaderChanges() {
}

func (r *raftState) close() error {
if r == nil {
return nil
}
if r.closing != nil {
close(r.closing)
}
Expand Down Expand Up @@ -214,15 +219,6 @@ func (r *raftState) snapshot() error {

// addPeer adds addr to the list of peers in the cluster.
func (r *raftState) addPeer(addr string) error {
// peers, err := r.peerStore.Peers()
// if err != nil {
// return err
// }
// peers = append(peers, addr)
// if fut := r.raft.SetPeers(peers); fut.Error() != nil {
// return fut.Error()
// }

peers, err := r.peerStore.Peers()
if err != nil {
return err
Expand All @@ -244,7 +240,7 @@ func (r *raftState) addPeer(addr string) error {
func (r *raftState) removePeer(addr string) error {
// Only do this on the leader
if !r.isLeader() {
return errors.New("not the leader")
return raft.ErrNotLeader
}
if fut := r.raft.RemovePeer(addr); fut.Error() != nil {
return fut.Error()
Expand Down Expand Up @@ -326,3 +322,22 @@ func (l *raftLayer) Accept() (net.Conn, error) { return l.ln.Accept() }

// Close closes the layer.
func (l *raftLayer) Close() error { return l.ln.Close() }

// peerStore is an in-memory implementation of raft.PeerStore
type peerStore struct {
mu sync.RWMutex
peers []string
}

func (m *peerStore) Peers() ([]string, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.peers, nil
}

func (m *peerStore) SetPeers(peers []string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.peers = peers
return nil
}
8 changes: 5 additions & 3 deletions services/meta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ func (s *Service) Open() error {

// Open the store
s.store = newStore(s.config, s.httpAddr, s.raftAddr)
if err := s.store.open(s.RaftListener); err != nil {
return err
}

handler := newHandler(s.config, s)
handler.logger = s.Logger
Expand All @@ -121,6 +118,11 @@ func (s *Service) Open() error {

// Begin listening for requests in a separate goroutine.
go s.serve()

if err := s.store.open(s.RaftListener); err != nil {
return err
}

return nil
}

Expand Down
Loading

0 comments on commit ef571fc

Please sign in to comment.