Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Dec 19, 2024
1 parent 7cee215 commit 81185bd
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 43 deletions.
67 changes: 55 additions & 12 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,36 @@ package logpoller

import (
"context"
"encoding/base64"
"errors"
"fmt"
"iter"
"maps"
"sync"
"sync/atomic"

"github.com/gagliardetto/solana-go"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/utils"
)

type filters struct {
orm ORM
lggr logger.SugaredLogger

filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature]map[int64]Filter
filtersToBackfill map[int64]Filter
filtersToDelete map[int64]Filter
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
eventCodecs map[int64]types.RemoteCodec
filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature]map[int64]Filter
filtersToBackfill map[int64]Filter
filtersToDelete map[int64]Filter
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
eventCodecs map[int64]types.RemoteCodec
knownPrograms map[string]struct{} // fast lookup to see if a base58-encoded ProgramID matches any registered filters
knownDiscriminators map[string]struct{} // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator
}

func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
Expand Down Expand Up @@ -79,15 +85,13 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {
return fmt.Errorf("failed to load filters: %w", err)
}

// TODO: need to include all type dependencies, but saving entire contract IDL with each filter to db seems like
// a waste. Can we just extract the specific types we need?
idl := codec.IDL{Events: []codec.IdlEvent{filter.EventIDL}}

eventCodec, err := codec.NewIDLEventCodec(idl, lp.builder)
eventCodec, err := codec.NewIDLEventCodec(filter.EventIDL, config.BuilderForEncoding(config.EncodingTypeBorsh))
if err != nil {
return fmt.Errorf("invalid event IDL for filter %s: %w", filter.Name, err)
}

filter.EventSig = utils.Discriminator("event", filter.EventName)

fl.filtersMutex.Lock()
defer fl.filtersMutex.Unlock()

Expand Down Expand Up @@ -122,6 +126,9 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {
fl.filtersToBackfill[filterID] = filter

fl.eventCodecs[filter.ID] = eventCodec
fl.knownPrograms[filter.Address.ToSolana().String()] = struct{}{}
discriminator := base64.StdEncoding.EncodeToString(filter.EventSig[:])
fl.knownDiscriminators[discriminator[:10]] = struct{}{}

return nil
}
Expand Down Expand Up @@ -203,6 +210,42 @@ func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature
}
}

func (fl *filters) EventCodec(ID int64) types.RemoteCodec {
return fl.eventCodecs[ID]
}

// MatchchingFiltersForEncodedEvent - similar to MatchingFilters but accepts a raw encoded event. Under normal operation,
// this will be called on every new event that happens on the blockchain, so it's important it returns immediately if it
// doesn't match any registered filters.
func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] {
if _, ok := fl.knownPrograms[event.Program]; !ok {
return nil
}

// The first 64-bits of the event data is the event sig. Because it's base64 encoded, this corresponds to
// the first 10 characters plus 4 bits of the 11th character. We can quickly rule it out as not matching any known
// discriminators if the first 10 characters don't match. If it passes that initial test, we base64-decode the
// first 11 characters, and use the first 8 bytes of that as the event sig to call MatchingFilters. The address
// also needs to be base58-decoded to pass to MatchingFilters
if _, ok := fl.knownDiscriminators[event.Data[:10]]; !ok {
return nil
}

addr, err := solana.PublicKeyFromBase58(event.Program)
if err != nil {
fl.lggr.Errorw("failed to parse Program ID for event", "EventProgram", event)
return nil
}
decoded, err := base64.StdEncoding.DecodeString(event.Data[:11])
if err != nil {
fl.lggr.Errorw("failed to decode event data", "EventProgram", event)
return nil
}
eventSig := EventSignature(decoded[:8])

return fl.MatchingFilters(PublicKey(addr), eventSig)
}

// ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller.
// Requires LoadFilters to be called at least once.
func (fl *filters) ConsumeFiltersToBackfill() map[int64]Filter {
Expand Down
95 changes: 65 additions & 30 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package logpoller

import (
"context"
"encoding/base64"
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/utils"
)

var (
Expand All @@ -26,9 +26,9 @@ var (
//go:generate mockery --name ORM --inpackage --structname mockORM --filename mock_orm.go
type ORM interface {
ChainID() string
InsertFilter(context.Context, filter Filter) (id int64, err error)
SelectFilters(context.Context) ([]Filter, error)
DeleteFilters(context.Context, filters map[int64]Filter) error
InsertFilter(ctx context.Context, filter Filter) (id int64, err error)
SelectFilters(ctx context.Context) ([]Filter, error)
DeleteFilters(ctx context.Context, filters map[int64]Filter) error
MarkFilterDeleted(ctx context.Context, id int64) (err error)
InsertLogs(context.Context, []Log) (err error)
}
Expand All @@ -47,10 +47,10 @@ type LogPoller struct {
client internal.Loader[client.Reader]
collector *EncodedLogCollector

filters *filters
filters *filters
discriminatorLookup map[string]string
events []ProgramEvent
codec commontypes.RemoteCodec
events []ProgramEvent
codec commontypes.RemoteCodec

chStop services.StopChan
wg sync.WaitGroup
Expand All @@ -71,39 +71,74 @@ func makeLogIndex(txIndex int, txLogIndex uint) int64 {
if txIndex < 0 || txIndex > math.MaxUint32 || txLogIndex > math.MaxUint32 {
panic(fmt.Sprintf("txIndex or txLogIndex out of range: txIndex=%d, txLogIndex=%d", txIndex, txLogIndex))
}
return int64(math.MaxUint32 * uint32(txIndex) + uint32(txLogIndex))
return int64(math.MaxUint32*uint32(txIndex) + uint32(txLogIndex))
}

func (lp *LogPoller) Process(event ProgramEvent) error {
// process stream of events coming from event loader
// Process - process stream of events coming from log ingester
func (lp *LogPoller) Process(programEvent ProgramEvent) (err error) {
ctx, cancel := utils.ContextFromChan(lp.chStop)
defer cancel()

filterName, ok := lp.discriminatorLookup[event.Data[:8]] // TODO: add base64-encoded descriminator to lookup in RegisterFilter
if !ok {
// Discriminator doesn't match the event type of any registered filters, ignore
return nil
}
lp.eventCodec.Decode(event, filters.filtersByName[filterName].EventName)
blockData := programEvent.BlockData

var logs []Log
for filter := range lp.filters.MatchingFiltersForEncodedEvent(programEvent) {
log := Log{
FilterID: filter.ID,
ChainID: lp.orm.ChainID(),
LogIndex: makeLogIndex(blockData.TransactionIndex, blockData.TransactionLogIndex),
BlockHash: Hash(blockData.BlockHash),
BlockNumber: int64(blockData.BlockHeight),
BlockTimestamp: blockData.BlockTime.Time(), // TODO: is this a timezone safe conversion?
Address: filter.Address,
EventSig: filter.EventSig,
TxHash: Signature(blockData.TransactionHash),
}

log.Data, err = base64.StdEncoding.DecodeString(programEvent.Data)
if err != nil {
return err
}

log := Log{
FilterID: filterID,
ChainID: lp.orm.ChainID(),
LogIndex: makeLogIndex(event.TransactionIndex, event.TransactionLogIndex),
BlockHash: Hash(event.BlockHash),
BlockNumber: int64(event.BlockHeight),
BlockTimestamp: ,
Address:,
EventSig:,
var event any
err = lp.filters.EventCodec(filter.ID).Decode(ctx, log.Data, &event, filter.EventName)
if err != nil {
return err
}

}
err = lp.ExtractSubkeys(reflect.TypeOf(event), filter.SubkeyPaths)
if err != nil {
return err
}

logs := []Log{log}
// TODO: fill in, and keep track of SequenceNumber for each filter. (Initialize from db on LoadFilters, then increment each time?)

lp.events = append(lp.events, event)
logs = append(logs, log)
}

lp.orm.InsertLogs(ctx, logs)
return nil
}

func (lp *LogPoller) ExtractSubkeys(t reflect.Type, paths SubkeyPaths) error {
s := reflect.TypeOf(event)
if s.Kind() != reflect.Struct {
return fmt.Errorf("event type must be struct, got %v. event=%v", t, event)
}

for _, path := range paths[0] {
field, err := s.FieldByName(path)
for depth := 0; depth < len(paths); depth++ {
for _, path := range paths[depth] {
field, err = field.Type.FieldByName(path)
}
}
}

}

func get

func (lp *LogPoller) Start(context.Context) error {
cl, err := lp.client.Get()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Filter struct {
EventName string
EventSig EventSignature
StartingBlock int64
EventIDL codec.IdlEvent
EventIDL codec.IDL
SubkeyPaths SubkeyPaths
Retention time.Duration
MaxLogsKept int64
Expand Down
Loading

0 comments on commit 81185bd

Please sign in to comment.