diff --git a/surveyor/jetstream.go b/surveyor/jetstream.go index 220444c..dd71239 100644 --- a/surveyor/jetstream.go +++ b/surveyor/jetstream.go @@ -67,83 +67,93 @@ func (o *jsAdvisoryOptions) Validate() error { var ( // API Audit jsAPIAuditCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_audit"), + Name: prometheus.BuildFQName("nats", "jetstream", "api_audit"), Help: "JetStream API access audit events", }, []string{"server", "subject", "account"}) jsAPIErrorsCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_errors"), + Name: prometheus.BuildFQName("nats", "jetstream", "api_errors"), Help: "JetStream API Errors Count", }, []string{"server", "subject", "account"}) // Delivery Exceeded jsDeliveryExceededCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_exceeded_count"), + Name: prometheus.BuildFQName("nats", "jetstream", "delivery_exceeded_count"), Help: "Advisories about JetStream Consumer Delivery Exceeded events", }, []string{"account", "stream", "consumer"}) jsDeliveryTerminatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_terminated_count"), + Name: prometheus.BuildFQName("nats", "jetstream", "delivery_terminated_count"), Help: "Advisories about JetStream Consumer Delivery Terminated events", }, []string{"account", "stream", "consumer"}) // Ack Samples jsAckMetricDelay = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_duration"), + Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_duration"), Help: "How long an Acknowledged message took to be Acknowledged", }, []string{"account", "stream", "consumer"}) jsAckMetricDeliveries = prometheus.NewCounterVec(prometheus.CounterOpts{ - 
Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_deliveries"), + Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_deliveries"), Help: "How many times messages took to be delivered and Acknowledged", }, []string{"account", "stream", "consumer"}) // Misc jsAdvisoriesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_advisory_count"), + Name: prometheus.BuildFQName("nats", "jetstream", "advisory_count"), Help: "Number of JetStream Advisory listeners that are running", }) jsUnknownAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_unknown_advisories"), + Name: prometheus.BuildFQName("nats", "jetstream", "unknown_advisories"), Help: "Unsupported JetStream Advisory types received", }, []string{"schema", "account"}) + jsTotalAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("nats", "jetstream", "total_advisories"), + Help: "JetStream Advisories handled in total", + }, []string{"account"}) + + jsAdvisoryParseErrorCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("nats", "jetstream", "advisory_parse_errors"), + Help: "Number of advisories that could not be parsed", + }, []string{"account"}) + // Stream and Consumer actions jsConsumerActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_consumer_actions"), + Name: prometheus.BuildFQName("nats", "jetstream", "consumer_actions"), Help: "Actions performed on consumers", }, []string{"account", "stream", "action"}) jsStreamActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_stream_actions"), + Name: prometheus.BuildFQName("nats", "jetstream", "stream_actions"), Help: "Actions performed on streams", }, []string{"account", "stream", 
"action"}) // Snapshot create jsSnapshotSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_size_bytes"), + Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_size_bytes"), Help: "The size of snapshots being created", }, []string{"account", "stream"}) jsSnapthotDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_duration"), + Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_duration"), Help: "How long a snapshot takes to be processed", }, []string{"account", "stream"}) // Restore jsRestoreCreatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_created_count"), + Name: prometheus.BuildFQName("nats", "jetstream", "restore_created_count"), Help: "How many restore operations were started", }, []string{"account", "stream"}) jsRestoreSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_size_bytes"), + Name: prometheus.BuildFQName("nats", "jetstream", "restore_size_bytes"), Help: "The size of restores that was completed", }, []string{"account", "stream"}) jsRestoreDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_duration"), + Name: prometheus.BuildFQName("nats", "jetstream", "restore_duration"), Help: "How long a restore took to be processed", }, []string{"account", "stream"}) ) @@ -164,6 +174,8 @@ func init() { prometheus.MustRegister(jsRestoreSizeCtr) prometheus.MustRegister(jsRestoreDuration) prometheus.MustRegister(jsRestoreCreatedCtr) + prometheus.MustRegister(jsTotalAdvisoryCtr) + prometheus.MustRegister(jsAdvisoryParseErrorCtr) } // NewJetStreamAdvisoryListener creates a new JetStream advisory reporter @@ -186,7 +198,7 @@ func NewJetStreamAdvisoryListener(f string, 
sopts Options) (*JSAdvisoryListener, sopts.Name = fmt.Sprintf("%s (jetstream %s)", sopts.Name, opts.AccountName) sopts.Credentials = opts.Credentials - nc, err := connect(&sopts) + nc, err := connect(&sopts, fmt.Sprintf("%s JetStream %s", sopts.Name, opts.AccountName)) if err != nil { return nil, fmt.Errorf("nats connection failed: %s", err) } @@ -222,10 +234,13 @@ func (o *JSAdvisoryListener) Start() error { func (o *JSAdvisoryListener) advisoryHandler(m *nats.Msg) { schema, event, err := jsm.ParseEvent(m.Data) if err != nil { + jsAdvisoryParseErrorCtr.WithLabelValues(o.opts.AccountName).Inc() log.Printf("Could not parse JetStream API Audit Advisory: %s", err) return } + jsTotalAdvisoryCtr.WithLabelValues(o.opts.AccountName).Inc() + switch event := event.(type) { case *advisory.JetStreamAPIAuditV1: if strings.HasPrefix(event.Response, api.ErrPrefix) { diff --git a/surveyor/observation.go b/surveyor/observation.go index c921d27..5dccfab 100644 --- a/surveyor/observation.go +++ b/surveyor/observation.go @@ -71,47 +71,47 @@ func (o *serviceObsOptions) Validate() error { var ( observationsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: prometheus.BuildFQName("nats", "survey", "observerations_count"), + Name: prometheus.BuildFQName("nats", "latency", "observations_count"), Help: "Number of Service Latency listeners that are running", }) observationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "nats_latency_observation_count", + Name: prometheus.BuildFQName("nats", "latency", "observations_received_count"), Help: "Number of observations received by this surveyor across all services", }, []string{"service", "app"}) serviceRequestStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "nats_latency_observation_status_count", + Name: prometheus.BuildFQName("nats", "latency", "observation_status_count"), Help: "The status result codes for requests to a service", }, []string{"service", "status"}) invalidObservationsReceived = 
prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "nats_latency_observation_error_count", + Name: prometheus.BuildFQName("nats", "latency", "observation_error_count"), Help: "Number of observations received by this surveyor across all services that could not be handled", }, []string{"service"}) serviceLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "nats_latency_service_duration", + Name: prometheus.BuildFQName("nats", "latency", "service_duration"), Help: "Time spent serving the request in the service", }, []string{"service", "app"}) totalLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "nats_latency_total_duration", + Name: prometheus.BuildFQName("nats", "latency", "total_duration"), Help: "Total time spent serving a service including network overheads", }, []string{"service", "app"}) requestorRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "nats_latency_requestor_rtt", + Name: prometheus.BuildFQName("nats", "latency", "requestor_rtt"), Help: "The RTT to the client making a request", }, []string{"service", "app"}) responderRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "nats_latency_responder_rtt", + Name: prometheus.BuildFQName("nats", "latency", "responder_rtt"), Help: "The RTT to the service serving the request", }, []string{"service", "app"}) systemRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "nats_latency_system_rtt", + Name: prometheus.BuildFQName("nats", "latency", "system_rtt"), Help: "The RTT within the NATS system - time traveling clusters, gateways and leaf nodes", }, []string{"service", "app"}) ) @@ -148,7 +148,7 @@ func NewServiceObservation(f string, sopts Options) (*ServiceObsListener, error) sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, opts.ServiceName) sopts.Credentials = opts.Credentials - nc, err := connect(&sopts) + nc, err := connect(&sopts, fmt.Sprintf("%s Service %s", sopts.Name, opts.ServiceName)) if err != nil { 
return nil, fmt.Errorf("nats connection failed: %s", err) } diff --git a/surveyor/surveyor.go b/surveyor/surveyor.go index 23db4eb..ab2d289 100644 --- a/surveyor/surveyor.go +++ b/surveyor/surveyor.go @@ -103,41 +103,43 @@ type Surveyor struct { jsAPIAudits []*JSAdvisoryListener } -func connect(opts *Options) (*nats.Conn, error) { - reconnCtr := prometheus.NewCounter(prometheus.CounterOpts{ +func connect(opts *Options, name string) (*nats.Conn, error) { + reconnCtr := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), Help: "Number of times the surveyor reconnected to the NATS cluster", - }) + }, []string{"name"}) prometheus.Register(reconnCtr) - nopts := []nats.Option{nats.Name(opts.Name)} - if opts.Credentials != "" { + nopts := []nats.Option{nats.Name(name)} + + switch { + case opts.Credentials != "": nopts = append(nopts, nats.UserCredentials(opts.Credentials)) - } else if opts.Nkey != "" { + case opts.Nkey != "": o, err := nats.NkeyOptionFromSeed(opts.Nkey) if err != nil { return nil, fmt.Errorf("unable to load nkey: %v", err) } nopts = append(nopts, o) - } else if opts.NATSUser != "" { + case opts.NATSUser != "": nopts = append(nopts, nats.UserInfo(opts.NATSUser, opts.NATSPassword)) } - nopts = append(nopts, nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { - log.Printf("Disconnected, will possibly miss replies: %s", err) + nopts = append(nopts, nats.DisconnectErrHandler(func(c *nats.Conn, err error) { + log.Printf("%q disconnected, will possibly miss replies: %s", c.Opts.Name, err) })) nopts = append(nopts, nats.ReconnectHandler(func(c *nats.Conn) { - reconnCtr.Inc() - log.Printf("Reconnected to %v", c.ConnectedAddr()) + reconnCtr.WithLabelValues(c.Opts.Name).Inc() + log.Printf("%q reconnected to %v", c.Opts.Name, c.ConnectedAddr()) })) - nopts = append(nopts, nats.ClosedHandler(func(_ *nats.Conn) { - log.Println("Connection permanently lost!") + nopts = append(nopts, 
nats.ClosedHandler(func(c *nats.Conn) { + log.Printf("%q connection permanently lost!", c.Opts.Name) })) nopts = append(nopts, nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { if s != nil { - log.Printf("Error: err=%v", err) + log.Printf("Error: name=%q, subject=%s, err=%v", c.Opts.Name, s.Subject, err) } else { - log.Printf("Error: subject=%s, err=%v", s.Subject, err) + log.Printf("Error: name=%q err=%v", c.Opts.Name, err) } })) nopts = append(nopts, nats.MaxReconnects(10240)) @@ -161,7 +163,7 @@ func connect(opts *Options) (*nats.Conn, error) { // NewSurveyor creates a surveyor func NewSurveyor(opts *Options) (*Surveyor, error) { - nc, err := connect(opts) + nc, err := connect(opts, fmt.Sprintf("%s Messaging", opts.Name)) if err != nil { return nil, err }