diff --git a/cmd/monitoring/main.go b/cmd/monitoring/main.go index 25d4a9c76..d3ca30e25 100644 --- a/cmd/monitoring/main.go +++ b/cmd/monitoring/main.go @@ -81,7 +81,6 @@ func main() { monitor := monitoring.NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, log, transmissionReader, stateReader, @@ -96,11 +95,31 @@ func main() { configSetSimplifiedSchema, transmissionSchema, ) - wg.Add(1) - go func() { - defer wg.Done() - monitor.Start(bgCtx, wg) - }() + + if cfg.Feeds.FilePath != "" { + wg.Add(1) + go func() { + defer wg.Done() + monitor.Start(bgCtx, wg, cfg.Feeds.Feeds) + }() + } else if cfg.Feeds.URL != "" { + rddPoller := monitoring.NewSourcePoller( + monitoring.NewRDDSource(cfg.Feeds.URL), + log.With("component", "rdd-poller"), + cfg.Feeds.RDDPollInterval, + cfg.Feeds.RDDReadTimeout, + 0, // no buffering! + ) + manager := monitoring.NewManager( + log.With("component", "manager"), + rddPoller, + ) + wg.Add(1) + go func() { + defer wg.Done() + manager.Start(bgCtx, wg, monitor.Start) + }() + } // the config package makes sure there is either a FilePath or a URL set! osSignalsCh := make(chan os.Signal, 1) signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM) diff --git a/pkg/monitoring/benchmark_test.go b/pkg/monitoring/benchmark_test.go index 799dcef6e..481c65157 100644 --- a/pkg/monitoring/benchmark_test.go +++ b/pkg/monitoring/benchmark_test.go @@ -45,7 +45,6 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) { monitor := NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -60,7 +59,7 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, cfg.Feeds.Feeds) state, err := generateStateEnvelope() if err != nil { @@ -112,7 +111,6 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) { monitor := NewMultiFeedMonitor( cfg.Solana, - cfg.Feeds.Feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -127,7 +125,7 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, cfg.Feeds.Feeds) transmission := generateTransmissionEnvelope() diff --git a/pkg/monitoring/manager.go b/pkg/monitoring/manager.go new file mode 100644 index 000000000..52b90b113 --- /dev/null +++ b/pkg/monitoring/manager.go @@ -0,0 +1,84 @@ +package monitoring + +import ( + "context" + "sync" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/stretchr/testify/assert" +) + +// Manager restarts the multi feed monitor with a new list of feeds whenever something changed. +type Manager interface { + Start(ctx context.Context, wg *sync.WaitGroup, callback ManagerCallback) +} + +type ManagerCallback func(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) + +func NewManager( + log logger.Logger, + rddPoller Poller, +) Manager { + return &managerImpl{ + log, + rddPoller, + []config.Feed{}, + sync.Mutex{}, + } +} + +type managerImpl struct { + log logger.Logger + rddPoller Poller + + currentFeeds []config.Feed + currentFeedsMu sync.Mutex +} + +func (m *managerImpl) Start(ctx context.Context, _ *sync.WaitGroup, callback ManagerCallback) { + localCtx, localCtxCancel := context.WithCancel(ctx) + defer localCtxCancel() + localWg := new(sync.WaitGroup) + for { + select { + case rawUpdatedFeeds := <-m.rddPoller.Updates(): + updatedFeeds, ok := rawUpdatedFeeds.([]config.Feed) + if !ok { + m.log.Errorf("unexpected type (%T) for rdd updates", updatedFeeds) + continue + } + shouldRestartMonitor := false + func() { + m.currentFeedsMu.Lock() + defer m.currentFeedsMu.Unlock() + shouldRestartMonitor = isDifferentFeeds(m.currentFeeds, updatedFeeds) + if shouldRestartMonitor { + m.currentFeeds = updatedFeeds + } + }() + if !shouldRestartMonitor { + continue + } + // Terminate previous callback. + localCtxCancel() + localWg.Wait() + + // Start new callback. + localCtx, localCtxCancel = context.WithCancel(ctx) + localWg = new(sync.WaitGroup) + localWg.Add(1) + go func() { + defer localWg.Done() + callback(localCtx, localWg, updatedFeeds) + }() + case <-ctx.Done(): + return + } + } +} + +// isDifferentFeeds checks whether there is a difference between the current list of feeds and the new feeds - Manager +func isDifferentFeeds(current, updated []config.Feed) bool { + return assert.ObjectsAreEqual(current, updated) +} diff --git a/pkg/monitoring/multi_feed_monitor.go b/pkg/monitoring/multi_feed_monitor.go index df341b7f8..0eb89c1f9 100644 --- a/pkg/monitoring/multi_feed_monitor.go +++ b/pkg/monitoring/multi_feed_monitor.go @@ -9,12 +9,11 @@ import ( ) type MultiFeedMonitor interface { - Start(ctx context.Context, wg *sync.WaitGroup) + Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) } func NewMultiFeedMonitor( solanaConfig config.Solana, - feeds []config.Feed, log logger.Logger, transmissionReader, stateReader AccountReader, @@ -31,7 +30,6 @@ func NewMultiFeedMonitor( ) MultiFeedMonitor { return &multiFeedMonitor{ solanaConfig, - feeds, log, transmissionReader, stateReader, @@ -50,7 +48,6 @@ func NewMultiFeedMonitor( type multiFeedMonitor struct { solanaConfig config.Solana - feeds []config.Feed log logger.Logger transmissionReader AccountReader @@ -70,9 +67,9 @@ type multiFeedMonitor struct { const bufferCapacity = 100 // Start should be executed as a goroutine. -func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(len(m.feeds)) - for _, feedConfig := range m.feeds { +func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) { + wg.Add(len(feeds)) + for _, feedConfig := range feeds { go func(feedConfig config.Feed) { defer wg.Done() diff --git a/pkg/monitoring/multi_feed_monitor_test.go b/pkg/monitoring/multi_feed_monitor_test.go index 96ad9633c..ea88a117c 100644 --- a/pkg/monitoring/multi_feed_monitor_test.go +++ b/pkg/monitoring/multi_feed_monitor_test.go @@ -36,7 +36,6 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) { monitor := NewMultiFeedMonitor( cfg.Solana, - feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -51,7 +50,7 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, feeds) trCount, stCount := 0, 0 messages := []producerMessage{} @@ -104,7 +103,6 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) { monitor := NewMultiFeedMonitor( cfg.Solana, - feeds, logger.NewNullLogger(), transmissionReader, stateReader, @@ -119,7 +117,7 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) { configSetSimplifiedSchema, transmissionSchema, ) - go monitor.Start(ctx, wg) + go monitor.Start(ctx, wg, feeds) trCount, stCount := 0, 0 messages := []producerMessage{}