Skip to content

Commit

Permalink
introduce the manager abstraction to handle updates in the feeds conf…
Browse files Browse the repository at this point in the history
…igurations
  • Loading branch information
topliceanu committed Jan 6, 2022
1 parent 3dfc8d6 commit 1f8b56b
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 21 deletions.
31 changes: 25 additions & 6 deletions cmd/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func main() {

monitor := monitoring.NewMultiFeedMonitor(
cfg.Solana,
cfg.Feeds.Feeds,

log,
transmissionReader, stateReader,
Expand All @@ -96,11 +95,31 @@ func main() {
configSetSimplifiedSchema,
transmissionSchema,
)
wg.Add(1)
go func() {
defer wg.Done()
monitor.Start(bgCtx, wg)
}()

if cfg.Feeds.FilePath != "" {
wg.Add(1)
go func() {
defer wg.Done()
monitor.Start(bgCtx, wg, cfg.Feeds.Feeds)
}()
} else if cfg.Feeds.URL != "" {
rddPoller := monitoring.NewSourcePoller(
monitoring.NewRDDSource(cfg.Feeds.URL),
log.With("component", "rdd-poller"),
cfg.Feeds.RDDPollInterval,
cfg.Feeds.RDDReadTimeout,
0, // no buffering!
)
manager := monitoring.NewManager(
log.With("component", "manager"),
rddPoller,
)
wg.Add(1)
go func() {
defer wg.Done()
manager.Start(bgCtx, wg, monitor.Start)
}()
} // the config package makes sure there is either a FilePath or a URL set!

osSignalsCh := make(chan os.Signal, 1)
signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM)
Expand Down
6 changes: 2 additions & 4 deletions pkg/monitoring/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) {

monitor := NewMultiFeedMonitor(
cfg.Solana,
cfg.Feeds.Feeds,

logger.NewNullLogger(),
transmissionReader, stateReader,
Expand All @@ -60,7 +59,7 @@ func BenchmarkMultichainMonitorStatePath(b *testing.B) {
configSetSimplifiedSchema,
transmissionSchema,
)
go monitor.Start(ctx, wg)
go monitor.Start(ctx, wg, cfg.Feeds.Feeds)

state, err := generateStateEnvelope()
if err != nil {
Expand Down Expand Up @@ -112,7 +111,6 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) {

monitor := NewMultiFeedMonitor(
cfg.Solana,
cfg.Feeds.Feeds,

logger.NewNullLogger(),
transmissionReader, stateReader,
Expand All @@ -127,7 +125,7 @@ func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) {
configSetSimplifiedSchema,
transmissionSchema,
)
go monitor.Start(ctx, wg)
go monitor.Start(ctx, wg, cfg.Feeds.Feeds)

transmission := generateTransmissionEnvelope()

Expand Down
84 changes: 84 additions & 0 deletions pkg/monitoring/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package monitoring

import (
"context"
"sync"

"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/stretchr/testify/assert"
)

// Manager restarts the multi feed monitor with a new list of feeds whenever something changed.
type Manager interface {
Start(ctx context.Context, wg *sync.WaitGroup, callback ManagerCallback)
}

type ManagerCallback func(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed)

func NewManager(
log logger.Logger,
rddPoller Poller,
) Manager {
return &managerImpl{
log,
rddPoller,
[]config.Feed{},
sync.Mutex{},
}
}

type managerImpl struct {
log logger.Logger
rddPoller Poller

currentFeeds []config.Feed
currentFeedsMu sync.Mutex
}

func (m *managerImpl) Start(ctx context.Context, _ *sync.WaitGroup, callback ManagerCallback) {
localCtx, localCtxCancel := context.WithCancel(ctx)
defer localCtxCancel()
localWg := new(sync.WaitGroup)
for {
select {
case rawUpdatedFeeds := <-m.rddPoller.Updates():
updatedFeeds, ok := rawUpdatedFeeds.([]config.Feed)
if !ok {
m.log.Errorf("unexpected type (%T) for rdd updates", updatedFeeds)
continue
}
shouldRestartMonitor := false
func() {
m.currentFeedsMu.Lock()
defer m.currentFeedsMu.Unlock()
shouldRestartMonitor = isDifferentFeeds(m.currentFeeds, updatedFeeds)
if shouldRestartMonitor {
m.currentFeeds = updatedFeeds
}
}()
if !shouldRestartMonitor {
continue
}
// Terminate previous callback.
localCtxCancel()
localWg.Wait()

// Start new callback.
localCtx, localCtxCancel = context.WithCancel(ctx)
localWg = new(sync.WaitGroup)
localWg.Add(1)
go func() {
defer localWg.Done()
callback(localCtx, localWg, updatedFeeds)
}()
case <-ctx.Done():
return
}
}
}

// isDifferentFeeds checks whether there is a difference between the current list of feeds and the new feeds - Manager
func isDifferentFeeds(current, updated []config.Feed) bool {
return assert.ObjectsAreEqual(current, updated)
}
11 changes: 4 additions & 7 deletions pkg/monitoring/multi_feed_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
)

type MultiFeedMonitor interface {
Start(ctx context.Context, wg *sync.WaitGroup)
Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed)
}

func NewMultiFeedMonitor(
solanaConfig config.Solana,
feeds []config.Feed,

log logger.Logger,
transmissionReader, stateReader AccountReader,
Expand All @@ -31,7 +30,6 @@ func NewMultiFeedMonitor(
) MultiFeedMonitor {
return &multiFeedMonitor{
solanaConfig,
feeds,

log,
transmissionReader, stateReader,
Expand All @@ -50,7 +48,6 @@ func NewMultiFeedMonitor(

type multiFeedMonitor struct {
solanaConfig config.Solana
feeds []config.Feed

log logger.Logger
transmissionReader AccountReader
Expand All @@ -70,9 +67,9 @@ type multiFeedMonitor struct {
const bufferCapacity = 100

// Start should be executed as a goroutine.
func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(len(m.feeds))
for _, feedConfig := range m.feeds {
func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup, feeds []config.Feed) {
wg.Add(len(feeds))
for _, feedConfig := range feeds {
go func(feedConfig config.Feed) {
defer wg.Done()

Expand Down
6 changes: 2 additions & 4 deletions pkg/monitoring/multi_feed_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) {

monitor := NewMultiFeedMonitor(
cfg.Solana,
feeds,

logger.NewNullLogger(),
transmissionReader, stateReader,
Expand All @@ -51,7 +50,7 @@ func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) {
configSetSimplifiedSchema,
transmissionSchema,
)
go monitor.Start(ctx, wg)
go monitor.Start(ctx, wg, feeds)

trCount, stCount := 0, 0
messages := []producerMessage{}
Expand Down Expand Up @@ -104,7 +103,6 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) {

monitor := NewMultiFeedMonitor(
cfg.Solana,
feeds,

logger.NewNullLogger(),
transmissionReader, stateReader,
Expand All @@ -119,7 +117,7 @@ func TestMultiFeedMonitorForPerformance(t *testing.T) {
configSetSimplifiedSchema,
transmissionSchema,
)
go monitor.Start(ctx, wg)
go monitor.Start(ctx, wg, feeds)

trCount, stCount := 0, 0
messages := []producerMessage{}
Expand Down

0 comments on commit 1f8b56b

Please sign in to comment.