Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement prometheus.Gather interface #101

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 86 additions & 60 deletions surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package surveyor

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/bcrypt"
)
Expand Down Expand Up @@ -73,7 +76,8 @@ type Options struct {
JetStreamConfigDir string
Accounts bool
Logger *logrus.Logger
ConstLabels prometheus.Labels
ConstLabels prometheus.Labels // not exposed by CLI
DisableHTTPServer bool // not exposed by CLI
}

// GetDefaultOptions returns the default set of options
Expand All @@ -100,8 +104,8 @@ type Surveyor struct {
sync.Mutex
opts Options
logger *logrus.Logger
nc *nats.Conn
http net.Listener
listener net.Listener
httpServer *http.Server
promRegistry *prometheus.Registry
reconnectCtr *prometheus.CounterVec
statzC *StatzCollector
Expand Down Expand Up @@ -167,26 +171,23 @@ func connect(opts *Options, reconnectCtr *prometheus.CounterVec) (*nats.Conn, er

// NewSurveyor creates a surveyor from opts.
//
// It returns an error when opts.URLs is empty or when connect fails. On
// success the returned Surveyor holds its own Prometheus registry, on which
// the reconnect counter has already been registered, plus the observation and
// JetStream advisory metrics built against that same registry.
func NewSurveyor(opts *Options) (*Surveyor, error) {
if opts.URLs == "" {
return nil, fmt.Errorf("surveyor URLs field is required")
}

// Every surveyor gets a registry of its own; all metrics below are
// registered on it.
promRegistry := prometheus.NewRegistry()
// Counter vector keyed by the "name" label; opts.ConstLabels are attached to
// every series it produces.
reconnectCtr := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"),
Help: "Number of times the surveyor reconnected to the NATS cluster",
ConstLabels: opts.ConstLabels,
}, []string{"name"})
promRegistry.MustRegister(reconnectCtr)
// connect receives the counter so it can be used by the connection it
// returns; a connection failure aborts construction.
nc, err := connect(opts, reconnectCtr)
if err != nil {
return nil, err
}
return &Surveyor{
nc: nc,
// Options are stored as a (shallow) copy of the caller's struct.
opts: *opts,
logger: opts.Logger,
promRegistry: promRegistry,
reconnectCtr: reconnectCtr,
// NOTE(review): Start only calls startObservations and
// startJetStreamAdvisories when these slices are nil; initialising them to
// empty, non-nil slices here would make those checks false on the first
// Start. Confirm which behaviour is intended.
observations: []*ServiceObsListener{},
observationMetrics: NewServiceObservationMetrics(promRegistry, opts.ConstLabels),
jsAPIAudits: []*JSAdvisoryListener{},
jsAPIMetrics: NewJetStreamAdvisoryMetrics(promRegistry, opts.ConstLabels),
}, nil
}
Expand All @@ -196,26 +197,18 @@ func (s *Surveyor) createStatszCollector() error {
return nil
}

nc, err := connect(&s.opts, s.reconnectCtr)
if err != nil {
return err
}

if !s.opts.Accounts {
s.logger.Debugln("Skipping per-account exports")
}

s.Lock()
s.statzC = NewStatzCollector(s.nc, s.logger, s.opts.ExpectedServers, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels)
s.Unlock()

err := s.promRegistry.Register(s.statzC)
for i := 0; i < 50 && err != nil; i++ {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
// ignore
return nil
}

s.logger.Warnf("Error registering statsz collector, will retry after 500ms: %v", err)
time.Sleep(500 * time.Millisecond)
err = s.promRegistry.Register(s.statzC)
}
return err
s.statzC = NewStatzCollector(nc, s.logger, s.opts.ExpectedServers, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels)
s.promRegistry.MustRegister(s.statzC)
return nil
}

