Skip to content

Commit

Permalink
Merge pull request #2236 from influxdb/term-signal
Browse files Browse the repository at this point in the history
Term signal
  • Loading branch information
benbjohnson committed Apr 10, 2015
2 parents cbc2d8b + eaf4bfc commit 3404386
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
- [#2236](https://github.com/influxdb/influxdb/pull/2236): Immediate term changes, fix stale write issue, net/http/pprof

## v0.9.0-rc22 [2015-04-09]

Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ type Config struct {
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`

Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`

ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"

Expand All @@ -29,6 +30,21 @@ func NewHandler() *Handler {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}

// FIXME: This is very brittle. Refactor to have common path prefix
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
Expand Down
8 changes: 4 additions & 4 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) {
if t := b.topics[topicID]; t != nil {
t.mu.Lock()
defer t.mu.Unlock()

// Track the highest replicated index per data node URL
t.indexByURL[u] = index

if t.index < index {
t.index = index
}
}
}

Expand Down Expand Up @@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error {
return fmt.Errorf("write segment: %s", err)
}

// Update index.
t.index = m.Index

return nil
}

Expand Down
3 changes: 0 additions & 3 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatalf("apply error: %s", err)
}

if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
Expand Down
111 changes: 44 additions & 67 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type Log struct {
// Incoming heartbeats and term changes go to these channels
// and are picked up by the current state.
heartbeats chan heartbeat
terms chan uint64
terms chan struct{}

// Close notification and wait.
wg sync.WaitGroup
Expand Down Expand Up @@ -201,7 +201,7 @@ func NewLog() *Log {
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 10),
terms: make(chan uint64, 10),
terms: make(chan struct{}, 1),
Logger: log.New(os.Stderr, "[raft] ", log.LstdFlags),
}
l.updateLogPrefix()
Expand Down Expand Up @@ -538,10 +538,20 @@ func (l *Log) setTerm(term uint64) error {
}

// mustSetTerm sets the current term and clears the vote. Panic on error.
func (l *Log) mustSetTerm(term uint64) {
func (l *Log) mustSetTermIfHigher(term uint64) {
if term <= l.term {
return
}

if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}

// Signal term change.
select {
case l.terms <- struct{}{}:
default:
}
}

// readConfig reads the configuration from disk.
Expand Down Expand Up @@ -863,21 +873,12 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {

// Update term, commit index & leader.
l.mu.Lock()
if hb.term > l.term {
l.mustSetTerm(hb.term)
}
l.mustSetTermIfHigher(hb.term)
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
}
l.leaderID = hb.leaderID
l.mu.Unlock()

case term := <-l.terms:
l.mu.Lock()
if term > l.term {
l.mustSetTerm(term)
}
l.mu.Unlock()
}
}
}
Expand Down Expand Up @@ -960,9 +961,14 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {

// Increment term and request votes.
l.mu.Lock()
l.mustSetTerm(l.term + 1)
l.mustSetTermIfHigher(l.term + 1)
l.votedFor = l.id
term := l.term

select {
case <-l.terms:
default:
}
l.mu.Unlock()

// Ensure all candidate goroutines complete before transitioning to another state.
Expand All @@ -981,27 +987,13 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
return Stopped
case hb := <-l.heartbeats:
l.mu.Lock()
if hb.term >= term {
l.mustSetTerm(hb.term)
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.leaderID = hb.leaderID
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case newTerm := <-l.terms:
// Ignore if it's not after this current term.
if newTerm <= term {
continue
}

// Check against the current term since that may have changed.
l.mu.Lock()
if newTerm > l.term {
l.mustSetTerm(newTerm)
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case <-l.terms:
return Follower
case <-elected:
return Leader
case ch := <-l.Clock.AfterElectionTimeout():
Expand Down Expand Up @@ -1034,12 +1026,11 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)

// If an error occured then send the peer's term.
// If an error occured then update term.
if err != nil {
select {
case l.terms <- peerTerm:
default:
}
l.mu.Lock()
l.mustSetTermIfHigher(peerTerm)
l.mu.Unlock()
return
}
votes <- struct{}{}
Expand Down Expand Up @@ -1078,6 +1069,11 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
// Retrieve leader's term.
l.mu.Lock()
term := l.term

select {
case <-l.terms:
default:
}
l.mu.Unlock()

// Read log from leader in a separate goroutine.
Expand All @@ -1092,25 +1088,16 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case <-closing: // wait for state change.
return Stopped

case newTerm := <-l.terms: // step down on higher term
if newTerm > term {
l.mu.Lock()
l.mustSetTerm(newTerm)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case <-l.terms: // step down on higher term
l.mu.Lock()
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower

case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.mustSetTerm(hb.term)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case hb := <-l.heartbeats: // update term, if necessary
l.mu.Lock()
l.mustSetTermIfHigher(hb.term)
l.mu.Unlock()

case commitIndex, ok := <-committed:
// Quorum not reached, try again.
Expand Down Expand Up @@ -1614,14 +1601,7 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
}

// Notify term change.
l.term = term
l.votedFor = 0
if term > l.term {
select {
case l.terms <- term:
default:
}
}
l.mustSetTermIfHigher(term)

// Reject request if log is out of date.
if lastLogTerm < l.lastLogTerm {
Expand Down Expand Up @@ -1675,10 +1655,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error
if l.state != Leader {
return nil, ErrNotLeader
} else if term > l.term {
select {
case l.terms <- term:
default:
}
l.mustSetTermIfHigher(term)
return nil, ErrNotLeader
}

Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
return ErrSeriesNotFound
}

Expand Down

0 comments on commit 3404386

Please sign in to comment.