Skip to content

Commit

Permalink
fix startup from 0 with no-downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Oct 19, 2024
1 parent 4767200 commit 7b2d520
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 10 deletions.
53 changes: 53 additions & 0 deletions polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,57 @@ type Bridge struct {
lastProcessedBlockInfo atomic.Pointer[ProcessedBlockInfo]
synchronizeMu sync.Mutex
unwindMu sync.Mutex
ready ready
}

type ready struct {
mu sync.Mutex
on chan struct{}
state bool
inited bool
}

func (me *ready) On() <-chan struct{} {
me.mu.Lock()
defer me.mu.Unlock()
me.init()
return me.on
}

func (me *ready) init() {
if me.inited {
return
}
me.on = make(chan struct{})
me.inited = true
}

func (me *ready) set() {
me.mu.Lock()
defer me.mu.Unlock()
me.init()
if me.state {
return
}
me.state = true
close(me.on)
}

func (b *Bridge) Ready(ctx context.Context) <-chan error {
errc := make(chan error)

go func() {
select {
case <-ctx.Done():
errc <- ctx.Err()
case <-b.ready.On():
errc <- nil
}

close(errc)
}()

return errc
}

func (b *Bridge) Run(ctx context.Context) error {
Expand Down Expand Up @@ -127,6 +178,8 @@ func (b *Bridge) Run(ctx context.Context) error {
"lastProcessedBlockTime", lastProcessedBlockInfo.BlockTime,
)

b.ready.set()

logTicker := time.NewTicker(30 * time.Second)
defer logTicker.Stop()

Expand Down
1 change: 1 addition & 0 deletions polygon/bridge/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type PolygonBridgeReader interface {
type Service interface {
PolygonBridge
Run(ctx context.Context) error
Ready(ctx context.Context) <-chan error
}

type ReaderService interface {
Expand Down
55 changes: 55 additions & 0 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Service interface {
SynchronizeCheckpoints(ctx context.Context) (latest *Checkpoint, err error)
SynchronizeMilestones(ctx context.Context) (latest *Milestone, err error)
SynchronizeSpans(ctx context.Context, blockNum uint64) error
Ready(ctx context.Context) <-chan error
}

type service struct {
Expand All @@ -58,6 +60,7 @@ type service struct {
milestoneScraper *scraper[*Milestone]
spanScraper *scraper[*Span]
spanBlockProducersTracker *spanBlockProducersTracker
ready ready
}

func AssembleService(config ServiceConfig) Service {
Expand Down Expand Up @@ -257,13 +260,65 @@ func (s *service) RegisterSpanObserver(callback func(*Span), opts ...ObserverOpt
})
}

type ready struct {
mu sync.Mutex
on chan struct{}
state bool
inited bool
}

func (me *ready) On() <-chan struct{} {
me.mu.Lock()
defer me.mu.Unlock()
me.init()
return me.on
}

func (me *ready) init() {
if me.inited {
return
}
me.on = make(chan struct{})
me.inited = true
}

func (me *ready) set() {
me.mu.Lock()
defer me.mu.Unlock()
me.init()
if me.state {
return
}
me.state = true
close(me.on)
}

func (s *service) Ready(ctx context.Context) <-chan error {
errc := make(chan error)

go func() {
select {
case <-ctx.Done():
errc <- ctx.Err()
case <-s.ready.On():
errc <- nil
}

close(errc)
}()

return errc
}

func (s *service) Run(ctx context.Context) error {
defer s.store.Close()

if err := s.store.Prepare(ctx); err != nil {
return nil
}

s.ready.set()

if err := s.replayUntrackedSpans(ctx); err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions polygon/sync/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (s *executionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Cont
return err
}

if executionTip == nil {
return nil
}

executionTipNum := executionTip.Number.Uint64()
if executionTipNum <= initialBlockNum {
return nil
Expand Down
34 changes: 26 additions & 8 deletions polygon/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ type heimdallSynchronizer interface {
SynchronizeCheckpoints(ctx context.Context) (latest *heimdall.Checkpoint, err error)
SynchronizeMilestones(ctx context.Context) (latest *heimdall.Milestone, err error)
SynchronizeSpans(ctx context.Context, blockNum uint64) error
Ready(ctx context.Context) <-chan error
}

type bridgeSynchronizer interface {
LastProcessedBlock(ctx context.Context) (uint64, error)
Synchronize(ctx context.Context, blockNum uint64) error
Unwind(ctx context.Context, blockNum uint64) error
ProcessNewBlocks(ctx context.Context, blocks []*types.Block) error
Ready(ctx context.Context) <-chan error
}

type Sync struct {
Expand Down Expand Up @@ -602,6 +604,14 @@ func (s *Sync) maybePenalizePeerOnBadBlockEvent(ctx context.Context, event Event
func (s *Sync) Run(ctx context.Context) error {
s.logger.Info(syncLogPrefix("waiting for execution client"))

if err := <-s.bridgeSync.Ready(ctx); err != nil {
return err
}

if err := <-s.heimdallSync.Ready(ctx); err != nil {
return err
}

if err := s.execution.Prepare(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -718,16 +728,18 @@ func (s *Sync) syncToTip(ctx context.Context) (syncToTipResult, error) {
// in the events if published so its highly likely that the
// recovered events will be consistent

lastBridgeBlock, err := s.bridgeSync.LastProcessedBlock(ctx)
if err != nil {
return syncToTipResult{}, err
}

if lastBridgeBlock < latestTipOnStart.Number.Uint64() {
latestTipOnStart, err = s.execution.GetHeader(ctx, lastBridgeBlock)
if latestTipOnStart != nil {
lastBridgeBlock, err := s.bridgeSync.LastProcessedBlock(ctx)
if err != nil {
return syncToTipResult{}, err
}

if lastBridgeBlock < latestTipOnStart.Number.Uint64() {
latestTipOnStart, err = s.execution.GetHeader(ctx, lastBridgeBlock)
if err != nil {
return syncToTipResult{}, err
}
}
}

result, err := s.syncToTipUsingCheckpoints(ctx, latestTipOnStart)
Expand Down Expand Up @@ -787,8 +799,14 @@ type tipDownloaderFunc func(ctx context.Context, startBlockNum uint64) (syncToTi

func (s *Sync) sync(ctx context.Context, tip *types.Header, tipDownloader tipDownloaderFunc) (syncToTipResult, error) {
var latestWaypoint heimdall.Waypoint
var startBlockNum uint64 = 1

for {
newResult, err := tipDownloader(ctx, tip.Number.Uint64()+1)
if tip != nil {
startBlockNum = tip.Number.Uint64() + 1
}

newResult, err := tipDownloader(ctx, startBlockNum)
if err != nil {
return syncToTipResult{}, err
}
Expand Down
3 changes: 1 addition & 2 deletions turbo/snapshotsync/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sync/atomic"
"time"

"github.com/anacrolix/chansync/events"
"github.com/erigontech/erigon-lib/chain"
"github.com/erigontech/erigon-lib/chain/snapcfg"
common2 "github.com/erigontech/erigon-lib/common"
Expand Down Expand Up @@ -673,7 +672,7 @@ type ready struct {
inited bool
}

func (me *ready) On() events.Active {
func (me *ready) On() <-chan struct{} {
me.mu.Lock()
defer me.mu.Unlock()
me.init()
Expand Down

0 comments on commit 7b2d520

Please sign in to comment.