Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc updates for consistency, typos and better error tracking #33

Merged
merged 1 commit into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions surveyor/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,83 +67,93 @@ func (o *jsAdvisoryOptions) Validate() error {
var (
// API Audit
jsAPIAuditCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_audit"),
Name: prometheus.BuildFQName("nats", "jetstream", "api_audit"),
Help: "JetStream API access audit events",
}, []string{"server", "subject", "account"})

jsAPIErrorsCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_errors"),
Name: prometheus.BuildFQName("nats", "jetstream", "api_errors"),
Help: "JetStream API Errors Count",
}, []string{"server", "subject", "account"})

// Delivery Exceeded
jsDeliveryExceededCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_exceeded_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "delivery_exceeded_count"),
Help: "Advisories about JetStream Consumer Delivery Exceeded events",
}, []string{"account", "stream", "consumer"})

jsDeliveryTerminatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_terminated_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "delivery_terminated_count"),
Help: "Advisories about JetStream Consumer Delivery Terminated events",
}, []string{"account", "stream", "consumer"})

// Ack Samples
jsAckMetricDelay = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_duration"),
Help: "How long an Acknowledged message took to be Acknowledged",
}, []string{"account", "stream", "consumer"})

jsAckMetricDeliveries = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_deliveries"),
Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_deliveries"),
Help: "How many times messages took to be delivered and Acknowledged",
}, []string{"account", "stream", "consumer"})

// Misc
jsAdvisoriesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_advisory_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "advisory_count"),
Help: "Number of JetStream Advisory listeners that are running",
})

jsUnknownAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_unknown_advisories"),
Name: prometheus.BuildFQName("nats", "jetstream", "unknown_advisories"),
Help: "Unsupported JetStream Advisory types received",
}, []string{"schema", "account"})

// jsTotalAdvisoryCtr counts every advisory that was parsed successfully,
// labelled by account. It is incremented before the advisory type is
// inspected, so it covers supported and unsupported advisories alike.
jsTotalAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "total_advisories"),
Help: "Total JetStream Advisories handled",
}, []string{"account"})

// jsAdvisoryParseErrorCtr counts advisory messages that could not be parsed,
// labelled by the account the listener is configured for.
jsAdvisoryParseErrorCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "advisory_parse_errors"),
Help: "Number of advisories that could not be parsed",
}, []string{"account"})

// Stream and Consumer actions
jsConsumerActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_consumer_actions"),
Name: prometheus.BuildFQName("nats", "jetstream", "consumer_actions"),
Help: "Actions performed on consumers",
}, []string{"account", "stream", "action"})

jsStreamActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_stream_actions"),
Name: prometheus.BuildFQName("nats", "jetstream", "stream_actions"),
Help: "Actions performed on streams",
}, []string{"account", "stream", "action"})

// Snapshot create
jsSnapshotSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_size_bytes"),
Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_size_bytes"),
Help: "The size of snapshots being created",
}, []string{"account", "stream"})

jsSnapthotDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_duration"),
Help: "How long a snapshot takes to be processed",
}, []string{"account", "stream"})

// Restore
jsRestoreCreatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_created_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_created_count"),
Help: "How many restore operations were started",
}, []string{"account", "stream"})

jsRestoreSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_size_bytes"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_size_bytes"),
Help: "The size of restores that was completed",
}, []string{"account", "stream"})

jsRestoreDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_duration"),
Help: "How long a restore took to be processed",
}, []string{"account", "stream"})
)
Expand All @@ -164,6 +174,8 @@ func init() {
prometheus.MustRegister(jsRestoreSizeCtr)
prometheus.MustRegister(jsRestoreDuration)
prometheus.MustRegister(jsRestoreCreatedCtr)
prometheus.MustRegister(jsTotalAdvisoryCtr)
prometheus.MustRegister(jsAdvisoryParseErrorCtr)
}

// NewJetStreamAdvisoryListener creates a new JetStream advisory reporter
Expand All @@ -186,7 +198,7 @@ func NewJetStreamAdvisoryListener(f string, sopts Options) (*JSAdvisoryListener,

sopts.Name = fmt.Sprintf("%s (jetstream %s)", sopts.Name, opts.AccountName)
sopts.Credentials = opts.Credentials
nc, err := connect(&sopts)
nc, err := connect(&sopts, fmt.Sprintf("%s JetStream %s", sopts.Name, opts.AccountName))
if err != nil {
return nil, fmt.Errorf("nats connection failed: %s", err)
}
Expand Down Expand Up @@ -222,10 +234,13 @@ func (o *JSAdvisoryListener) Start() error {
func (o *JSAdvisoryListener) advisoryHandler(m *nats.Msg) {
schema, event, err := jsm.ParseEvent(m.Data)
if err != nil {
jsAdvisoryParseErrorCtr.WithLabelValues(o.opts.AccountName).Inc()
log.Printf("Could not parse JetStream API Audit Advisory: %s", err)
return
}

jsTotalAdvisoryCtr.WithLabelValues(o.opts.AccountName).Inc()

switch event := event.(type) {
case *advisory.JetStreamAPIAuditV1:
if strings.HasPrefix(event.Response, api.ErrPrefix) {
Expand Down
20 changes: 10 additions & 10 deletions surveyor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,47 @@ func (o *serviceObsOptions) Validate() error {

var (
observationsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "survey", "observerations_count"),
Name: prometheus.BuildFQName("nats", "latency", "observations_count"),
Help: "Number of Service Latency listeners that are running",
})

observationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_count",
Name: prometheus.BuildFQName("nats", "latency", "observations_received_count"),
Help: "Number of observations received by this surveyor across all services",
}, []string{"service", "app"})

serviceRequestStatus = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_status_count",
Name: prometheus.BuildFQName("nats", "latency", "observation_status_count"),
Help: "The status result codes for requests to a service",
}, []string{"service", "status"})

invalidObservationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_error_count",
Name: prometheus.BuildFQName("nats", "latency", "observation_error_count"),
Help: "Number of observations received by this surveyor across all services that could not be handled",
}, []string{"service"})

serviceLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_service_duration",
Name: prometheus.BuildFQName("nats", "latency", "service_duration"),
Help: "Time spent serving the request in the service",
}, []string{"service", "app"})

totalLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_total_duration",
Name: prometheus.BuildFQName("nats", "latency", "total_duration"),
Help: "Total time spent serving a service including network overheads",
}, []string{"service", "app"})

requestorRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_requestor_rtt",
Name: prometheus.BuildFQName("nats", "latency", "requestor_rtt"),
Help: "The RTT to the client making a request",
}, []string{"service", "app"})

responderRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_responder_rtt",
Name: prometheus.BuildFQName("nats", "latency", "responder_rtt"),
Help: "The RTT to the service serving the request",
}, []string{"service", "app"})

systemRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_system_rtt",
Name: prometheus.BuildFQName("nats", "latency", "system_rtt"),
Help: "The RTT within the NATS system - time traveling clusters, gateways and leaf nodes",
}, []string{"service", "app"})
)
Expand Down Expand Up @@ -148,7 +148,7 @@ func NewServiceObservation(f string, sopts Options) (*ServiceObsListener, error)

sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, opts.ServiceName)
sopts.Credentials = opts.Credentials
nc, err := connect(&sopts)
nc, err := connect(&sopts, fmt.Sprintf("%s Service %s", sopts.Name, opts.ServiceName))
if err != nil {
return nil, fmt.Errorf("nats connection failed: %s", err)
}
Expand Down
34 changes: 18 additions & 16 deletions surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,41 +103,43 @@ type Surveyor struct {
jsAPIAudits []*JSAdvisoryListener
}

func connect(opts *Options) (*nats.Conn, error) {
reconnCtr := prometheus.NewCounter(prometheus.CounterOpts{
func connect(opts *Options, name string) (*nats.Conn, error) {
reconnCtr := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"),
Help: "Number of times the surveyor reconnected to the NATS cluster",
})
}, []string{"name"})
// NOTE(review): the error returned by Register is discarded. connect is called
// from several constructors and each call builds a new collector under the
// same metric name, so a duplicate registration would go unnoticed and that
// call's counter would presumably never be exported. Confirm, and consider a
// package-level collector registered once.
prometheus.Register(reconnCtr)

nopts := []nats.Option{nats.Name(opts.Name)}
if opts.Credentials != "" {
nopts := []nats.Option{nats.Name(name)}

switch {
case opts.Credentials != "":
nopts = append(nopts, nats.UserCredentials(opts.Credentials))
} else if opts.Nkey != "" {
case opts.Nkey != "":
o, err := nats.NkeyOptionFromSeed(opts.Nkey)
if err != nil {
return nil, fmt.Errorf("unable to load nkey: %v", err)
}
nopts = append(nopts, o)
} else if opts.NATSUser != "" {
case opts.NATSUser != "":
nopts = append(nopts, nats.UserInfo(opts.NATSUser, opts.NATSPassword))
}

nopts = append(nopts, nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
log.Printf("Disconnected, will possibly miss replies: %s", err)
nopts = append(nopts, nats.DisconnectErrHandler(func(c *nats.Conn, err error) {
log.Printf("%q disconnected, will possibly miss replies: %s", c.Opts.Name, err)
}))
nopts = append(nopts, nats.ReconnectHandler(func(c *nats.Conn) {
reconnCtr.Inc()
log.Printf("Reconnected to %v", c.ConnectedAddr())
reconnCtr.WithLabelValues(c.Opts.Name).Inc()
log.Printf("%q reconnected to %v", c.Opts.Name, c.ConnectedAddr())
}))
nopts = append(nopts, nats.ClosedHandler(func(_ *nats.Conn) {
log.Println("Connection permanently lost!")
nopts = append(nopts, nats.ClosedHandler(func(c *nats.Conn) {
log.Printf("%q connection permanently lost!", c.Opts.Name)
}))
// Log asynchronous connection errors. The subscription can be nil, so
// s.Subject is only read on the branch where s is known to be non-nil.
nopts = append(nopts, nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
if s == nil {
log.Printf("Error: err=%v", err)
log.Printf("Error: name=%q err=%v", c.Opts.Name, err)
} else {
log.Printf("Error: subject=%s, err=%v", s.Subject, err)
log.Printf("Error: name=%q, subject=%s, err=%v", c.Opts.Name, s.Subject, err)
}
}))
nopts = append(nopts, nats.MaxReconnects(10240))
Expand All @@ -161,7 +163,7 @@ func connect(opts *Options) (*nats.Conn, error) {

// NewSurveyor creates a surveyor
func NewSurveyor(opts *Options) (*Surveyor, error) {
nc, err := connect(opts)
nc, err := connect(opts, fmt.Sprintf("%s Messaging", opts.Name))
if err != nil {
return nil, err
}
Expand Down