Skip to content

Commit

Permalink
Add ILogpoller interface and call NewLogPoller() from NewChain()
Browse files Browse the repository at this point in the history
Note: this will hopefully be renamed to LogPoller later, once we
find a better name for the struct. (logPoller, for consistency with evm?)
and the PR's this depends on have been merged, to avoid merge conflicts
  • Loading branch information
reductionista committed Dec 12, 2024
1 parent 484985c commit 9e75a78
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
15 changes: 13 additions & 2 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
)
Expand All @@ -37,6 +38,7 @@ type Chain interface {

ID() string
Config() config.Config
LogPoller() logpoller.ILogPoller
TxManager() TxManager
// Reader returns a new Reader from the available list of nodes (if there are multiple, it will randomly select one)
Reader() (client.Reader, error)
Expand Down Expand Up @@ -89,6 +91,7 @@ type chain struct {
services.StateMachine
id string
cfg *config.TOMLConfig
lp logpoller.ILogPoller
txm *txm.Txm
balanceMonitor services.Service
lggr logger.Logger
Expand Down Expand Up @@ -235,6 +238,7 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L
clientCache: map[string]*verifiedCachedClient{},
}

var lc internal.Loader[client.Reader] = utils.NewLazyLoad(func() (client.Reader, error) { return ch.getClient() })
var tc internal.Loader[client.ReaderWriter] = utils.NewLazyLoad(func() (client.ReaderWriter, error) { return ch.getClient() })
var bc internal.Loader[monitor.BalanceClient] = utils.NewLazyLoad(func() (monitor.BalanceClient, error) { return ch.getClient() })

Expand Down Expand Up @@ -303,10 +307,13 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L
return result.Signature(), result.Error()
}

tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() })
bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
// TODO: Can we just remove these? They nullify the lazy loaders initialized earlier, don't they?
//lc = internal.NewLoader[client.Reader](func() (client.Reader, error) { return ch.multiNode.SelectRPC() })
//tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() })
//bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
}

ch.lp = logpoller.NewLogPoller(lggr, logpoller.NewORM(ch.ID(), ds, lggr), lc)
ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
Expand Down Expand Up @@ -396,6 +403,10 @@ func (c *chain) Config() config.Config {
return c.cfg
}

func (c *chain) LogPoller() logpoller.ILogPoller {
return c.lp
}

func (c *chain) TxManager() TxManager {
return c.txm
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
)

var (
Expand All @@ -24,11 +25,18 @@ type ORM interface {
MarkFilterDeleted(ctx context.Context, id int64) (err error)
}

type ILogPoller interface {
Start(context.Context) error
Close() error
RegisterFilter(ctx context.Context, filter Filter) error
UnregisterFilter(ctx context.Context, name string) error
}

type LogPoller struct {
services.StateMachine
lggr logger.SugaredLogger
orm ORM
client client.Reader
client internal.Loader[client.Reader]
collector *EncodedLogCollector

filters *filters
Expand All @@ -38,24 +46,29 @@ type LogPoller struct {
wg sync.WaitGroup
}

func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl client.Reader) *LogPoller {
func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl internal.Loader[client.Reader]) ILogPoller {
lggr = logger.Sugared(logger.Named(lggr, "LogPoller"))
lp := LogPoller{
orm: orm,
client: cl,
lggr: lggr,
filters: newFilters(lggr, orm),
}
lp.collector = NewEncodedLogCollector(lp.client, lp, lp.lggr)
return &lp
}

func (lp LogPoller) Process(event ProgramEvent) error {
// process stream of events coming from event loader

return nil
}

func (lp *LogPoller) Start(context.Context) error {
cl, err := lp.client.Get()
if err != nil {
return err
}
lp.collector = NewEncodedLogCollector(cl, lp, lp.lggr)
return lp.StartOnce("LogPoller", func() error {
lp.wg.Add(2)
go lp.run()
Expand Down

0 comments on commit 9e75a78

Please sign in to comment.