Skip to content

Commit

Permalink
Start cluster before broker.
Browse files Browse the repository at this point in the history
This commit fixes an issue where the second node joins but the first node
cannot commit because it doesn't have the HTTP endpoint running yet. This
is a side effect of streaming raft since we don't synchronize the quorum
set with the heartbeats. We should cache the config per term in the future.
  • Loading branch information
benbjohnson committed Apr 15, 2015
1 parent 03d5ffc commit 6e124f3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
28 changes: 13 additions & 15 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,21 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {
// Parse join urls from the --join flag.
joinURLs := parseURLs(join)

// Start the broker handler.
h := &Handler{Config: config}
if err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h); err != nil {
log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err)
}
log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr())

// Open broker & raft log, initialize or join as necessary.
if cmd.config.Broker.Enabled {
cmd.openBroker(joinURLs)
cmd.openBroker(joinURLs, h)
// If were running as a broker locally, always connect to it since it must
// be ready before we can start the data node.
joinURLs = []url.URL{cmd.node.Broker.URL()}
}

// Start the broker handler.
h := &Handler{
Config: config,
Broker: cmd.node.Broker,
Log: cmd.node.raftLog,
}

err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h)
if err != nil {
log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err)
}
log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr())

var s *influxdb.Server
// Open server, initialize or join as necessary.
if cmd.config.Data.Enabled {
Expand Down Expand Up @@ -469,7 +463,7 @@ func writePIDFile(path string) {
}

// creates and initializes a broker.
func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) {
path := cmd.config.BrokerDir()
u := cmd.config.ClusterURL()
raftTracing := cmd.config.Logging.RaftTracing
Expand Down Expand Up @@ -499,6 +493,10 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
log.Fatalf("raft: %s", err)
}

// Attached broker and log to handler.

This comment has been minimized.

Copy link
@otoolep

otoolep Apr 15, 2015

Contributor

Nit: Attached -> Attach

h.Broker = b
h.Log = l

// Checks to see if the raft index is 0. If it's 0, it might be the first
// node in the cluster and must initialize or join
index, _ := l.LastLogIndexTerm()
Expand Down
5 changes: 5 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,11 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
close(ch)

// Ignore timeout if we are snapshotting.
// Or if we haven't received confirmation of join.
if l.isSnapshotting() {
continue
} else if l.FSM.Index() == 0 {
continue
}

// TODO: Prevote before becoming candidate.
Expand Down Expand Up @@ -1349,6 +1352,7 @@ func (l *Log) appendToWriters(buf []byte) {

// If an error occurs then remove the writer and close it.
if _, err := w.Write(buf); err != nil {
l.Logger.Printf("append to writers error: %s", err)
l.removeWriter(w)
i--
continue
Expand Down Expand Up @@ -1797,6 +1801,7 @@ func (l *Log) removeWriter(writer *logWriter) {
l.writers[len(l.writers)-1] = nil
l.writers = l.writers[:len(l.writers)-1]
_ = w.Close()
l.Logger.Printf("writer removed: %#v", w)
break
}
}
Expand Down

0 comments on commit 6e124f3

Please sign in to comment.