Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logs are forwarded to a processor in slot and trx order #953

Merged
merged 8 commits into from
Dec 19, 2024
25 changes: 18 additions & 7 deletions pkg/solana/logpoller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error {
}

type eventDetail struct {
blockNumber uint64
slotNumber uint64
blockHeight uint64
blockHash solana.Hash
trxIdx int
trxSig solana.Signature
Expand All @@ -54,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error {
return j.parser.Process(j.event)
}

type wrappedParser interface {
ProgramEventProcessor
ExpectBlock(uint64)
ExpectTxs(uint64, int)
}

// getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads
// the job queue with getTransactionLogsJobs for each transaction found in the block.
type getTransactionsFromBlockJob struct {
slotNumber uint64
client RPCClient
parser ProgramEventProcessor
parser wrappedParser
chJobs chan Job
}

Expand Down Expand Up @@ -103,17 +110,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error {
}

detail := eventDetail{
blockHash: block.Blockhash,
slotNumber: j.slotNumber,
blockHash: block.Blockhash,
}

if block.BlockHeight != nil {
detail.blockNumber = *block.BlockHeight
detail.blockHeight = *block.BlockHeight
}

if len(block.Transactions) != len(blockSigsOnly.Signatures) {
return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures))
}

j.parser.ExpectTxs(j.slotNumber, len(block.Transactions))

