From 3d9899b32c16740a82f874504de7cc14cc552d83 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 10 Apr 2015 12:59:55 -0600 Subject: [PATCH] Fix term signal. This commit changes raft so that term changes are made immediately and term change signals are made afterward. Previously, election timeouts were invalidated by incoming term changes which caused an election loop. Stale term was also fixed and http/pprof was added too. --- cmd/influxd/config.go | 4 ++++ cmd/influxd/handler.go | 16 ++++++++++++++++ messaging/broker.go | 8 ++++---- raft/log.go | 6 ++++++ server.go | 1 + 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 7ccd8fc82cb..9c6734fde14 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -184,6 +184,10 @@ type Config struct { WriteInterval Duration `toml:"write-interval"` } `toml:"monitoring"` + Debugging struct { + PprofEnabled bool `toml:"pprof-enabled"` + } `toml:"debugging"` + ContinuousQuery struct { // when continuous queries are run we'll automatically recompute previous intervals // in case lagged data came in. Set to zero if you never have lagged data. We do diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go index e9bd987fdf8..e50b6fd5d9e 100644 --- a/cmd/influxd/handler.go +++ b/cmd/influxd/handler.go @@ -4,6 +4,7 @@ import ( "log" "math/rand" "net/http" + "net/http/pprof" "net/url" "strings" @@ -29,6 +30,21 @@ func NewHandler() *Handler { // ServeHTTP responds to HTTP request to the handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Debug routes. + if strings.HasPrefix(r.URL.Path, "/debug/pprof") { + switch r.URL.Path { + case "/debug/pprof/cmdline": + pprof.Cmdline(w, r) + case "/debug/pprof/profile": + pprof.Profile(w, r) + case "/debug/pprof/symbol": + pprof.Symbol(w, r) + default: + pprof.Index(w, r) + } + return + } + // FIXME: This is very brittle. Refactor to have common path prefix if strings.HasPrefix(r.URL.Path, "/raft") { h.serveRaft(w, r) diff --git a/messaging/broker.go b/messaging/broker.go index 8aa6eadd75b..d84cd87a028 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) { if t := b.topics[topicID]; t != nil { t.mu.Lock() defer t.mu.Unlock() + // Track the highest replicated index per data node URL t.indexByURL[u] = index - - if t.index < index { - t.index = index - } } } @@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error { return fmt.Errorf("write segment: %s", err) } + // Update index. + t.index = m.Index + return nil } diff --git a/raft/log.go b/raft/log.go index 5f22b34e50e..25138d6e573 100644 --- a/raft/log.go +++ b/raft/log.go @@ -546,6 +546,12 @@ func (l *Log) mustSetTermIfHigher(term uint64) { if err := l.setTerm(term); err != nil { panic("unable to set term: " + err.Error()) } + + // Signal term change. + select { + case l.terms <- struct{}{}: + default: + } } // readConfig reads the configuration from disk. diff --git a/server.go b/server.go index 3286bdf162b..d357b986e4f 100644 --- a/server.go +++ b/server.go @@ -1771,6 +1771,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( for _, p := range points { measurement, series := db.MeasurementAndSeries(p.Name, p.Tags) if series == nil { + s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags) return ErrSeriesNotFound }