diff --git a/cmd/monitoring/README.md b/cmd/monitoring/README.md index 9425aec80..14d2d6aa8 100644 --- a/cmd/monitoring/README.md +++ b/cmd/monitoring/README.md @@ -61,9 +61,10 @@ KAFKA_TRANSMISSION_TOPIC="transmission_topic" \ SCHEMA_REGISTRY_URL="http://localhost:8989" \ SCHEMA_REGISTRY_USERNAME="" \ SCHEMA_REGISTRY_PASSWORD="" \ -FEEDS_FILE_PATH="/tmp/feeds.json" \ HTTP_ADDRESS="localhost:3000" \ -FEATURE_TEST_MODE=true \ +FEEDS_URL="http://localhost:4000" \ +FEATURE_TEST_ONLY_FAKE_READERS=true \ +FEATURE_TEST_ONLY_FAKE_RDD=true \ go run ./cmd/monitoring/main.go ``` diff --git a/cmd/monitoring/main.go b/cmd/monitoring/main.go index 25d4a9c76..71f9a04da 100644 --- a/cmd/monitoring/main.go +++ b/cmd/monitoring/main.go @@ -45,7 +45,7 @@ func main() { if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalw("failed to start http server", "address", cfg.Http.Address, "error", err) } else { - log.Info("http server closed") + log.Info("http server stopped") } }() @@ -71,7 +71,7 @@ func main() { } var transmissionReader, stateReader monitoring.AccountReader - if cfg.Feature.TestMode { + if cfg.Feature.TestOnlyFakeReaders { transmissionReader = monitoring.NewRandomDataReader(bgCtx, wg, "transmission", log.With("component", "rand-reader", "account", "transmissions")) stateReader = monitoring.NewRandomDataReader(bgCtx, wg, "state", log.With("component", "rand-reader", "account", "state")) } else { @@ -81,7 +81,6 @@ func main() { monitor := monitoring.NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, log, transmissionReader, stateReader, @@ -96,11 +95,40 @@ func main() { configSetSimplifiedSchema, transmissionSchema, ) - wg.Add(1) - go func() { - defer wg.Done() - monitor.Start(bgCtx, wg) - }() + + if cfg.Feeds.FilePath != "" { + wg.Add(1) + go func() { + defer wg.Done() + monitor.Start(bgCtx, wg, cfg.Feeds.Feeds) + }() + } else if cfg.Feeds.URL != "" { + source := monitoring.NewRDDSource(cfg.Feeds.URL) + if cfg.Feature.TestOnlyFakeRdd { + source = monitoring.NewFakeRDDSource(2, 10) + } + rddPoller := monitoring.NewSourcePoller( + source, + log.With("component", "rdd-poller"), + cfg.Feeds.RDDPollInterval, + cfg.Feeds.RDDReadTimeout, + 0, // no buffering! + ) + wg.Add(1) + go func() { + defer wg.Done() + rddPoller.Start(bgCtx) + }() + manager := monitoring.NewManager( + log.With("component", "manager"), + rddPoller, + ) + wg.Add(1) + go func() { + defer wg.Done() + manager.Start(bgCtx, wg, monitor.Start) + }() + } // the config package makes sure there is either a FilePath or a URL set! osSignalsCh := make(chan os.Signal, 1) signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM) @@ -112,7 +140,7 @@ func main() { log.Errorw("failed to shut http server down", "error", err) } wg.Wait() - log.Info("monitor stopped") + log.Info("process stopped") } // logger config diff --git a/pkg/monitoring/config/config.go b/pkg/monitoring/config/config.go index a004b4e6c..18749b657 100644 --- a/pkg/monitoring/config/config.go +++ b/pkg/monitoring/config/config.go @@ -104,26 +104,33 @@ func parseEnvVars(cfg *Config) error { if err != nil { return fmt.Errorf("failed to parse env var FEEDS_RDD_READ_TIMEOUT, see https://pkg.go.dev/time#ParseDuration: %w", err) } - cfg.Feeds.RddReadTimeout = readTimeout + cfg.Feeds.RDDReadTimeout = readTimeout } if value, isPresent := os.LookupEnv("FEEDS_RDD_POLL_INTERVAL"); isPresent { pollInterval, err := time.ParseDuration(value) if err != nil { return fmt.Errorf("failed to parse env var FEEDS_RDD_POLL_INTERVAL, see https://pkg.go.dev/time#ParseDuration: %w", err) } - cfg.Feeds.RddPollInterval = pollInterval + cfg.Feeds.RDDPollInterval = pollInterval } if value, isPresent := os.LookupEnv("HTTP_ADDRESS"); isPresent { cfg.Http.Address = value } - if value, isPresent := os.LookupEnv("FEATURE_TEST_MODE"); isPresent { + if value, isPresent := os.LookupEnv("FEATURE_TEST_ONLY_FAKE_READERS"); isPresent { isTestMode, err := strconv.ParseBool(value) if err != nil { - return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_MODE") + return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_ONLY_FAKE_READERS") } - cfg.Feature.TestMode = isTestMode + cfg.Feature.TestOnlyFakeReaders = isTestMode + } + if value, isPresent := os.LookupEnv("FEATURE_TEST_ONLY_FAKE_RDD"); isPresent { + isTestMode, err := strconv.ParseBool(value) + if err != nil { + return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_ONLY_FAKE_RDD") + } + cfg.Feature.TestOnlyFakeRdd = isTestMode } return nil diff --git a/pkg/monitoring/config/defaults.go b/pkg/monitoring/config/defaults.go index 978af5e5e..a56eb1cde 100644 --- a/pkg/monitoring/config/defaults.go +++ b/pkg/monitoring/config/defaults.go @@ -9,10 +9,10 @@ func applyDefaults(cfg *Config) { if cfg.Solana.PollInterval == 0 { cfg.Solana.PollInterval = 5 * time.Second } - if cfg.Feeds.RddReadTimeout == 0 { - cfg.Feeds.RddReadTimeout = 1 * time.Second + if cfg.Feeds.RDDReadTimeout == 0 { + cfg.Feeds.RDDReadTimeout = 1 * time.Second } - if cfg.Feeds.RddPollInterval == 0 { - cfg.Feeds.RddPollInterval = 10 * time.Second + if cfg.Feeds.RDDPollInterval == 0 { + cfg.Feeds.RDDPollInterval = 10 * time.Second } } diff --git a/pkg/monitoring/config/feeds.go b/pkg/monitoring/config/feeds.go index 024961d30..267e54ff8 100644 --- a/pkg/monitoring/config/feeds.go +++ b/pkg/monitoring/config/feeds.go @@ -1,74 +1,65 @@ package config import ( - "context" "encoding/json" "fmt" - "net/http" "os" "github.com/gagliardetto/solana-go" ) func populateFeeds(cfg *Config) error { - feeds := []jsonFeedConfig{} - if cfg.Feeds.URL != "" { - rddCtx, cancel := context.WithTimeout(context.Background(), cfg.Feeds.RddReadTimeout) - defer cancel() - readFeedsReq, err := http.NewRequestWithContext(rddCtx, http.MethodGet, cfg.Feeds.URL, nil) - if err != nil { - return fmt.Errorf("unable to build a request to the RDD URL '%s': %w", cfg.Feeds.URL, err) - } - httpClient := &http.Client{} - res, err := httpClient.Do(readFeedsReq) - if err != nil { - return fmt.Errorf("unable to fetch RDD data from URL '%s': %w", cfg.Feeds.URL, err) - } - defer res.Body.Close() - decoder := json.NewDecoder(res.Body) - if err := decoder.Decode(&feeds); err != nil { - return fmt.Errorf("unable to unmarshal feeds config from RDD URL '%s': %w", cfg.Feeds.URL, err) - } - } else if cfg.Feeds.FilePath != "" { - contents, err := os.ReadFile(cfg.Feeds.FilePath) - if err != nil { - return fmt.Errorf("unable to read feeds file '%s': %w", cfg.Feeds.FilePath, err) - } - if err = json.Unmarshal(contents, &feeds); err != nil { - return fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.Feeds.FilePath, err) - } + if cfg.Feeds.FilePath == "" { + return nil + } + contents, err := os.ReadFile(cfg.Feeds.FilePath) + if err != nil { + return fmt.Errorf("unable to read feeds file '%s': %w", cfg.Feeds.FilePath, err) + } + rawFeeds := []RawFeedConfig{} + if err = json.Unmarshal(contents, &rawFeeds); err != nil { + return fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.Feeds.FilePath, err) + } + feeds, err := NewFeeds(rawFeeds) + if err != nil { + return err } + cfg.Feeds.Feeds = feeds + return nil +} - cfg.Feeds.Feeds = make([]Feed, len(feeds)) - for i, feed := range feeds { - contractAddress, err := solana.PublicKeyFromBase58(feed.ContractAddressBase58) +func NewFeeds(rawFeeds []RawFeedConfig) ([]Feed, error) { + feeds := make([]Feed, len(rawFeeds)) + for i, rawFeed := range rawFeeds { + contractAddress, err := solana.PublicKeyFromBase58(rawFeed.ContractAddressBase58) if err != nil { - return fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", feed.ContractAddressBase58, i, err) + return nil, fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", rawFeed.ContractAddressBase58, i, err) } - transmissionsAccount, err := solana.PublicKeyFromBase58(feed.TransmissionsAccountBase58) + transmissionsAccount, err := solana.PublicKeyFromBase58(rawFeed.TransmissionsAccountBase58) if err != nil { - return fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", feed.TransmissionsAccountBase58, i, err) + return nil, fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", rawFeed.TransmissionsAccountBase58, i, err) } - stateAccount, err := solana.PublicKeyFromBase58(feed.StateAccountBase58) + stateAccount, err := solana.PublicKeyFromBase58(rawFeed.StateAccountBase58) if err != nil { - return fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", feed.StateAccountBase58, i, err) + return nil, fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", rawFeed.StateAccountBase58, i, err) } - cfg.Feeds.Feeds[i] = Feed{ - feed.FeedName, - feed.FeedPath, - feed.Symbol, - feed.Heartbeat, - feed.ContractType, - feed.ContractStatus, + feeds[i] = Feed{ + rawFeed.FeedName, + rawFeed.FeedPath, + rawFeed.Symbol, + rawFeed.Heartbeat, + rawFeed.ContractType, + rawFeed.ContractStatus, contractAddress, transmissionsAccount, stateAccount, } } - return nil + return feeds, nil } -type jsonFeedConfig struct { +// RawFeedConfig should only be used for deserializing responses from the RDD. +type RawFeedConfig struct { FeedName string `json:"name,omitempty"` FeedPath string `json:"path,omitempty"` Symbol string `json:"symbol,omitempty"` diff --git a/pkg/monitoring/config/types.go b/pkg/monitoring/config/types.go index 5ed925166..dbbb99f14 100644 --- a/pkg/monitoring/config/types.go +++ b/pkg/monitoring/config/types.go @@ -47,11 +47,12 @@ type SchemaRegistry struct { } type Feeds struct { - URL string + // If URL is set, the RDD tracker will start and override any feed configs extracted from FilePath! FilePath string + URL string + RDDReadTimeout time.Duration + RDDPollInterval time.Duration Feeds []Feed - RddReadTimeout time.Duration - RddPollInterval time.Duration } type Feed struct { @@ -74,5 +75,8 @@ type Http struct { } type Feature struct { - TestMode bool + // If set, the monitor will not read from a chain instead from a source of random state snapshots. + TestOnlyFakeReaders bool + // If set, the monitor will not read from the RDD, instead it will get data from a local source of random feeds configurations. + TestOnlyFakeRdd bool } diff --git a/pkg/monitoring/feed_monitor_test.go b/pkg/monitoring/feed_monitor_test.go index bdcc6161d..4046a24e0 100644 --- a/pkg/monitoring/feed_monitor_test.go +++ b/pkg/monitoring/feed_monitor_test.go @@ -25,15 +25,21 @@ func TestFeedMonitor(t *testing.T) { readTimeout := 1 * time.Second var bufferCapacity uint32 = 0 // no buffering - transmissionPoller := NewPoller( + transmissionPoller := NewSourcePoller( + NewSolanaSource( + transmissionAccount, + transmissionReader, + ), logger.NewNullLogger(), - transmissionAccount, transmissionReader, pollInterval, readTimeout, bufferCapacity, ) - statePoller := NewPoller( + statePoller := NewSourcePoller( + NewSolanaSource( + stateAccount, + stateReader, + ), logger.NewNullLogger(), - stateAccount, stateReader, pollInterval, readTimeout, bufferCapacity, ) diff --git a/pkg/monitoring/manager.go b/pkg/monitoring/manager.go new file mode 100644 index 000000000..5de5f48e1 --- /dev/null +++ b/pkg/monitoring/manager.go @@ -0,0 +1,93 @@ +package monitoring + +import ( + "context" + "sync" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/stretchr/testify/assert" +) + +// Manager restarts the managed function with a new list of updates whenever something changed. +type Manager interface { + Start(backgroundCtx context.Context, backgroundWg *sync.WaitGroup, managed ManagedFunc) +} + +type ManagedFunc func(localCtx context.Context, localWg *sync.WaitGroup, feeds []config.Feed) + +func NewManager( + log logger.Logger, + rddPoller Poller, +) Manager { + return &managerImpl{ + log, + rddPoller, + []config.Feed{}, + sync.Mutex{}, + } +} + +type managerImpl struct { + log logger.Logger + rddPoller Poller + + currentFeeds []config.Feed + currentFeedsMu sync.Mutex +} + +func (m *managerImpl) Start(backgroundCtx context.Context, backgroundWg *sync.WaitGroup, managed ManagedFunc) { + var localCtx context.Context + var localCtxCancel context.CancelFunc + var localWg *sync.WaitGroup + for { + select { + case rawUpdatedFeeds := <-m.rddPoller.Updates(): + updatedFeeds, ok := rawUpdatedFeeds.([]config.Feed) + if !ok { + m.log.Errorf("unexpected type (%T) for rdd updates", updatedFeeds) + continue + } + shouldRestartMonitor := false + func() { + m.currentFeedsMu.Lock() + defer m.currentFeedsMu.Unlock() + shouldRestartMonitor = isDifferentFeeds(m.currentFeeds, updatedFeeds) + if shouldRestartMonitor { + m.currentFeeds = updatedFeeds + } + }() + if !shouldRestartMonitor { + continue + } + m.log.Infow("change in feeds configuration detected", "feeds", updatedFeeds) + // Terminate previous managed function if not the first run. + if localCtxCancel != nil && localWg != nil { + localCtxCancel() + localWg.Wait() + } + // Start new managed function + localCtx, localCtxCancel = context.WithCancel(backgroundCtx) + localWg = new(sync.WaitGroup) + localWg.Add(1) + go func() { + defer localWg.Done() + managed(localCtx, localWg, updatedFeeds) + }() + case <-backgroundCtx.Done(): + if localCtxCancel != nil { + localCtxCancel() + } + if localWg != nil { + localWg.Wait() + } + m.log.Info("manager closed") + return + } + } +} + +// isDifferentFeeds checks whether there is a difference between the current list of feeds and the new feeds - Manager +func isDifferentFeeds(current, updated []config.Feed) bool { + return !assert.ObjectsAreEqual(current, updated) +} diff --git a/pkg/monitoring/manager_benchmark_test.go b/pkg/monitoring/manager_benchmark_test.go new file mode 100644 index 000000000..32aafdb71 --- /dev/null +++ b/pkg/monitoring/manager_benchmark_test.go @@ -0,0 +1,101 @@ +package monitoring + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" +) + +// This benchmark measures how many messages end up in the kafka client given +// that the chain readers respond immediately with random data and the rdd poller +// will generate a new set of 5 random feeds every second. + +//goos: darwin +//goarch: amd64 +//pkg: github.com/smartcontractkit/chainlink-solana/pkg/monitoring +//cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +// (10jan2022) +// 5719 184623 ns/op 91745 B/op 1482 allocs/op +func BenchmarkManager(b *testing.B) { + wg := &sync.WaitGroup{} + defer wg.Wait() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + log := logger.NewNullLogger() + + cfg := config.Config{} + cfg.Solana.PollInterval = 0 // poll as quickly as possible. + cfg.Feeds.URL = "http://some-fake-url-just-to-trigger-rdd-polling.com" + cfg.Feeds.RDDPollInterval = 1 * time.Second + cfg.Feeds.RDDReadTimeout = 1 * time.Second + + transmissionSchema := fakeSchema{transmissionCodec} + configSetSchema := fakeSchema{configSetCodec} + configSetSimplifiedSchema := fakeSchema{configSetCodec} + + producer := fakeProducer{make(chan producerMessage), ctx} + + transmissionReader := NewRandomDataReader(ctx, wg, "transmission", log) + stateReader := NewRandomDataReader(ctx, wg, "state", log) + + monitor := NewMultiFeedMonitor( + cfg.Solana, + + log, + transmissionReader, stateReader, + producer, + &devnullMetrics{}, + + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, + ) + + source := NewFakeRDDSource(5, 6) // Always produce 5 random feeds. + rddPoller := NewSourcePoller( + source, + log, + cfg.Feeds.RDDPollInterval, + cfg.Feeds.RDDReadTimeout, + 0, // no buffering! + ) + + manager := NewManager( + log, + rddPoller, + ) + + wg.Add(1) + go func() { + defer wg.Done() + rddPoller.Start(ctx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + manager.Start(ctx, wg, monitor.Start) + }() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + select { + case <-producer.sendCh: + // Drain the producer channel. + case <-ctx.Done(): + continue + } + } +} diff --git a/pkg/monitoring/manager_test.go b/pkg/monitoring/manager_test.go new file mode 100644 index 000000000..bfbecc76a --- /dev/null +++ b/pkg/monitoring/manager_test.go @@ -0,0 +1,63 @@ +package monitoring + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/stretchr/testify/require" +) + +const numPollerUpdates = 10 +const numGoroutinesPerManaged = 10 + +func TestManager(t *testing.T) { + t.Run("all goroutines are stopped before the new ones begin", func(t *testing.T) { + // Poller fires 10 rounds of updates. + // The manager identifies these updates, terminates the current running managed function and starts a new one. + // The managed function in turn runs 10 noop goroutines and increments/decrements a goroutine counter. + + var goRoutineCounter int64 = 0 + wg := &sync.WaitGroup{} + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + poller := &fakePoller{ + numPollerUpdates, + make(chan interface{}), + } + wg.Add(1) + go func() { + defer wg.Done() + poller.Start(ctx) + }() + + manager := NewManager( + logger.NewNullLogger(), + poller, + ) + managed := func(ctx context.Context, localWg *sync.WaitGroup, _ []config.Feed) { + localWg.Add(numGoroutinesPerManaged) + for i := 0; i < numGoroutinesPerManaged; i++ { + go func(i int, ctx context.Context) { + defer localWg.Done() + atomic.AddInt64(&goRoutineCounter, 1) + <-ctx.Done() + atomic.AddInt64(&goRoutineCounter, -1) + }(i, ctx) + } + } + wg.Add(1) + go func() { + defer wg.Done() + manager.Start(ctx, wg, managed) + }() + + wg.Wait() + require.Equal(t, int64(0), goRoutineCounter, "all child goroutines are gone") + }) +} diff --git a/pkg/monitoring/multi_feed_monitor.go b/pkg/monitoring/multi_feed_monitor.go index 0f45df4ac..0eb89c1f9 100644 --- a/pkg/monitoring/multi_feed_monitor.go +++ b/pkg/monitoring/multi_feed_monitor.go @@ -9,12 +9,11 @@ import ( ) type MultiFeedMonitor interface { - Start(ctx context.Context, wg *sync.WaitGroup) + Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) } func NewMultiFeedMonitor( solanaConfig config.Solana, - feeds []config.Feed, log logger.Logger, transmissionReader, stateReader AccountReader, @@ -31,7 +30,6 @@ func NewMultiFeedMonitor( ) MultiFeedMonitor { return &multiFeedMonitor{ solanaConfig, - feeds, log, transmissionReader, stateReader, @@ -50,7 +48,6 @@ func NewMultiFeedMonitor( type multiFeedMonitor struct { solanaConfig config.Solana - feeds []config.Feed log logger.Logger transmissionReader AccountReader @@ -70,9 +67,9 @@ type multiFeedMonitor struct { const bufferCapacity = 100 // Start should be executed as a goroutine. -func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(len(m.feeds)) - for _, feedConfig := range m.feeds { +func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) { + wg.Add(len(feeds)) + for _, feedConfig := range feeds { go func(feedConfig config.Feed) { defer wg.Done() @@ -81,18 +78,22 @@ func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { "network", m.solanaConfig.NetworkName, ) - transmissionPoller := NewPoller( + transmissionPoller := NewSourcePoller( + NewSolanaSource( + feedConfig.TransmissionsAccount, + m.transmissionReader, + ), feedLogger.With("component", "transmissions-poller", "address", feedConfig.TransmissionsAccount.String()), - feedConfig.TransmissionsAccount, - m.transmissionReader, m.solanaConfig.PollInterval, m.solanaConfig.ReadTimeout, bufferCapacity, ) - statePoller := NewPoller( + statePoller := NewSourcePoller( + NewSolanaSource( + feedConfig.StateAccount, + m.stateReader, + ), feedLogger.With("component", "state-poller", "address", feedConfig.StateAccount.String()), - feedConfig.StateAccount, - m.stateReader, m.solanaConfig.PollInterval, m.solanaConfig.ReadTimeout, bufferCapacity, diff --git a/pkg/monitoring/benchmark_test.go b/pkg/monitoring/multi_feed_monitor_benchmark_test.go similarity index 97% rename from pkg/monitoring/benchmark_test.go rename to pkg/monitoring/multi_feed_monitor_benchmark_test.go index 799dcef6e..481c65157 100644 --- a/pkg/monitoring/benchmark_test.go +++ b/pkg/monitoring/multi_feed_monitor_benchmark_test.go @@ -45,7 +45,6 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) { monitor := NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -60,7 +59,7 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, cfg.Feeds.Feeds) state, err := generateStateEnvelope() if err != nil { @@ -112,7 +111,6 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) { monitor := NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -127,7 +125,7 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, cfg.Feeds.Feeds) transmission := generateTransmissionEnvelope() diff --git a/pkg/monitoring/multi_feed_monitor_test.go b/pkg/monitoring/multi_feed_monitor_test.go index 96ad9633c..ea88a117c 100644 --- a/pkg/monitoring/multi_feed_monitor_test.go +++ b/pkg/monitoring/multi_feed_monitor_test.go @@ -36,7 +36,6 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) { monitor := NewMultiFeedMonitor( cfg.Solana, - feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -51,7 +50,7 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, feeds) trCount, stCount := 0, 0 messages := []producerMessage{} @@ -104,7 +103,6 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) { monitor := NewMultiFeedMonitor( cfg.Solana, - feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -119,7 +117,7 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, feeds) trCount, stCount := 0, 0 messages := []producerMessage{} diff --git a/pkg/monitoring/poller.go b/pkg/monitoring/poller.go index a700e9537..1a9c7962b 100644 --- a/pkg/monitoring/poller.go +++ b/pkg/monitoring/poller.go @@ -4,53 +4,59 @@ import ( "context" "time" - "github.com/gagliardetto/solana-go" "github.com/smartcontractkit/chainlink/core/logger" ) type Poller interface { Start(context.Context) + // You should never close the channel returned by Updates()! + // You should always read from the channel returned by Updates() in a select statement with the same context you passed to Start() Updates() <-chan interface{} } -func NewPoller( +type Source interface { + Name() string + Fetch(context.Context) (interface{}, error) +} + +func NewSourcePoller( + source Source, log logger.Logger, - account solana.PublicKey, - reader AccountReader, pollInterval time.Duration, - readTimeout time.Duration, + fetchTimeout time.Duration, bufferCapacity uint32, ) Poller { - return &solanaPollerImpl{ - log, - account, - reader, - pollInterval, - readTimeout, - bufferCapacity, + return &sourcePoller{ + log.With("source", source.Name()), + source, make(chan interface{}, bufferCapacity), + pollInterval, + fetchTimeout, } } -type solanaPollerImpl struct { - log logger.Logger - account solana.PublicKey - reader AccountReader - pollInterval time.Duration - readTimeout time.Duration - bufferCapacity uint32 - updates chan interface{} +type sourcePoller struct { + log logger.Logger + source Source + updates chan interface{} + + pollInterval time.Duration + fetchTimeout time.Duration } // Start should be executed as a goroutine -func (s *solanaPollerImpl) Start(ctx context.Context) { +func (s *sourcePoller) Start(ctx context.Context) { s.log.Debug("poller started") - // Fetch initial data - data, err := s.reader.Read(ctx, s.account) + // Initial fetch. + data, err := s.source.Fetch(ctx) if err != nil { - s.log.Errorw("failed initial fetch of account contents", "error", err) + s.log.Errorw("failed initial fetch", "error", err) } else { - s.updates <- data + select { + case s.updates <- data: + case <-ctx.Done(): + return + } } reusedTimer := time.NewTimer(s.pollInterval) @@ -60,15 +66,19 @@ func (s *solanaPollerImpl) Start(ctx context.Context) { var data interface{} var err error func() { - ctx, cancel := context.WithTimeout(ctx, s.readTimeout) + ctx, cancel := context.WithTimeout(ctx, s.fetchTimeout) defer cancel() - data, err = s.reader.Read(ctx, s.account) + data, err = s.source.Fetch(ctx) }() if err != nil { - s.log.Errorw("failed to read account contents", "error", err) + s.log.Errorw("failed to fetch from source", "error", err) continue } - s.updates <- data + select { + case s.updates <- data: + case <-ctx.Done(): + return + } reusedTimer.Reset(s.pollInterval) case <-ctx.Done(): if !reusedTimer.Stop() { @@ -80,6 +90,6 @@ func (s *solanaPollerImpl) Start(ctx context.Context) { } } -func (s *solanaPollerImpl) Updates() <-chan interface{} { +func (s *sourcePoller) Updates() <-chan interface{} { return s.updates } diff --git a/pkg/monitoring/poller_source_rdd.go b/pkg/monitoring/poller_source_rdd.go new file mode 100644 index 000000000..3dce6dd00 --- /dev/null +++ b/pkg/monitoring/poller_source_rdd.go @@ -0,0 +1,47 @@ +package monitoring + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" +) + +// rddSource produces a list of feeds to monitor. +type rddSource struct { + rddURL string + httpClient *http.Client +} + +func NewRDDSource( + rddURL string, +) Source { + return &rddSource{ + rddURL, + &http.Client{}, + } +} + +func (r *rddSource) Name() string { + return "rdd" +} + +func (r *rddSource) Fetch(ctx context.Context) (interface{}, error) { + readFeedsReq, err := http.NewRequestWithContext(ctx, http.MethodGet, r.rddURL, nil) + if err != nil { + return nil, fmt.Errorf("unable to build a request to the RDD: %w", err) + } + res, err := r.httpClient.Do(readFeedsReq) + if err != nil { + return nil, fmt.Errorf("unable to fetch RDD data: %w", err) + } + defer res.Body.Close() + rawFeeds := []config.RawFeedConfig{} + decoder := json.NewDecoder(res.Body) + if err := decoder.Decode(&rawFeeds); err != nil { + return nil, fmt.Errorf("unable to unmarshal feeds config data: %w", err) + } + return config.NewFeeds(rawFeeds) +} diff --git a/pkg/monitoring/poller_source_solana.go b/pkg/monitoring/poller_source_solana.go new file mode 100644 index 000000000..28e3f122b --- /dev/null +++ b/pkg/monitoring/poller_source_solana.go @@ -0,0 +1,24 @@ +package monitoring + +import ( + "context" + + "github.com/gagliardetto/solana-go" +) + +type solanaSource struct { + account solana.PublicKey + reader AccountReader +} + +func NewSolanaSource(account solana.PublicKey, reader AccountReader) Source { + return &solanaSource{account, reader} +} + +func (s *solanaSource) Name() string { + return "solana" +} + +func (s *solanaSource) Fetch(ctx context.Context) (interface{}, error) { + return s.reader.Read(ctx, s.account) +} diff --git a/pkg/monitoring/poller_test.go b/pkg/monitoring/poller_test.go index 43aa6b736..706f5600f 100644 --- a/pkg/monitoring/poller_test.go +++ b/pkg/monitoring/poller_test.go @@ -61,9 +61,9 @@ func TestPoller(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testCase.duration) defer cancel() reader := fakeReaderWithWait{testCase.waitOnRead} - poller := NewPoller( + poller := NewSourcePoller( + NewSolanaSource(account, reader), logger.NewNullLogger(), - account, reader, testCase.pollInterval, testCase.readTimeout, testCase.bufferCapacity) diff --git a/pkg/monitoring/testutils.go b/pkg/monitoring/testutils.go index 072a8cef0..3278ac766 100644 --- a/pkg/monitoring/testutils.go +++ b/pkg/monitoring/testutils.go @@ -383,6 +383,48 @@ func (k *keepLatestMetrics) SetOffchainAggregatorSubmissionReceivedValues(value k.latestTransmitter = sender } +func NewFakeRDDSource(minFeeds, maxFeeds uint8) Source { + return &fakeRddSource{minFeeds, maxFeeds} +} + +type fakeRddSource struct { + minFeeds, maxFeeds uint8 +} + +func (f *fakeRddSource) Name() string { + return "fake-rdd" +} + +func (f *fakeRddSource) Fetch(_ context.Context) (interface{}, error) { + numFeeds := int(f.minFeeds) + rand.Intn(int(f.maxFeeds-f.minFeeds)) + feeds := make([]config.Feed, numFeeds) + for i := 0; i < numFeeds; i++ { + feeds[i] = generateFeedConfig() + } + return feeds, nil +} + +type fakePoller struct { + numUpdates int + ch chan interface{} +} + +func (f *fakePoller) Start(ctx context.Context) { + source := &fakeRddSource{1, 2} + for i := 0; i < f.numUpdates; i++ { + updates, _ := source.Fetch(ctx) + select { + case f.ch <- updates: + case <-ctx.Done(): + return + } + } +} + +func (f *fakePoller) Updates() <-chan interface{} { + return f.ch +} + // This utilities are used primarely in tests but are present in the monitoring package because they are not inside a file ending in _test.go. // This is done in order to expose NewRandomDataReader for use in cmd/monitoring. // The following code is added to comply with the "unused" linter: @@ -392,4 +434,5 @@ var ( _ = fakeProducer{} _ = fakeSchema{} _ = keepLatestMetrics{} + _ = fakePoller{} )