Skip to content

Commit

Permalink
chore(http): fixup middleware stack and normalize metrics into same b…
Browse files Browse the repository at this point in the history
…uckets for id fields
  • Loading branch information
jsteenb2 committed Jan 8, 2020
1 parent 71c8751 commit c8aabaf
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 221 deletions.
41 changes: 22 additions & 19 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,27 +835,30 @@ func (m *Launcher) run(ctx context.Context) (err error) {
pkgHTTPServer = http.NewHandlerPkg(pkgServerLogger, m.apibackend.HTTPErrorHandler, pkgSVC)
}

// HTTP server
var platformHandler nethttp.Handler = http.NewPlatformHandler(m.apibackend, http.WithResourceHandler(pkgHTTPServer))
m.reg.MustRegister(platformHandler.(*http.PlatformHandler).PrometheusCollectors()...)
httpLogger := m.log.With(zap.String("service", "http"))
if logconf.Level == zap.DebugLevel {
platformHandler = http.LoggingMW(httpLogger)(platformHandler)
}

handler := http.NewHandlerFromRegistry(httpLogger, "platform", m.reg)
handler.Handler = platformHandler
{
platformHandler := http.NewPlatformHandler(m.apibackend, http.WithResourceHandler(pkgHTTPServer))

httpLogger := m.log.With(zap.String("service", "http"))
m.httpServer.Handler = http.NewHandlerFromRegistry(
"platform",
m.reg,
http.WithLog(httpLogger),
http.WithAPIHandler(platformHandler),
)

m.httpServer.Handler = handler
// If we are in testing mode we allow all data to be flushed and removed.
if m.testing {
m.httpServer.Handler = http.DebugFlush(ctx, handler, flushers)
if logconf.Level == zap.DebugLevel {
m.httpServer.Handler = http.LoggingMW(httpLogger)(m.httpServer.Handler)
}
// If we are in testing mode we allow all data to be flushed and removed.
if m.testing {
m.httpServer.Handler = http.DebugFlush(ctx, m.httpServer.Handler, flushers)
}
}

ln, err := net.Listen("tcp", m.httpBindAddress)
if err != nil {
httpLogger.Error("failed http listener", zap.Error(err))
httpLogger.Info("Stopping")
m.log.Error("failed http listener", zap.Error(err))
m.log.Info("Stopping")
return err
}

Expand All @@ -867,8 +870,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
cer, err = tls.LoadX509KeyPair(m.httpTLSCert, m.httpTLSKey)

if err != nil {
httpLogger.Error("failed to load x509 key pair", zap.Error(err))
httpLogger.Info("Stopping")
m.log.Error("failed to load x509 key pair", zap.Error(err))
m.log.Info("Stopping")
return err
}
transport = "https"
Expand All @@ -895,7 +898,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
}
log.Info("Stopping")
}(httpLogger)
}(m.log)

