Skip to content

Commit

Permalink
implement prometheus.Gather interface
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
  • Loading branch information
Caleb Lloyd committed Aug 25, 2022
1 parent 6a523da commit ebc942b
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 75 deletions.
148 changes: 88 additions & 60 deletions surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package surveyor

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/bcrypt"
)
Expand Down Expand Up @@ -73,7 +76,8 @@ type Options struct {
JetStreamConfigDir string
Accounts bool
Logger *logrus.Logger
ConstLabels prometheus.Labels
ConstLabels prometheus.Labels // not exposed by CLI
DisableHTTPServer bool // not exposed by CLI
}

// GetDefaultOptions returns the default set of options
Expand All @@ -100,8 +104,8 @@ type Surveyor struct {
sync.Mutex
opts Options
logger *logrus.Logger
nc *nats.Conn
http net.Listener
listener net.Listener
httpServer *http.Server
promRegistry *prometheus.Registry
reconnectCtr *prometheus.CounterVec
statzC *StatzCollector
Expand Down Expand Up @@ -167,26 +171,23 @@ func connect(opts *Options, reconnectCtr *prometheus.CounterVec) (*nats.Conn, er

// NewSurveyor creates a surveyor
func NewSurveyor(opts *Options) (*Surveyor, error) {
if opts.URLs == "" {
return nil, fmt.Errorf("surveyor URLs field is required")
}

promRegistry := prometheus.NewRegistry()
reconnectCtr := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"),
Help: "Number of times the surveyor reconnected to the NATS cluster",
ConstLabels: opts.ConstLabels,
}, []string{"name"})
promRegistry.MustRegister(reconnectCtr)
nc, err := connect(opts, reconnectCtr)
if err != nil {
return nil, err
}
return &Surveyor{
nc: nc,
opts: *opts,
logger: opts.Logger,
promRegistry: promRegistry,
reconnectCtr: reconnectCtr,
observations: []*ServiceObsListener{},
observationMetrics: NewServiceObservationMetrics(promRegistry, opts.ConstLabels),
jsAPIAudits: []*JSAdvisoryListener{},
jsAPIMetrics: NewJetStreamAdvisoryMetrics(promRegistry, opts.ConstLabels),
}, nil
}
Expand All @@ -196,26 +197,18 @@ func (s *Surveyor) createStatszCollector() error {
return nil
}

nc, err := connect(&s.opts, s.reconnectCtr)
if err != nil {
return err
}

if !s.opts.Accounts {
s.logger.Debugln("Skipping per-account exports")
}

s.Lock()
s.statzC = NewStatzCollector(s.nc, s.logger, s.opts.ExpectedServers, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels)
s.Unlock()

err := s.promRegistry.Register(s.statzC)
for i := 0; i < 50 && err != nil; i++ {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
// ignore
return nil
}

s.logger.Warnf("Error registering statsz collector, will retry after 500ms: %v", err)
time.Sleep(500 * time.Millisecond)
err = s.promRegistry.Register(s.statzC)
}
return err
s.statzC = NewStatzCollector(nc, s.logger, s.opts.ExpectedServers, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels)
s.promRegistry.MustRegister(s.statzC)
return nil
}

