diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 6422766..ebc93c1 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -51,6 +51,7 @@ jobs: --enable nakedret \ --enable prealloc \ --enable gocritic \ + --enable misspell \ ./... - name: Run tests diff --git a/go.mod b/go.mod index b5935ab..67f3dca 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,13 @@ go 1.18 require ( github.com/antonfisher/nested-logrus-formatter v1.3.1 + github.com/fsnotify/fsnotify v1.6.0 github.com/nats-io/jsm.go v0.0.33 github.com/nats-io/nats-server/v2 v2.8.4 github.com/nats-io/nats.go v1.16.0 github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.13.0 + github.com/prometheus/client_model v0.2.0 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 @@ -20,7 +22,6 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -33,7 +34,6 @@ require ( github.com/nats-io/nkeys v0.3.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect - github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/spf13/afero v1.8.2 // indirect @@ -41,7 +41,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.3.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 4a38e7d..144d54f 100644 --- a/go.sum +++ b/go.sum @@ -73,8 +73,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= -github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= -github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -427,9 +427,10 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/surveyor/observation.go b/surveyor/observation.go index e084960..61843cc 100644 --- a/surveyor/observation.go +++ b/surveyor/observation.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,13 +17,17 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" "os" + "path/filepath" "strconv" "strings" + "github.com/fsnotify/fsnotify" "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api/server/metric" "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -110,23 +114,37 @@ func NewServiceObservationMetrics(registry *prometheus.Registry, constLabels pro return metrics } -// ServiceObsListener listens for observations from nats service latency checks +// ServiceObsListener listens for observations from nats service latency checks. type ServiceObsListener struct { - nc *nats.Conn - logger *logrus.Logger - opts *serviceObsOptions - metrics *ServiceObsMetrics - sopts *Options + nc *nats.Conn + logger *logrus.Logger + observation *ServiceObservation + metrics *ServiceObsMetrics + sopts *Options } -type serviceObsOptions struct { +type ServiceObservation struct { + ID string + ObservationConfig +} + +// ObservationConfig is used to set up new service observations. +type ObservationConfig struct { ServiceName string `json:"name"` Topic string `json:"topic"` Credentials string `json:"credential"` Nkey string `json:"nkey"` } -func (o *serviceObsOptions) Validate() error { +// ObservationsManager exposes methods to operate on service observations. +type ObservationsManager struct { + surveyor *Surveyor + addObservations chan addObservationsRequest + deleteObseravations chan deleteObservationsRequest + updateObservations chan updateObservationsRequest +} + +func (o *ObservationConfig) Validate() error { errs := []string{} if o.ServiceName == "" { @@ -161,46 +179,95 @@ func (o *serviceObsOptions) Validate() error { return errors.New(strings.Join(errs, ", ")) } -// NewServiceObservation creates a new performance observation listener -func NewServiceObservation(f string, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) (*ServiceObsListener, error) { +// NewServiceObservationFromFile creates a new performance observation listener. +func NewServiceObservationFromFile(f string, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) (*ServiceObsListener, error) { js, err := os.ReadFile(f) if err != nil { return nil, err } - opts := &serviceObsOptions{} + opts := &ObservationConfig{} err = json.Unmarshal(js, opts) if err != nil { return nil, fmt.Errorf("invalid service observation configuration: %s: %s", f, err) } - err = opts.Validate() if err != nil { return nil, fmt.Errorf("invalid service observation configuration: %s: %s", f, err) } - sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, opts.ServiceName) - sopts.Credentials = opts.Credentials - sopts.Nkey = opts.Nkey + serviceObservation := &ServiceObservation{ + ID: f, + ObservationConfig: *opts, + } + obs, err := newServiceObservation(*serviceObservation, sopts, metrics, reconnectCtr) + if err != nil { + return nil, err + } + + return obs, nil +} + +func newServiceObservation(serviceObservation ServiceObservation, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) (*ServiceObsListener, error) { + err := serviceObservation.Validate() + if err != nil { + return nil, fmt.Errorf("invalid service observation configuration: %s: %s", serviceObservation.ServiceName, err) + } + + sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, serviceObservation.ServiceName) + sopts.Credentials = serviceObservation.Credentials + sopts.Nkey = serviceObservation.Nkey nc, err := connect(&sopts, reconnectCtr) if err != nil { return nil, fmt.Errorf("nats connection failed: %s", err) } return &ServiceObsListener{ - nc: nc, - logger: sopts.Logger, - opts: opts, - metrics: metrics, - sopts: &sopts, + nc: nc, + logger: sopts.Logger, + observation: &serviceObservation, + metrics: metrics, + sopts: &sopts, }, nil } -// Start starts listening for observations +func (s *Surveyor) startObservationsInDir() fs.WalkDirFunc { + return func(path string, info fs.DirEntry, err error) error { + if err != nil { + return err + } + + // skip directories starting with '..' + // this prevents double observation loading when using kubernetes mounts + if info.IsDir() && strings.HasPrefix(info.Name(), "..") { + return filepath.SkipDir + } + + if filepath.Ext(info.Name()) != ".json" { + return nil + } + + obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr) + if err != nil { + return fmt.Errorf("could not create observation from %s: %s", path, err) + } + + err = obs.Start() + if err != nil { + return fmt.Errorf("could not start observation from %s: %s", path, err) + } + + s.observations = append(s.observations, obs) + + return nil + } +} + +// Start starts listening for observations. func (o *ServiceObsListener) Start() error { - _, err := o.nc.Subscribe(o.opts.Topic, o.observationHandler) + _, err := o.nc.Subscribe(o.observation.Topic, o.observationHandler) if err != nil { - return fmt.Errorf("could not subscribe to observation topic for %s (%s): %s", o.opts.ServiceName, o.opts.Topic, err) + return fmt.Errorf("could not subscribe to observation topic for %s (%s): %s", o.observation.ServiceName, o.observation.Topic, err) } err = o.nc.Flush() if err != nil { @@ -208,7 +275,7 @@ func (o *ServiceObsListener) Start() error { } o.metrics.observationsGauge.Inc() - o.logger.Infof("Started observing stats on %s for %s", o.opts.Topic, o.opts.ServiceName) + o.logger.Infof("Started observing stats on %s for %s", o.observation.Topic, o.observation.ServiceName) return nil } @@ -216,35 +283,441 @@ func (o *ServiceObsListener) Start() error { func (o *ServiceObsListener) observationHandler(m *nats.Msg) { kind, obs, err := jsm.ParseEvent(m.Data) if err != nil { - o.metrics.invalidObservationsReceived.WithLabelValues(o.opts.ServiceName).Inc() + o.metrics.invalidObservationsReceived.WithLabelValues(o.observation.ServiceName).Inc() o.logger.Warnf("data: %s", m.Data) - o.logger.Warnf("Unparsable observation received on %s: %s", o.opts.Topic, err) + o.logger.Warnf("Unparsable observation received on %s: %s", o.observation.Topic, err) return } switch obs := obs.(type) { case *metric.ServiceLatencyV1: - o.metrics.observationsReceived.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Inc() - o.metrics.serviceLatency.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Observe(obs.ServiceLatency.Seconds()) - o.metrics.totalLatency.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Observe(obs.TotalLatency.Seconds()) - o.metrics.requestorRTT.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Observe(obs.Requestor.RTT.Seconds()) - o.metrics.responderRTT.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Observe(obs.Responder.RTT.Seconds()) - o.metrics.systemRTT.WithLabelValues(o.opts.ServiceName, obs.Responder.Name).Observe(obs.SystemLatency.Seconds()) + o.metrics.observationsReceived.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Inc() + o.metrics.serviceLatency.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Observe(obs.ServiceLatency.Seconds()) + o.metrics.totalLatency.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Observe(obs.TotalLatency.Seconds()) + o.metrics.requestorRTT.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Observe(obs.Requestor.RTT.Seconds()) + o.metrics.responderRTT.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Observe(obs.Responder.RTT.Seconds()) + o.metrics.systemRTT.WithLabelValues(o.observation.ServiceName, obs.Responder.Name).Observe(obs.SystemLatency.Seconds()) if obs.Status == 0 { - o.metrics.serviceRequestStatus.WithLabelValues(o.opts.ServiceName, "500").Inc() + o.metrics.serviceRequestStatus.WithLabelValues(o.observation.ServiceName, "500").Inc() } else { - o.metrics.serviceRequestStatus.WithLabelValues(o.opts.ServiceName, strconv.Itoa(obs.Status)).Inc() + o.metrics.serviceRequestStatus.WithLabelValues(o.observation.ServiceName, strconv.Itoa(obs.Status)).Inc() } default: - o.metrics.invalidObservationsReceived.WithLabelValues(o.opts.ServiceName).Inc() - o.logger.Warnf("Unsupported observation received on %s: %s", o.opts.Topic, kind) + o.metrics.invalidObservationsReceived.WithLabelValues(o.observation.ServiceName).Inc() + o.logger.Warnf("Unsupported observation received on %s: %s", o.observation.Topic, kind) return } } -// Stop closes the connection to the network +// Stop closes the connection to the network. func (o *ServiceObsListener) Stop() { + o.metrics.observationsGauge.Dec() o.nc.Close() } + +func (s *Surveyor) watchObservations(dir string, depth int) error { + if depth == 0 { + return fmt.Errorf("exceeded observation dir max depth") + } + if dir == "" { + return nil + } + + go func() { + s.Mutex.Lock() + if _, ok := s.observationWatchers[dir]; ok { + return + } + watcher, err := fsnotify.NewWatcher() + if err != nil { + s.logger.Errorf("error creating watcher: %s", err) + s.Mutex.Unlock() + return + } + + if err := watcher.Add(dir); err != nil { + s.logger.Errorf("error adding dir to watcher: %s", err) + s.Mutex.Unlock() + return + } + defer watcher.Close() + s.observationWatchers[dir] = struct{}{} + s.Mutex.Unlock() + s.logger.Debugf("starting listener goroutine for %s", dir) + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if err := s.handleWatcherEvent(event, depth); err != nil { + s.logger.Warn(err) + } + case <-s.stop: + return + } + } + }() + return nil +} + +func (s *Surveyor) handleWatcherEvent(event fsnotify.Event, depth int) error { + path := event.Name + s.Lock() + defer s.Unlock() + + switch { + case event.Has(fsnotify.Create): + return s.handleCreateEvent(path, depth) + case event.Has(fsnotify.Write) && !event.Has(fsnotify.Remove): + return s.handleWriteEvent(path) + case event.Has(fsnotify.Remove): + return s.handleRemoveEvent(path) + } + return nil +} + +func (s *Surveyor) handleCreateEvent(path string, depth int) error { + stat, err := os.Stat(path) + if err != nil { + return fmt.Errorf("could not read observation file %s: %s", path, err) + } + // if a new directory was created, first start all observations already in it + // and then start watching for changes in this directory (fsnotify.Watcher is not recursive) + if stat.IsDir() && !strings.HasPrefix(stat.Name(), "..") { + depth-- + err = filepath.WalkDir(path, s.startObservationsInDir()) + if err != nil { + return fmt.Errorf("could not start observation from %s: %s", path, err) + } + if err := s.watchObservations(path, depth); err != nil { + return fmt.Errorf("could not start watcher in directory %s: %s", path, err) + } + } + // if not a directory and not a JSON, ignore + if filepath.Ext(stat.Name()) != ".json" { + return nil + } + + // create new observation from json + obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr) + if err != nil { + return fmt.Errorf("could not create observation from %s: %s", path, err) + } + + // multiple create events for 1 file when using symlinks + // in such case, subsequent events should be ignored + // https://github.com/fsnotify/fsnotify/issues/277 + for _, existingObservation := range s.observations { + if *existingObservation.observation == *obs.observation { + return nil + } + } + err = obs.Start() + if err != nil { + return fmt.Errorf("could not start observation from %s: %s", path, err) + } + + s.observations = append(s.observations, obs) + return nil +} + +func (s *Surveyor) handleWriteEvent(path string) error { + fs, err := os.Stat(path) + if err != nil { + return fmt.Errorf("could not read observation file %s: %s", path, err) + } + // if not a JSON, ignore + if filepath.Ext(fs.Name()) != ".json" { + return nil + } + obs, err := NewServiceObservationFromFile(path, s.opts, s.observationMetrics, s.reconnectCtr) + if err != nil { + return fmt.Errorf("could not create observation from %s: %s", path, err) + } + + err = obs.Start() + if err != nil { + return fmt.Errorf("could not start observation from %s: %s", path, err) + } + + // if observation is updated, stop previous observation and overwrite with new one + for i, existingObservation := range s.observations { + if existingObservation.observation.ID == obs.observation.ID { + existingObservation.Stop() + s.observations[i] = obs + return nil + } + } + s.observations = append(s.observations, obs) + return nil +} + +func (s *Surveyor) handleRemoveEvent(path string) error { + // directory removed, delete all observations inside and cancel watching this dir + if _, ok := s.observationWatchers[path]; ok { + for i := 0; ; i++ { + if i > len(s.observations)-1 { + break + } + if strings.HasPrefix(s.observations[i].observation.ID, path) { + s.observations = removeObservation(s.observations, i) + i-- + } + } + delete(s.observationWatchers, path) + return nil + } + // if not a directory and not a JSON, ignore + if filepath.Ext(path) != ".json" { + return nil + } + for i := 0; ; i++ { + if i > len(s.observations)-1 { + break + } + if s.observations[i].observation.ID == path { + s.observations = removeObservation(s.observations, i) + i-- + } + } + return nil +} + +func removeObservation(observations []*ServiceObsListener, i int) []*ServiceObsListener { + if i >= len(observations) { + return observations + } + observations[i].Stop() + if i < len(observations)-1 { + observations = append(observations[:i], observations[i+1:]...) + } else { + observations = observations[:i] + } + return observations +} + +// ServiceObservationResult contains the result of adding/removing service observations. +type ServiceObservationResult struct { + ServiceObservation *ServiceObservation + Err error +} + +// DeleteObservationResult contains the result of adding/removing service observations. +type DeleteObservationResult struct { + ObservationID string + Err error +} + +type addObservationsRequest struct { + configs []ObservationConfig + resp chan ServiceObservationResult +} + +type updateObservationsRequest struct { + obs []ServiceObservation + resp chan ServiceObservationResult +} + +type deleteObservationsRequest struct { + obsIDs []string + resp chan DeleteObservationResult +} + +// ManageObservations creates an ObservationManager, allowing for adding/deleting service observations to the surveyor. +func (s *Surveyor) ManageObservations() (*ObservationsManager, error) { + obsManager := &ObservationsManager{ + surveyor: s, + addObservations: make(chan addObservationsRequest, 100), + updateObservations: make(chan updateObservationsRequest, 100), + deleteObseravations: make(chan deleteObservationsRequest, 100), + } + go func() { + for { + select { + case req := <-obsManager.addObservations: + for _, config := range req.configs { + res, err := obsManager.addObservation(config) + if err != nil { + s.logger.Warnf("adding service observation: %s", err) + } + req.resp <- ServiceObservationResult{ + ServiceObservation: res, + Err: err, + } + } + close(req.resp) + case req := <-obsManager.updateObservations: + for _, observation := range req.obs { + res, err := obsManager.updateObservation(observation) + if err != nil { + s.logger.Warnf("updating service observation: %s", err) + } + req.resp <- ServiceObservationResult{ + ServiceObservation: res, + Err: err, + } + } + close(req.resp) + case req := <-obsManager.deleteObseravations: + for _, id := range req.obsIDs { + err := obsManager.deleteObservation(id) + if err != nil { + s.logger.Warnf("deleting service observation: %s", err) + } + req.resp <- DeleteObservationResult{ + ObservationID: id, + Err: err, + } + } + close(req.resp) + case <-s.stop: + return + } + } + }() + return obsManager, nil +} + +// AddObservations creates and starts new service observations. +// The returned channel is always closed and is safe to iterate over with "range". +// +// results := obsManager.AddObservations(observations...) +// for resp := range results { +// if resp.Err != nil { +// return err +// } +// fmt.Println("Created observation with ID: ", resp.ServiceObservation.ObservationID) +// } +func (om *ObservationsManager) AddObservations(observations ...ObservationConfig) <-chan ServiceObservationResult { + resp := make(chan ServiceObservationResult, len(observations)) + + req := addObservationsRequest{ + configs: observations, + resp: resp, + } + om.addObservations <- req + return resp +} + +// DeleteObservations deletes exisiting observations with provided service names. +// The returned channel is always closed and is safe to iterate over with "range". +// +// results := obsManager.DeleteObservations(ids...) +// for resp := range results { +// if resp.Err != nil { +// return err +// } +// fmt.Println("Deleted observation with ID: ", resp.ObservationID) +// } +func (om *ObservationsManager) DeleteObservations(ids ...string) <-chan DeleteObservationResult { + resp := make(chan DeleteObservationResult, len(ids)) + om.deleteObseravations <- deleteObservationsRequest{ + obsIDs: ids, + resp: resp, + } + return resp +} + +// UpdateObservations updates exisiting observations. +// Service observation with provided ID has to exist for the update to succeed. +// The returned channel is always closed and is safe to iterate over with "range". +// +// results := obsManager.UpdateObservations(observations...) +// for resp := range results { +// if resp.Err != nil { +// return err +// } +// fmt.Println("Updated observation with ID: ", resp.ObservationID) +// } +func (om *ObservationsManager) UpdateObservations(observations ...ServiceObservation) <-chan ServiceObservationResult { + resp := make(chan ServiceObservationResult) + req := updateObservationsRequest{ + obs: observations, + resp: resp, + } + om.updateObservations <- req + return resp +} + +// GetObservations returns configs of all running service observations. +func (om *ObservationsManager) GetObservations() []ServiceObservation { + om.surveyor.Lock() + defer om.surveyor.Unlock() + observations := make([]ServiceObservation, 0, len(om.surveyor.observations)) + for _, obs := range om.surveyor.observations { + observations = append(observations, *obs.observation) + } + return observations +} + +func (om *ObservationsManager) addObservation(req ObservationConfig) (*ServiceObservation, error) { + om.surveyor.Lock() + defer om.surveyor.Unlock() + serviceObservation := ServiceObservation{ + ID: nuid.Next(), + ObservationConfig: req, + } + obs, err := newServiceObservation(serviceObservation, om.surveyor.opts, om.surveyor.observationMetrics, om.surveyor.reconnectCtr) + if err != nil { + return nil, fmt.Errorf("could not create observation from config: %s: %s", req.ServiceName, err) + } + + if err := obs.Start(); err != nil { + return nil, fmt.Errorf("could not start observation for service: %s: %s", req.ServiceName, err) + } + + om.surveyor.observations = append(om.surveyor.observations, obs) + return obs.observation, nil +} + +func (om *ObservationsManager) updateObservation(req ServiceObservation) (*ServiceObservation, error) { + om.surveyor.Lock() + defer om.surveyor.Unlock() + var found bool + var obsIndex int + for i, existingObservation := range om.surveyor.observations { + if req.ID == existingObservation.observation.ID { + found = true + obsIndex = i + break + } + } + if !found { + return nil, fmt.Errorf("observation with provided ID does not exist: %s", req.ID) + } + obs, err := newServiceObservation(req, om.surveyor.opts, om.surveyor.observationMetrics, om.surveyor.reconnectCtr) + if err != nil { + return nil, fmt.Errorf("could not create observation from config: %s: %s", req.ServiceName, err) + } + if err := obs.Start(); err != nil { + return nil, fmt.Errorf("could not start observation for service: %s: %s", req.ServiceName, err) + } + + om.surveyor.observations[obsIndex].Stop() + om.surveyor.observations[obsIndex] = obs + return obs.observation, nil +} + +func (om *ObservationsManager) deleteObservation(id string) error { + om.surveyor.Lock() + defer om.surveyor.Unlock() + var found bool + for i, existingObservation := range om.surveyor.observations { + if id == existingObservation.observation.ID { + found = true + existingObservation.Stop() + if i < len(om.surveyor.observations)-1 { + om.surveyor.observations = append(om.surveyor.observations[:i], om.surveyor.observations[i+1:]...) + } else { + om.surveyor.observations = om.surveyor.observations[:i] + } + } + } + if !found { + return fmt.Errorf("observation with given ID does not exist: %s", id) + } + return nil +} diff --git a/surveyor/observation_test.go b/surveyor/observation_test.go index c1f44f5..646952b 100644 --- a/surveyor/observation_test.go +++ b/surveyor/observation_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -37,23 +37,56 @@ func TestServiceObservation_Load(t *testing.T) { Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) - obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + obs, err := NewServiceObservationFromFile("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) if err != nil { t.Fatalf("observation load error: %s", err) } obs.Stop() - _, err = NewServiceObservation("testdata/badobs/missing.json", *opt, metrics, reconnectCtr) + _, err = NewServiceObservationFromFile("testdata/badobs/missing.json", *opt, metrics, reconnectCtr) if err.Error() != "open testdata/badobs/missing.json: no such file or directory" { t.Fatalf("observation load error: %s", err) } - _, err = NewServiceObservation("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) + _, err = NewServiceObservationFromFile("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) if err.Error() != "invalid service observation configuration: testdata/badobs/bad.json: name is required, topic is required, jwt or nkey credentials is required" { t.Fatalf("observation load error: %s", err) } - _, err = NewServiceObservation("testdata/badobs/badauth.json", *opt, metrics, reconnectCtr) + _, err = NewServiceObservationFromFile("testdata/badobs/badauth.json", *opt, metrics, reconnectCtr) + if err.Error() != "nats connection failed: nats: Authorization Violation" { + t.Fatalf("observation load error: %s", err) + } +} + +func TestServiceObservation_LoadDynamically(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opt := getTestOptions() + metrics := NewServiceObservationMetrics(prometheus.NewRegistry(), nil) + reconnectCtr := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), + Help: "Number of times the surveyor reconnected to the NATS cluster", + }, []string{"name"}) + + obs, err := NewServiceObservationFromFile("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + if err != nil { + t.Fatalf("observation load error: %s", err) + } + obs.Stop() + + _, err = NewServiceObservationFromFile("testdata/badobs/missing.json", *opt, metrics, reconnectCtr) + if err.Error() != "open testdata/badobs/missing.json: no such file or directory" { + t.Fatalf("observation load error: %s", err) + } + + _, err = NewServiceObservationFromFile("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) + if err.Error() != "invalid service observation configuration: testdata/badobs/bad.json: name is required, topic is required, jwt or nkey credentials is required" { + t.Fatalf("observation load error: %s", err) + } + + _, err = NewServiceObservationFromFile("testdata/badobs/badauth.json", *opt, metrics, reconnectCtr) if err.Error() != "nats connection failed: nats: Authorization Violation" { t.Fatalf("observation load error: %s", err) } @@ -70,7 +103,7 @@ func TestServiceObservation_Handle(t *testing.T) { Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) - obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + obs, err := NewServiceObservationFromFile("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) if err != nil { t.Fatalf("observation load error: %s", err) } diff --git a/surveyor/surveyor.go b/surveyor/surveyor.go index b52d22a..58dc083 100644 --- a/surveyor/surveyor.go +++ b/surveyor/surveyor.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -102,17 +102,19 @@ func GetDefaultOptions() *Options { // A Surveyor instance type Surveyor struct { sync.Mutex - opts Options - logger *logrus.Logger - listener net.Listener - httpServer *http.Server - promRegistry *prometheus.Registry - reconnectCtr *prometheus.CounterVec - statzC *StatzCollector - observationMetrics *ServiceObsMetrics - observations []*ServiceObsListener - jsAPIMetrics *JSAdvisoryMetrics - jsAPIAudits []*JSAdvisoryListener + opts Options + logger *logrus.Logger + listener net.Listener + httpServer *http.Server + promRegistry *prometheus.Registry + reconnectCtr *prometheus.CounterVec + statzC *StatzCollector + observationMetrics *ServiceObsMetrics + observations []*ServiceObsListener + jsAPIMetrics *JSAdvisoryMetrics + jsAPIAudits []*JSAdvisoryListener + observationWatchers map[string]struct{} + stop chan struct{} } func connect(opts *Options, reconnectCtr *prometheus.CounterVec) (*nats.Conn, error) { @@ -400,6 +402,7 @@ func (s *Surveyor) startJetStreamAdvisories() error { return fmt.Errorf("JetStream API Audit dir %s is not a directory", dir) } + // TODO: new watcher should be created in each directory err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -429,6 +432,7 @@ func (s *Surveyor) startJetStreamAdvisories() error { func (s *Surveyor) startObservations() error { s.observations = []*ServiceObsListener{} + s.observationWatchers = make(map[string]struct{}) s.observationMetrics.observationsGauge.Set(0) dir := s.opts.ObservationConfigDir @@ -446,39 +450,19 @@ func (s *Surveyor) startObservations() error { return fmt.Errorf("observations dir %s is not a directory", dir) } - err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if filepath.Ext(info.Name()) != ".json" { - return nil - } - - obs, err := NewServiceObservation(path, s.opts, s.observationMetrics, s.reconnectCtr) - if err != nil { - return fmt.Errorf("could not create observation from %s: %s", path, err) - } - - // Prevent an equal observation to be loaded twice - // This is a problem that occurs with k8s mounts - for _, existingObservation := range s.observations { - if obs.opts.ServiceName == existingObservation.opts.ServiceName { - return nil - } - } - - err = obs.Start() - if err != nil { - return fmt.Errorf("could not start observation from %s: %s", path, err) - } - - s.observations = append(s.observations, obs) + err = filepath.WalkDir(dir, s.startObservationsInDir()) + if err != nil { + return err + } + if err := s.watchObservations(dir, 5); err != nil { + return err + } - return nil - }) + return nil +} - return err +func (s *Surveyor) Observations() []*ServiceObsListener { + return s.observations } // Start starts the surveyor @@ -486,6 +470,8 @@ func (s *Surveyor) Start() error { s.Lock() defer s.Unlock() + s.stop = make(chan struct{}) + if s.statzC == nil { if err := s.createStatszCollector(); err != nil { return err @@ -547,6 +533,7 @@ func (s *Surveyor) Stop() { } s.jsAPIAudits = nil } + close(s.stop) } // Gather implements the prometheus.Gatherer interface diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go index 4567047..4212fa5 100644 --- a/surveyor/surveyor_test.go +++ b/surveyor/surveyor_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package surveyor import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io" "net/http" @@ -414,7 +415,7 @@ func TestSurveyor_MissingResponses(t *testing.T) { } } -func TestSurveyor_Observations(t *testing.T) { +func TestSurveyor_ObservationsFromFile(t *testing.T) { sc := st.NewSuperCluster(t) defer sc.Shutdown() @@ -435,6 +436,425 @@ func TestSurveyor_Observations(t *testing.T) { } } +func TestSurveyor_Observations(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + + expectedObservations := make(map[string]ObservationConfig) + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + obsManager, err := s.ManageObservations() + if err != nil { + t.Fatalf("Error creating observations manager: %s", err) + } + + observations := []ObservationConfig{ + { + ServiceName: "srv1", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + { + ServiceName: "srv2", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + { + ServiceName: "srv3", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + } + results := obsManager.AddObservations(observations...) + + obsIDs := make([]string, 0) + for resp := range results { + if resp.Err != nil { + t.Errorf("Unexpected error on observation add request: %s", resp.Err) + } + obsIDs = append(obsIDs, resp.ServiceObservation.ID) + expectedObservations[resp.ServiceObservation.ID] = resp.ServiceObservation.ObservationConfig + } + waitForMetricUpdate(t, obsManager, expectedObservations) + + updateObservationReq := ServiceObservation{ + ID: obsIDs[0], + ObservationConfig: ObservationConfig{ + ServiceName: "srv4", + Topic: "testing_updated.topic", + Credentials: "../test/myuser.creds", + }, + } + expectedObservations[obsIDs[0]] = updateObservationReq.ObservationConfig + results = obsManager.UpdateObservations(updateObservationReq) + for resp := range results { + if resp.Err != nil { + t.Fatalf("Unexpected error during observation update: %s", resp.Err) + } + } + waitForMetricUpdate(t, obsManager, expectedObservations) + var found bool + for _, obs := range obsManager.GetObservations() { + if obs.ServiceName == "srv4" { + found = true + break + } + } + + if !found { + t.Errorf("Expected updated service name in observations: %s", "srv4") + } + deleteID := obsIDs[0] + deleteResult := obsManager.DeleteObservations(deleteID) + delete(expectedObservations, deleteID) + resp := <-deleteResult + if resp.Err != nil { + t.Errorf("Unexpected error on observation delete request: %s", resp.Err) + } + waitForMetricUpdate(t, obsManager, expectedObservations) + + // observation no longer exists + deleteResult = obsManager.DeleteObservations(deleteID) + resp = <-deleteResult + if resp.Err == nil { + t.Error("Expected error; got nil") + } + waitForMetricUpdate(t, obsManager, expectedObservations) +} + +func TestSurveyor_ObservationsError(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + obsManager, err := s.ManageObservations() + if err != nil { + t.Fatalf("Error creating observations manager: %s", err) + } + + // add invalid observation (missing service name) + result := obsManager.AddObservations( + ObservationConfig{ + ServiceName: "", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + ) + + var addErr error + for resp := range result { + if resp.Err != nil { + addErr = resp.Err + } + } + if addErr == nil { + t.Errorf("Expected error; got nil") + } + + // valid observation, no error + result = obsManager.AddObservations( + ObservationConfig{ + ServiceName: "srv", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + ) + + addErr = nil + for resp := range result { + if resp.Err != nil { + addErr = resp.Err + } + } + if addErr != nil { + t.Errorf("Expected no error; got: %s", addErr) + } + + // update error, invalid config + result = obsManager.UpdateObservations( + ServiceObservation{ + ID: "srv", + ObservationConfig: ObservationConfig{ + ServiceName: "srv", + Topic: "", + Credentials: "../test/myuser.creds", + }, + }, + ) + + var updateErr error + for resp := range result { + if resp.Err != nil { + updateErr = resp.Err + } + } + if updateErr == nil { + t.Errorf("Expected error; got nil") + } +} + +func waitForMetricUpdate(t *testing.T, om *ObservationsManager, expectedObservations map[string]ObservationConfig) { + t.Helper() + ticker := time.NewTicker(50 * time.Millisecond) + timeout := time.After(5 * time.Second) + defer ticker.Stop() +Outer: + for { + select { + case <-ticker.C: + observationsNum := ptu.ToFloat64(om.surveyor.observationMetrics.observationsGauge) + if observationsNum == float64(len(expectedObservations)) { + break Outer + } + case <-timeout: + observationsNum := ptu.ToFloat64(om.surveyor.observationMetrics.observationsGauge) + t.Fatalf("process error: invalid number of observations; want: %d; got: %f\n", len(expectedObservations), observationsNum) + return + } + } + + existingObservations := om.GetObservations() + if len(existingObservations) != len(expectedObservations) { + t.Fatalf("Unexpected number of observations; want: %d; got: %d", len(expectedObservations), len(existingObservations)) + } + for _, existingObservation := range existingObservations { + obs, ok := expectedObservations[existingObservation.ID] + if !ok { + t.Fatalf("Missing observation with ID: %s", existingObservation.ID) + } + if obs != existingObservation.ObservationConfig { + t.Fatalf("Invalid observation config; want: %+v; got: %+v", obs, existingObservation.ObservationConfig) + } + } +} + +func TestSurveyor_ObservationsWatcher(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + dirName := fmt.Sprintf("testdata/obs%d", time.Now().UnixNano()) + if err := os.Mkdir(dirName, 0700); err != nil { + t.Fatalf("Error creating observations dir: %s", err) + } + defer os.RemoveAll(dirName) + opts.ObservationConfigDir = dirName + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + time.Sleep(200 * time.Millisecond) + + defer s.Stop() + + om, err := s.ManageObservations() + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + expectedObservations := make(map[string]ObservationConfig) + + t.Run("write observation file - create operation", func(t *testing.T) { + obsConfig := ObservationConfig{ + ServiceName: "testing1", + Topic: "testing1.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + }) + + t.Run("first create then write to file - write operation", func(t *testing.T) { + obsConfig := ObservationConfig{ + ServiceName: "testing2", + Topic: "testing2.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath := fmt.Sprintf("%s/write.json", dirName) + f, err := os.Create(obsPath) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Error closing file: %s", err) + } + time.Sleep(200 * time.Millisecond) + if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + }) + + t.Run("create observations in subfolder", func(t *testing.T) { + obsConfig := ObservationConfig{ + ServiceName: "testing3", + Topic: "testing3.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + if err := os.Mkdir(fmt.Sprintf("%s/subdir", dirName), 0700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + obsPath := fmt.Sprintf("%s/subdir/subobs.json", dirName) + + err = os.WriteFile(obsPath, obsConfigJSON, 0600) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + + obsConfig = ObservationConfig{ + ServiceName: "testing4", + Topic: "testing4.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err = json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath = fmt.Sprintf("%s/subdir/abc.json", dirName) + + if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + + obsConfig = ObservationConfig{ + ServiceName: "testing5", + Topic: "testing5.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err = json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + if err := os.Mkdir(fmt.Sprintf("%s/subdir/nested", dirName), 0700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + obsPath = fmt.Sprintf("%s/subdir/nested/nested.json", dirName) + err = os.WriteFile(obsPath, obsConfigJSON, 0600) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + }) + + t.Run("update observations", func(t *testing.T) { + obsConfig := ObservationConfig{ + ServiceName: "testing_updated", + Topic: "testing_updated.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + obsPath := fmt.Sprintf("%s/write.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + + // update file with invalid JSON - existing observation should not be impacted + if err := os.WriteFile(obsPath, []byte("abc"), 0600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + time.Sleep(100 * time.Millisecond) + waitForMetricUpdate(t, om, expectedObservations) + }) + + t.Run("remove observations", func(t *testing.T) { + // remove single observation + obsPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.Remove(obsPath); err != nil { + t.Fatalf("Error removing observation config: %s", err) + } + delete(expectedObservations, obsPath) + waitForMetricUpdate(t, om, expectedObservations) + + // remove whole subfolder + if err := os.RemoveAll(fmt.Sprintf("%s/subdir", dirName)); err != nil { + t.Fatalf("Error removing subdirectory: %s", err) + } + + delete(expectedObservations, fmt.Sprintf("%s/subdir/subobs.json", dirName)) + delete(expectedObservations, fmt.Sprintf("%s/subdir/abc.json", dirName)) + delete(expectedObservations, fmt.Sprintf("%s/subdir/nested/nested.json", dirName)) + waitForMetricUpdate(t, om, expectedObservations) + + obsConfig := ObservationConfig{ + ServiceName: "testing10", + Topic: "testing1.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + obsPath = fmt.Sprintf("%s/another.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + expectedObservations[obsPath] = obsConfig + waitForMetricUpdate(t, om, expectedObservations) + }) +} + func TestSurveyor_ConcurrentBlock(t *testing.T) { sc := st.NewSuperCluster(t) defer sc.Shutdown()