Track HTTP client requests for /write and /query with /debug/requests
After requesting `/debug/requests`, the client will wait for 10 seconds
(configurable by specifying `seconds=` in the query parameters) while the
HTTP handler tracks every incoming query and write to the system. After
that time period has passed, it will output a JSON blob, very similar to
`/debug/vars`, that shows every IP address and user account (if
authentication is used) that connected to the host during that time.

In the future, we can add more metrics to track. This is an initial step
to aid in debugging machines that connect too often, by looking at a
sampled window of time (like `/debug/pprof`).
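As a usage sketch (not part of this commit), assuming InfluxDB's HTTP service is listening on the default `localhost:8086`, a client could sample 20 seconds of traffic like this; the host, port, and sample length are illustrative:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Hold the connection open while the server samples traffic for 20 seconds
	// (the default is 10s; values above 6h are rejected with a 400).
	resp, err := http.Get("http://localhost:8086/debug/requests?seconds=20")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The body is a JSON object keyed by "username:ip" (or just the IP when
	// authentication is disabled), with per-client write and query counts, e.g.
	//   {"user1:127.0.0.1": {"writes": 3, "queries": 1}}
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body))
}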
jsternberg committed May 9, 2017
1 parent e6f31c3 commit 2780630
Showing 3 changed files with 215 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1
- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits
- [#8366](https://github.com/influxdata/influxdb/pull/8366): Add TSI support tooling.
- [#8350](https://github.com/influxdata/influxdb/pull/8350): Track HTTP client requests for /write and /query with /debug/requests.

### Bugfixes

79 changes: 74 additions & 5 deletions services/httpd/handler.go
@@ -38,6 +38,10 @@ const (
//
// This has no relation to the number of bytes that are returned.
DefaultChunkSize = 10000

DefaultDebugRequestsInterval = 10 * time.Second

MaxDebugRequestsInterval = 6 * time.Hour
)

// AuthenticationMethod defines the type of authentication used.
@@ -99,16 +103,19 @@ type Handler struct {
Logger zap.Logger
CLFLogger *log.Logger
stats *Statistics

requestTracker *RequestTracker
}

// NewHandler returns a new instance of handler with routes.
func NewHandler(c Config) *Handler {
h := &Handler{
mux: pat.New(),
Config: &c,
Logger: zap.New(zap.NullEncoder()),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
stats: &Statistics{},
mux: pat.New(),
Config: &c,
Logger: zap.New(zap.NullEncoder()),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
stats: &Statistics{},
requestTracker: NewRequestTracker(),
}

h.AddRoutes([]Route{
@@ -257,6 +264,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
} else if strings.HasPrefix(r.URL.Path, "/debug/vars") {
h.serveExpvar(w, r)
} else if strings.HasPrefix(r.URL.Path, "/debug/requests") {
h.serveDebugRequests(w, r)
} else {
h.mux.ServeHTTP(w, r)
}
@@ -282,6 +291,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
defer func(start time.Time) {
atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())
h.requestTracker.Add(r, user)

// Retrieve the underlying ResponseWriter or initialize our own.
rw, ok := w.(ResponseWriter)
@@ -584,6 +594,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())
h.requestTracker.Add(r, user)

database := r.URL.Query().Get("db")
if database == "" {
@@ -834,6 +845,64 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "\n}")
}

// serveDebugRequests will track requests for a period of time.
func (h *Handler) serveDebugRequests(w http.ResponseWriter, r *http.Request) {
var d time.Duration
if s := r.URL.Query().Get("seconds"); s == "" {
d = DefaultDebugRequestsInterval
} else if seconds, err := strconv.ParseInt(s, 10, 64); err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
} else {
d = time.Duration(seconds) * time.Second
if d > MaxDebugRequestsInterval {
h.httpError(w, fmt.Sprintf("exceeded maximum interval time: %s > %s",
influxql.FormatDuration(d),
influxql.FormatDuration(MaxDebugRequestsInterval)),
http.StatusBadRequest)
return
}
}

var closing <-chan bool
if notifier, ok := w.(http.CloseNotifier); ok {
closing = notifier.CloseNotify()
}

profile := h.requestTracker.TrackRequests()

timer := time.NewTimer(d)
select {
case <-timer.C:
profile.Stop()
case <-closing:
// Connection was closed early.
profile.Stop()
timer.Stop()
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Add("Connection", "close")

fmt.Fprintln(w, "{")
first := true
for req, st := range profile.Requests {
val, err := json.Marshal(st)
if err != nil {
continue
}

if !first {
fmt.Fprintln(w, ",")
}
first = false
fmt.Fprintf(w, "%q: ", req.String())
w.Write(bytes.TrimSpace(val))
}
fmt.Fprintln(w, "\n}")
}

// parseSystemDiagnostics converts the system diagnostics into an appropriate
// format for marshaling to JSON in the /debug/vars format.
func parseSystemDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, error) {
140 changes: 140 additions & 0 deletions services/httpd/requests.go
@@ -0,0 +1,140 @@
package httpd

import (
"container/list"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"

"github.com/influxdata/influxdb/services/meta"
)

type RequestInfo struct {
IPAddr string
Username string
}

type RequestStats struct {
Writes int64 `json:"writes"`
Queries int64 `json:"queries"`
}

func (r *RequestInfo) String() string {
if r.Username != "" {
return fmt.Sprintf("%s:%s", r.Username, r.IPAddr)
}
return r.IPAddr
}

type RequestProfile struct {
tracker *RequestTracker
elem *list.Element

mu sync.RWMutex
Requests map[RequestInfo]*RequestStats
}

func (p *RequestProfile) AddWrite(info RequestInfo) {
p.add(info, p.addWrite)
}

func (p *RequestProfile) AddQuery(info RequestInfo) {
p.add(info, p.addQuery)
}

func (p *RequestProfile) add(info RequestInfo, fn func(*RequestStats)) {
// Look for a request entry for this request.
p.mu.RLock()
st, ok := p.Requests[info]
p.mu.RUnlock()
if ok {
fn(st)
return
}

// There is no entry in the request tracker. Create one.
p.mu.Lock()
if st, ok := p.Requests[info]; ok {
// Something else created this entry while we were waiting for the lock.
p.mu.Unlock()
fn(st)
return
}

st = &RequestStats{}
p.Requests[info] = st
p.mu.Unlock()
fn(st)
}

func (p *RequestProfile) addWrite(st *RequestStats) {
atomic.AddInt64(&st.Writes, 1)
}

func (p *RequestProfile) addQuery(st *RequestStats) {
atomic.AddInt64(&st.Queries, 1)
}

// Stop informs the RequestTracker to stop collecting statistics for this
// profile.
func (p *RequestProfile) Stop() {
p.tracker.mu.Lock()
p.tracker.profiles.Remove(p.elem)
p.tracker.mu.Unlock()
}

type RequestTracker struct {
mu sync.RWMutex
profiles *list.List
}

func NewRequestTracker() *RequestTracker {
return &RequestTracker{
profiles: list.New(),
}
}

func (rt *RequestTracker) TrackRequests() *RequestProfile {
// Perform the memory allocation outside of the lock.
profile := &RequestProfile{
Requests: make(map[RequestInfo]*RequestStats),
tracker: rt,
}

rt.mu.Lock()
profile.elem = rt.profiles.PushBack(profile)
rt.mu.Unlock()
return profile
}

func (rt *RequestTracker) Add(req *http.Request, user *meta.UserInfo) {
rt.mu.RLock()
if rt.profiles.Len() == 0 {
rt.mu.RUnlock()
return
}
defer rt.mu.RUnlock()

var info RequestInfo
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return
}

info.IPAddr = host
if user != nil {
info.Username = user.Name
}

// Add the request info to the profiles.
for p := rt.profiles.Front(); p != nil; p = p.Next() {
profile := p.Value.(*RequestProfile)
if req.URL.Path == "/query" {
profile.AddQuery(info)
} else if req.URL.Path == "/write" {
profile.AddWrite(info)
}
}
}
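For reference, a minimal standalone sketch of how these pieces fit together outside the handler. The tracker, profile, `Add`, and `Stop` names come from the diff above; the `httptest` request, addresses, and user name are illustrative, and the import paths assume this revision of the repository:

package main

import (
	"fmt"
	"net/http/httptest"

	"github.com/influxdata/influxdb/services/httpd"
	"github.com/influxdata/influxdb/services/meta"
)

func main() {
	rt := httpd.NewRequestTracker()

	// Start a profile; every active profile sees requests passed to Add.
	profile := rt.TrackRequests()

	// Simulate a write and a query from the same authenticated client.
	w := httptest.NewRequest("POST", "/write?db=mydb", nil)
	w.RemoteAddr = "10.0.0.5:54321"
	rt.Add(w, &meta.UserInfo{Name: "alice"})

	q := httptest.NewRequest("GET", "/query?q=SHOW+DATABASES", nil)
	q.RemoteAddr = "10.0.0.5:54321"
	rt.Add(q, &meta.UserInfo{Name: "alice"})

	// Stop detaches the profile from the tracker; its counts are now stable.
	profile.Stop()

	for info, st := range profile.Requests {
		fmt.Printf("%s: writes=%d queries=%d\n", info.String(), st.Writes, st.Queries)
	}
}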