// generates the TLS config for https
Expand Down Expand Up @@ -300,9 +293,7 @@ func (s *Surveyor) httpAuthMiddleware(next http.Handler) http.Handler {

func (s *Surveyor) httpConcurrentPollBlockMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
s.Lock()
sz := s.statzC
s.Unlock()

if sz == nil {
next.ServeHTTP(rw, r)
Expand Down Expand Up @@ -336,64 +327,60 @@ func (s *Surveyor) startHTTP() error {
var err error
var proto string
var config *tls.Config
var listener net.Listener

hp = net.JoinHostPort(s.opts.ListenAddress, strconv.Itoa(s.opts.ListenPort))

// If a certificate file has been specified, setup TLS with the
// key provided.
if s.opts.HTTPCertFile != "" {
proto = "https"
// debug
s.logger.Debugln("Certificate file specfied; using https.")
s.logger.Debugln("Certificate file specified; using https.")
config, err = s.generateHTTPTLSConfig()
if err != nil {
return err
}
s.http, err = tls.Listen("tcp", hp, config)
listener, err = tls.Listen("tcp", hp, config)
} else {
proto = "http"

// debug
s.logger.Debugln("No certificate file specified; using http.")
s.http, err = net.Listen("tcp", hp)
s.logger.Debugln("No certificate file specified; using listener.")
listener, err = net.Listen("tcp", hp)
}

s.logger.Infof("Prometheus exporter listening at %s://%s/metrics", proto, hp)

if err != nil {
s.logger.Errorf("can't start HTTP listener: %v", err)
return err
}

s.listener = listener
s.logger.Infof("Prometheus exporter listening at %s://%s/metrics", proto, hp)

mux := http.NewServeMux()
mux.Handle("/metrics", s.getScrapeHandler())
mux.HandleFunc("/healthz", func(resp http.ResponseWriter, req *http.Request) {
resp.Write([]byte("ok"))
})

srv := &http.Server{
httpServer := &http.Server{
Addr: hp,
Handler: mux,
MaxHeaderBytes: 1 << 20,
TLSConfig: config,
}
s.httpServer = httpServer

sHTTP := s.http
go func() {
for i := 0; i < 10; i++ {
var err error
if err = srv.Serve(sHTTP); err != nil {
// In a test environment, this can fail because the server is already running.
// debugf
s.logger.Errorf("Unable to start HTTP server (may already be running): %v", err)
}
err := httpServer.Serve(listener)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
s.logger.Errorf("Unable to start HTTP server (may already be running): %v", err)
}
}()

return nil
}

func (s *Surveyor) startJetStreamAdvisories() error {
s.jsAPIAudits = []*JSAdvisoryListener{}
s.jsAPIMetrics.jsAdvisoriesGauge.Set(0)

dir := s.opts.JetStreamConfigDir
Expand Down Expand Up @@ -439,6 +426,7 @@ func (s *Surveyor) startJetStreamAdvisories() error {
}

func (s *Surveyor) startObservations() error {
s.observations = []*ServiceObsListener{}
s.observationMetrics.observationsGauge.Set(0)

dir := s.opts.ObservationConfigDir
Expand Down Expand Up @@ -493,20 +481,31 @@ func (s *Surveyor) startObservations() error {

// Start starts the surveyor
func (s *Surveyor) Start() error {
if err := s.startHTTP(); err != nil {
return err
s.Lock()
defer s.Unlock()

if s.statzC == nil {
if err := s.createStatszCollector(); err != nil {
return err
}
}

if err := s.createStatszCollector(); err != nil {
return err
if s.observations == nil {
if err := s.startObservations(); err != nil {
return err
}
}

if err := s.startObservations(); err != nil {
return err
if s.jsAPIAudits == nil {
if err := s.startJetStreamAdvisories(); err != nil {
return err
}
}

if err := s.startJetStreamAdvisories(); err != nil {
return err
if !s.opts.DisableHTTPServer && s.listener == nil && s.httpServer == nil {
if err := s.startHTTP(); err != nil {
return err
}
}

return nil
Expand All @@ -515,13 +514,40 @@ func (s *Surveyor) Start() error {
// Stop stops the surveyor.
//
// While holding the Surveyor mutex it shuts down the HTTP server, closes the
// listener, unregisters the statz collector and closes its NATS connection,
// and closes the connection of every observation and JetStream advisory
// listener. Each field is set to nil once it has been released, so the nil
// checks skip anything that is already stopped.
func (s *Surveyor) Stop() {
s.Lock()
defer s.Unlock()

if s.httpServer != nil {
// The Shutdown error is explicitly discarded; Stop has no error return.
_ = s.httpServer.Shutdown(context.Background())
s.httpServer = nil
}

if s.listener != nil {
// Close error is likewise discarded.
_ = s.listener.Close()
s.listener = nil
}

if s.statzC != nil {
// Remove the collector from the registry before closing the connection it
// holds.
s.promRegistry.Unregister(s.statzC)
s.statzC.nc.Close()
s.statzC = nil
}

if s.observations != nil {
for _, o := range s.observations {
o.nc.Close()
}
s.observations = nil
}

// NOTE(review): the next two lines repeat the loop header and body from the
// block above but have no closing brace of their own, which leaves the braces
// in this function unbalanced. They look like residue of an earlier version
// of this loop; confirm and remove.
for _, o := range s.observations {
o.nc.Close()
if s.jsAPIAudits != nil {
for _, j := range s.jsAPIAudits {
j.nc.Close()
}
s.jsAPIAudits = nil
}
}

// NOTE(review): the four statements below follow what appears to be the
// closing brace of Stop. They unregister s.statzC a second time, use the
// s.http and s.nc fields, and call s.Unlock() in addition to the deferred
// unlock above. Presumably leftovers of an earlier implementation; confirm
// before relying on them.
s.promRegistry.Unregister(s.statzC)
s.http.Close()
s.nc.Drain()
s.Unlock()
// Gather implements the prometheus.Gatherer interface by delegating to the
// surveyor's own Prometheus registry and handing back its metric families
// and error unchanged.
func (s *Surveyor) Gather() ([]*dto.MetricFamily, error) {
	families, err := s.promRegistry.Gather()
	return families, err
}
Loading