for idx, trx := range block.Transactions {
detail.trxIdx = idx
if len(blockSigsOnly.Signatures)-1 <= idx {
Expand All @@ -130,14 +140,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev
var logIdx uint
for _, outputs := range parseProgramLogs(messages) {
for _, event := range outputs.Events {
logIdx++

event.BlockNumber = detail.blockNumber
event.SlotNumber = detail.slotNumber
event.BlockHeight = detail.blockHeight
event.BlockHash = detail.blockHash
event.TransactionHash = detail.trxSig
event.TransactionIndex = detail.trxIdx
event.TransactionLogIndex = logIdx

logIdx++

chJobs <- &processEventJob{
parser: parser,
event: event,
Expand Down
229 changes: 220 additions & 9 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package logpoller

import (
"container/list"
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -40,7 +44,8 @@ type EncodedLogCollector struct {

// dependencies and configuration
client RPCClient
parser ProgramEventProcessor
ordered *orderedParser
unordered *unorderedParser
lggr logger.Logger
rpcTimeLimit time.Duration

Expand All @@ -62,7 +67,7 @@ func NewEncodedLogCollector(
) *EncodedLogCollector {
c := &EncodedLogCollector{
client: client,
parser: parser,
unordered: newUnorderedParser(parser),
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
Expand All @@ -74,8 +79,9 @@ func NewEncodedLogCollector(
Name: "EncodedLogCollector",
NewSubServices: func(lggr logger.Logger) []services.Service {
c.workers = NewWorkerGroup(DefaultWorkerCount, lggr)
c.ordered = newOrderedParser(parser, lggr)

return []services.Service{c.workers}
return []services.Service{c.workers, c.ordered}
},
Start: c.start,
Close: c.close,
Expand Down Expand Up @@ -127,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: sig.Slot,
client: c.client,
parser: c.parser,
parser: c.unordered,
chJobs: c.chJobs,
}); err != nil {
return err
Expand All @@ -138,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
return nil
}

func (c *EncodedLogCollector) start(ctx context.Context) error {
func (c *EncodedLogCollector) start(_ context.Context) error {
c.engine.Go(c.runSlotPolling)
c.engine.Go(c.runSlotProcessing)
c.engine.Go(c.runBlockProcessing)
Expand Down Expand Up @@ -201,10 +207,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
continue
}

from := c.highestSlot.Load() + 1
if c.highestSlot.Load() == 0 {
from = slot
}

c.highestSlot.Store(slot)

// load blocks in slot range
c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot)
c.loadRange(ctx, from, slot)
}
}
}
Expand All @@ -214,11 +225,11 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
select {
case <-ctx.Done():
return
case block := <-c.chBlock:
case slot := <-c.chBlock:
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: block,
slotNumber: slot,
client: c.client,
parser: c.parser,
parser: c.ordered,
chJobs: c.chJobs,
}); err != nil {
c.lggr.Errorf("failed to add job to queue: %s", err)
Expand Down Expand Up @@ -269,7 +280,21 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
return err
}

// as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
// that the RPC changes and results get jumbled.
slices.SortFunc(result, func(a, b uint64) int {
if a < b {
return -1
} else if a > b {
return 1
}

return 0
})

for _, block := range result {
c.ordered.ExpectBlock(block)

select {
case <-ctx.Done():
return nil
Expand All @@ -279,3 +304,189 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en

return nil
}

type unorderedParser struct {
parser ProgramEventProcessor
}

func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser {
return &unorderedParser{parser: parser}
}

func (p *unorderedParser) ExpectBlock(_ uint64) {}
func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {}
func (p *unorderedParser) Process(evt ProgramEvent) error {
return p.parser.Process(evt)
}

type orderedParser struct {
// service state management
services.Service
engine *services.Engine

// internal state
parser ProgramEventProcessor
mu sync.Mutex
blocks *list.List
expect map[uint64]int
actual map[uint64][]ProgramEvent
}

func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser {
op := &orderedParser{
parser: parser,
blocks: list.New(),
expect: make(map[uint64]int),
actual: make(map[uint64][]ProgramEvent),
}

op.Service, op.engine = services.Config{
Name: "OrderedParser",
Start: op.start,
Close: op.close,
}.NewServiceEngine(lggr)

return op
}

// ExpectBlock should be called in block order to preserve block progression.
func (p *orderedParser) ExpectBlock(block uint64) {
p.mu.Lock()
defer p.mu.Unlock()

p.blocks.PushBack(block)
}
reductionista marked this conversation as resolved.
Show resolved Hide resolved

func (p *orderedParser) ExpectTxs(block uint64, quantity int) {
p.mu.Lock()
defer p.mu.Unlock()

p.expect[block] = quantity
p.actual[block] = make([]ProgramEvent, 0, quantity)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if quantity is 0 - signal that bock is ready

}

func (p *orderedParser) Process(event ProgramEvent) error {
p.mu.Lock()
defer p.mu.Unlock()

if err := p.addToExpectations(event); err != nil {
// TODO: log error because this is an unrecoverable error
return nil
}

return p.sendReadySlots()
}

func (p *orderedParser) start(_ context.Context) error {
p.engine.GoTick(services.NewTicker(time.Second), p.run)

return nil
}

func (p *orderedParser) close() error {
return nil
}

func (p *orderedParser) addToExpectations(evt ProgramEvent) error {
_, ok := p.expect[evt.SlotNumber]
if !ok {
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

evts, ok := p.actual[evt.SlotNumber]
if !ok {
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

p.actual[evt.SlotNumber] = append(evts, evt)

return nil
}

func (p *orderedParser) expectations(block uint64) (int, bool, error) {
expectations, ok := p.expect[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

evts, ok := p.actual[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

return expectations, expectations == len(evts), nil
}

func (p *orderedParser) clearExpectations(block uint64) {
delete(p.expect, block)
delete(p.actual, block)
}

func (p *orderedParser) run(_ context.Context) {
p.mu.Lock()
defer p.mu.Unlock()

_ = p.sendReadySlots()
}

func (p *orderedParser) sendReadySlots() error {
// start at the lowest block and find ready blocks
for element := p.blocks.Front(); element != nil; element = element.Next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for element := p.blocks.Front(); element != nil; element = element.Next() {
for element := p.blocks.Front(); element != nil; element = p.blocks.Front() {

To drop

                temp := element.Prev()

		if temp == nil {
			break
		}

		element = temp

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also due to break, there is a leak of expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh ok. I was just using the iteration method in the docs. This works better. Thanks

block := element.Value.(uint64)
// if no expectations are set, we are still waiting on information for the block.
// if expectations set and not met, we are still waiting on information for the block
// no other block data should be sent until this is resolved
exp, met, err := p.expectations(block)
if err != nil || !met {
break
}

// if expectations are 0 -> remove and continue
if exp == 0 {
p.clearExpectations(block)

temp := element.Prev()
p.blocks.Remove(element)
if temp == nil {
break
}

element = temp

continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... you'll probably want to delay the heap.Pop call I have above till here, so that if you break or return early it's not already removed. Instead of the initial heap.Pop you can "peak" at the minimum block number on the heap with block := (*p.blocks)[0]. Even though the rest may be out of order, this should always be guaranteed to be the min element.

I think the two places you'll need the heap.Pop are in place of the 2 instances of rmvIdx = append(rmvIndex, idx), does that sound right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the heap.Pop call is useful in this instance though. Pop will remove the first element, but all I want to do is iterate through them in order without alterations. Then, when I want to remove specific indexes, Pop will not work. I need to use heap.Remove to specify which index to remove.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but is there a reason why you can't just Pop each block as soon as you've verified that the expectations are met? I don't see any case where you go back and look at old blocks or look ahead at future blocks, so I'm not sure why we have to flag the block for removal and then go back through later and locate all the ones flagged for removal? If we're done with it, why not just discard it and move on?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Domino's comment. I do not see a reason to defer removal from blocks once we've verified that block met expectations.

}

evts, ok := p.actual[block]
if !ok {
return errInvalidState
}

var errs error
for _, evt := range evts {
errs = errors.Join(errs, p.parser.Process(evt))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass to the processor the whole block and all related events instead of a single event.
This way, it knows that the whole block was processed instead of a single event. (On the startup we won't need to refetch data for the block of latest log).
Also, for finality detection work, it would be helpful to store even empty blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Domino brought this up as well in conversation and she will be doing the interface change as a followup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't even thought of the reasons @dhaidashenko gives, just was worried that writing 1 event at a time would put more strain on the db than writing batches at a time as they become available.

}

// need possible retry
if errs != nil {
return errs
}

temp := element.Prev()
p.blocks.Remove(element)

if temp == nil {
break
}

element = temp

p.clearExpectations(block)
}

return nil
}

var (
errExpectationsNotSet = errors.New("expectations not set")
errInvalidState = errors.New("invalid state")
)
Loading
Loading