Skip to content

Commit

Permalink
Fix term signal.
Browse files Browse the repository at this point in the history
This commit changes raft so that term changes are made immediately and
term change signals are made afterward. Previously, election timeouts
were invalidated by incoming term changes which caused an election loop.

Stale term was also fixed and http/pprof was added too.
  • Loading branch information
benbjohnson committed Apr 10, 2015
1 parent c757039 commit 3d9899b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ type Config struct {
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`

Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`

ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"

Expand All @@ -29,6 +30,21 @@ func NewHandler() *Handler {

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}

// FIXME: This is very brittle. Refactor to have common path prefix
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
Expand Down
8 changes: 4 additions & 4 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) {
if t := b.topics[topicID]; t != nil {
t.mu.Lock()
defer t.mu.Unlock()

// Track the highest replicated index per data node URL
t.indexByURL[u] = index

if t.index < index {
t.index = index
}
}
}

Expand Down Expand Up @@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error {
return fmt.Errorf("write segment: %s", err)
}

// Update index.
t.index = m.Index

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,12 @@ func (l *Log) mustSetTermIfHigher(term uint64) {
if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}

// Signal term change.
select {
case l.terms <- struct{}{}:
default:
}
}

// readConfig reads the configuration from disk.
Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
return ErrSeriesNotFound
}

Expand Down

0 comments on commit 3d9899b

Please sign in to comment.