// generates the TLS config for https
Expand Down Expand Up @@ -300,9 +293,7 @@ func (s *Surveyor) httpAuthMiddleware(next http.Handler) http.Handler {

func (s *Surveyor) httpConcurrentPollBlockMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
s.Lock()
sz := s.statzC
s.Unlock()

if sz == nil {
next.ServeHTTP(rw, r)
Expand Down Expand Up @@ -336,64 +327,60 @@ func (s *Surveyor) startHTTP() error {
var err error
var proto string
var config *tls.Config
var listener net.Listener

hp = net.JoinHostPort(s.opts.ListenAddress, strconv.Itoa(s.opts.ListenPort))

// If a certificate file has been specified, setup TLS with the
// key provided.
if s.opts.HTTPCertFile != "" {
proto = "https"
// debug
s.logger.Debugln("Certificate file specfied; using https.")
s.logger.Debugln("Certificate file specified; using https.")
config, err = s.generateHTTPTLSConfig()
if err != nil {
return err
}
s.http, err = tls.Listen("tcp", hp, config)
listener, err = tls.Listen("tcp", hp, config)
} else {
proto = "http"

// debug
s.logger.Debugln("No certificate file specified; using http.")
s.http, err = net.Listen("tcp", hp)
s.logger.Debugln("No certificate file specified; using listener.")
listener, err = net.Listen("tcp", hp)
}

s.logger.Infof("Prometheus exporter listening at %s://%s/metrics", proto, hp)

if err != nil {
s.logger.Errorf("can't start HTTP listener: %v", err)
return err
}

s.listener = listener
s.logger.Infof("Prometheus exporter listening at %s://%s/metrics", proto, hp)

mux := http.NewServeMux()
mux.Handle("/metrics", s.getScrapeHandler())
mux.HandleFunc("/healthz", func(resp http.ResponseWriter, req *http.Request) {
resp.Write([]byte("ok"))
})

srv := &http.Server{
httpServer := &http.Server{
Addr: hp,
Handler: mux,
MaxHeaderBytes: 1 << 20,
TLSConfig: config,
}
s.httpServer = httpServer

sHTTP := s.http
go func() {
for i := 0; i < 10; i++ {
var err error
if err = srv.Serve(sHTTP); err != nil {
// In a test environment, this can fail because the server is already running.
// debugf
s.logger.Errorf("Unable to start HTTP server (may already be running): %v", err)
}
err := httpServer.Serve(listener)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
s.logger.Errorf("Unable to start HTTP server (may already be running): %v", err)
}
}()

return nil
}

func (s *Surveyor) startJetStreamAdvisories() error {
s.jsAPIAudits = []*JSAdvisoryListener{}
s.jsAPIMetrics.jsAdvisoriesGauge.Set(0)

dir := s.opts.JetStreamConfigDir
Expand Down Expand Up @@ -439,6 +426,7 @@ func (s *Surveyor) startJetStreamAdvisories() error {
}

func (s *Surveyor) startObservations() error {
s.observations = []*ServiceObsListener{}
s.observationMetrics.observationsGauge.Set(0)

dir := s.opts.ObservationConfigDir
Expand Down Expand Up @@ -493,20 +481,31 @@ func (s *Surveyor) startObservations() error {

// Start starts the surveyor
func (s *Surveyor) Start() error {
if err := s.startHTTP(); err != nil {
return err
s.Lock()
defer s.Unlock()

if s.statzC == nil {
if err := s.createStatszCollector(); err != nil {
return err
}
}

if err := s.createStatszCollector(); err != nil {
return err
if s.observations == nil {
if err := s.startObservations(); err != nil {
return err
}
}

if err := s.startObservations(); err != nil {
return err
if s.jsAPIAudits == nil {
if err := s.startJetStreamAdvisories(); err != nil {
return err
}
}

if err := s.startJetStreamAdvisories(); err != nil {
return err
if !s.opts.DisableHTTPServer && s.listener == nil && s.httpServer == nil {
if err := s.startHTTP(); err != nil {
return err
}
}

return nil
Expand All @@ -515,13 +514,42 @@ func (s *Surveyor) Start() error {
// Stop stops the surveyor
func (s *Surveyor) Stop() {
s.Lock()
defer s.Unlock()

for _, o := range s.observations {
o.nc.Close()
if !s.opts.DisableHTTPServer {
if s.httpServer != nil {
_ = s.httpServer.Shutdown(context.Background())
s.httpServer = nil
}

if s.listener != nil {
_ = s.listener.Close()
s.listener = nil
}
}

if s.statzC != nil {
s.promRegistry.Unregister(s.statzC)
s.statzC.nc.Close()
s.statzC = nil
}

if s.observations != nil {
for _, o := range s.observations {
o.nc.Close()
}
s.observations = nil
}

if s.jsAPIAudits != nil {
for _, j := range s.jsAPIAudits {
j.nc.Close()
}
s.jsAPIAudits = nil
}
}

s.promRegistry.Unregister(s.statzC)
s.http.Close()
s.nc.Drain()
s.Unlock()
// Gather implements the prometheus.Gatherer interface
func (s *Surveyor) Gather() ([]*dto.MetricFamily, error) {
return s.promRegistry.Gather()
}
Loading

0 comments on commit ebc942b

Please sign in to comment.