return nil
}
Expand Down
189 changes: 89 additions & 100 deletions http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ import (
"encoding/json"
"net/http"
_ "net/http/pprof" // used for debug pprof at the default path.
"strings"
"time"

"github.com/go-chi/chi"
"github.com/influxdata/influxdb/kit/prom"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/opentracing/opentracing-go"
kithttp "github.com/influxdata/influxdb/kit/transport/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

Expand All @@ -31,16 +28,7 @@ const (
// All other requests are passed down to the sub handler.
type Handler struct {
name string
// MetricsHandler handles metrics requests
MetricsHandler http.Handler
// ReadyHandler handles readiness checks
ReadyHandler http.Handler
// HealthHandler handles health requests
HealthHandler http.Handler
// DebugHandler handles debug requests
DebugHandler http.Handler
// Handler handles all other requests
Handler http.Handler
r chi.Router

requests *prometheus.CounterVec
requestDur *prometheus.HistogramVec
Expand All @@ -49,104 +37,97 @@ type Handler struct {
log *zap.Logger
}

// NewHandler creates a new handler with the given name.
// The name is used to tag the metrics produced by this handler.
//
// The MetricsHandler is set to the default prometheus handler.
// It is the caller's responsibility to call prometheus.MustRegister(h.PrometheusCollectors()...).
// In most cases, you want to use NewHandlerFromRegistry instead.
func NewHandler(name string) *Handler {
h := &Handler{
name: name,
MetricsHandler: promhttp.Handler(),
DebugHandler: http.DefaultServeMux,
type (
handlerOpts struct {
log *zap.Logger
apiHandler http.Handler
debugHandler http.Handler
healthHandler http.Handler
metricsHandler http.Handler
readyHandler http.Handler
}

HandlerOptFn func(opts *handlerOpts)
)

func WithLog(l *zap.Logger) HandlerOptFn {
return func(opts *handlerOpts) {
opts.log = l
}
}

func WithAPIHandler(h http.Handler) HandlerOptFn {
return func(opts *handlerOpts) {
opts.apiHandler = h
}
}

func WithDebugHandler(h http.Handler) HandlerOptFn {
return func(opts *handlerOpts) {
opts.debugHandler = h
}
}

func WithHealthHandler(h http.Handler) HandlerOptFn {
return func(opts *handlerOpts) {
opts.healthHandler = h
}
}

func WithMetricsHandler(h http.Handler) HandlerOptFn {
return func(opts *handlerOpts) {
opts.metricsHandler = h
}
}

func WithReadyHandler(h http.Handler) HandlerOptFn {
return func(opts *handlerOpts) {
opts.readyHandler = h
}
h.initMetrics()
return h
}

// NewHandlerFromRegistry creates a new handler with the given name,
// and sets the /metrics endpoint to use the metrics from the given registry,
// after self-registering h's metrics.
func NewHandlerFromRegistry(log *zap.Logger, name string, reg *prom.Registry) *Handler {
func NewHandlerFromRegistry(name string, reg *prom.Registry, opts ...HandlerOptFn) *Handler {
opt := handlerOpts{
log: zap.NewNop(),
debugHandler: http.DefaultServeMux,
healthHandler: http.HandlerFunc(HealthHandler),
metricsHandler: reg.HTTPHandler(),
readyHandler: ReadyHandler(),
}
for _, o := range opts {
o(&opt)
}

h := &Handler{
name: name,
MetricsHandler: reg.HTTPHandler(),
ReadyHandler: http.HandlerFunc(ReadyHandler),
HealthHandler: http.HandlerFunc(HealthHandler),
DebugHandler: http.DefaultServeMux,
log: log,
name: name,
log: opt.log,
}
h.initMetrics()

r := chi.NewRouter()
r.Use(
kithttp.Trace(name),
kithttp.Metrics(name, h.requests, h.requestDur),
)
{
r.Mount(MetricsPath, opt.metricsHandler)
r.Mount(ReadyPath, opt.readyHandler)
r.Mount(HealthPath, opt.healthHandler)
r.Mount(DebugPath, opt.debugHandler)
r.Mount("/", opt.apiHandler)
}
h.r = r

reg.MustRegister(h.PrometheusCollectors()...)
return h
}

// ServeHTTP delegates a request to the appropriate subhandler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var span opentracing.Span
span, r = tracing.ExtractFromHTTPRequest(r, h.name)

statusW := newStatusResponseWriter(w)
w = statusW

// record prometheus metrics and finish traces
defer func(start time.Time) {
duration := time.Since(start)
statusClass := statusW.statusCodeClass()
ua := userAgent(r)

h.requests.With(prometheus.Labels{
"handler": h.name,
"method": r.Method,
"path": r.URL.Path,
"status": statusClass,
"user_agent": ua,
}).Inc()
h.requestDur.With(prometheus.Labels{
"handler": h.name,
"method": r.Method,
"path": r.URL.Path,
"status": statusClass,
"user_agent": ua,
}).Observe(duration.Seconds())

span.LogKV("user_agent", ua)
for k, v := range r.Header {
if len(v) == 0 {
continue
}

// yeah, we don't need these
if k == "Authorization" || k == "User-Agent" {
continue
}

// If header has multiple values, only the first value will be logged on the trace.
span.LogKV(k, v[0])
}
span.Finish()
}(time.Now())

switch {
case r.URL.Path == MetricsPath:
h.MetricsHandler.ServeHTTP(w, r)
case r.URL.Path == ReadyPath:
h.ReadyHandler.ServeHTTP(w, r)
case r.URL.Path == HealthPath:
h.HealthHandler.ServeHTTP(w, r)
case strings.HasPrefix(r.URL.Path, DebugPath):
h.DebugHandler.ServeHTTP(w, r)
default:
h.Handler.ServeHTTP(w, r)
}
}

func encodeResponse(ctx context.Context, w http.ResponseWriter, code int, res interface{}) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)

return json.NewEncoder(w).Encode(res)
h.r.ServeHTTP(w, r)
}

// PrometheusCollectors satisifies prom.PrometheusCollector.
Expand All @@ -161,19 +142,27 @@ func (h *Handler) initMetrics() {
const namespace = "http"
const handlerSubsystem = "api"

labelNames := []string{"handler", "method", "path", "status", "user_agent"}
h.requests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: handlerSubsystem,
Name: "requests_total",
Help: "Number of http requests received",
}, []string{"handler", "method", "path", "status", "user_agent"})
}, labelNames)

h.requestDur = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: handlerSubsystem,
Name: "request_duration_seconds",
Help: "Time taken to respond to HTTP request",
}, []string{"handler", "method", "path", "status", "user_agent"})
}, labelNames)
}

func encodeResponse(ctx context.Context, w http.ResponseWriter, code int, res interface{}) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)

return json.NewEncoder(w).Encode(res)
}

func logEncodingError(log *zap.Logger, r *http.Request, err error) {
Expand Down
13 changes: 6 additions & 7 deletions http/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ func TestHandler_ServeHTTP(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &Handler{
name: tt.fields.name,
Handler: tt.fields.handler,
log: tt.fields.log,
}
h.initMetrics()
reg := prom.NewRegistry(zaptest.NewLogger(t))
reg.MustRegister(h.PrometheusCollectors()...)
h := NewHandlerFromRegistry(
tt.fields.name,
reg,
WithLog(tt.fields.log),
WithAPIHandler(tt.fields.handler),
)

tt.args.r.Header.Set("User-Agent", "ua1")
h.ServeHTTP(tt.args.w, tt.args.r)
Expand Down
Loading

0 comments on commit c8aabaf

Please sign in to comment.