From 63d462c5eaf9f891cd7107267afd08a0f0d3ecab Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Mon, 14 Aug 2023 09:52:01 -0400 Subject: [PATCH] Use Blocks from Block Sync in Block Manager (#1056) ## Overview Resolves: #1053, resolves: #1060, resolve #1061 ## Checklist - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [ ] Required CI checks are passing - [ ] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords --------- Co-authored-by: Ganesha Upadhyaya --- block/block_cache.go | 60 ++++++++ block/manager.go | 233 ++++++++++++++++++++++------- block/manager_test.go | 2 +- go.mod | 2 +- go.sum | 4 +- node/full.go | 14 +- node/full_client_test.go | 2 +- node/full_node_integration_test.go | 153 ++++++++++++------- node/helpers_test.go | 29 +++- node/test_helpers.go | 53 +++++-- types/signed_header.go | 13 +- 11 files changed, 435 insertions(+), 130 deletions(-) create mode 100644 block/block_cache.go diff --git a/block/block_cache.go b/block/block_cache.go new file mode 100644 index 00000000000..dcb9a999062 --- /dev/null +++ b/block/block_cache.go @@ -0,0 +1,60 @@ +package block + +import ( + "sync" + + "github.com/rollkit/rollkit/types" +) + +type BlockCache struct { + blocks map[uint64]*types.Block + hashes map[string]bool + hardConfirmations map[string]bool + mtx *sync.RWMutex +} + +func NewBlockCache() *BlockCache { + return &BlockCache{ + blocks: make(map[uint64]*types.Block), + hashes: make(map[string]bool), + hardConfirmations: make(map[string]bool), + mtx: new(sync.RWMutex), + } +} + +func (bc *BlockCache) getBlock(height uint64) (*types.Block, bool) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + block, ok := bc.blocks[height] + return block, ok +} + +func (bc *BlockCache) setBlock(height uint64, block *types.Block) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.blocks[height] = block +} + +func (bc *BlockCache) deleteBlock(height uint64) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + delete(bc.blocks, height) +} + +func (bc *BlockCache) isSeen(hash string) bool { + bc.mtx.Lock() + defer bc.mtx.Unlock() + return bc.hashes[hash] +} + +func (bc *BlockCache) setSeen(hash string) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.hashes[hash] = true +} + +func (bc *BlockCache) setHardConfirmed(hash string) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.hardConfirmations[hash] = true +} diff --git a/block/manager.go b/block/manager.go index c17121d1b26..3d2839c3d77 100644 --- a/block/manager.go +++ b/block/manager.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "time" + goheaderstore "github.com/celestiaorg/go-header/store" abci "github.com/cometbft/cometbft/abci/types" cmcrypto "github.com/cometbft/cometbft/crypto" "github.com/cometbft/cometbft/crypto/merkle" @@ -30,10 +31,16 @@ import ( // defaultDABlockTime is used only if DABlockTime is not configured for manager const defaultDABlockTime = 30 * time.Second +// defaultBlockTime is used only if BlockTime is not configured for manager +const defaultBlockTime = 1 * time.Second + // maxSubmitAttempts defines how many times Rollkit will re-try to publish block to DA layer. // This is temporary solution. It will be removed in future versions. 
const maxSubmitAttempts = 30 +// Applies to all channels, 100 is a large enough buffer to avoid blocking +const channelLength = 100 + // initialBackoff defines initial value for block submission backoff var initialBackoff = 100 * time.Millisecond @@ -61,10 +68,18 @@ type Manager struct { // daHeight is the height of the latest processed DA block daHeight uint64 - HeaderCh chan *types.SignedHeader - BlockCh chan *types.Block - blockInCh chan newBlockEvent - syncCache map[uint64]*types.Block + HeaderCh chan *types.SignedHeader + BlockCh chan *types.Block + + blockInCh chan newBlockEvent + blockStore *goheaderstore.Store[*types.Block] + + blockCache *BlockCache + + // blockStoreMtx is used by blockStoreCond + blockStoreMtx *sync.Mutex + // blockStoreCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve blocks from blockStore + blockStoreCond *sync.Cond // retrieveMtx is used by retrieveCond retrieveMtx *sync.Mutex @@ -77,6 +92,10 @@ type Manager struct { buildingBlock bool txsAvailable <-chan struct{} doneBuildingBlock chan struct{} + + // Maintains blocks that need to be published to DA layer + pendingBlocks []*types.Block + pendingBlocksMtx *sync.RWMutex } // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc. @@ -100,6 +119,7 @@ func NewManager( eventBus *cmtypes.EventBus, logger log.Logger, doneBuildingCh chan struct{}, + blockStore *goheaderstore.Store[*types.Block], ) (*Manager, error) { s, err := getInitialState(store, genesis) if err != nil { @@ -115,10 +135,15 @@ func NewManager( } if conf.DABlockTime == 0 { - logger.Info("WARNING: using default DA block time", "DABlockTime", defaultDABlockTime) + logger.Info("Using default DA block time", "DABlockTime", defaultDABlockTime) conf.DABlockTime = defaultDABlockTime } + if conf.BlockTime == 0 { + logger.Info("Using default block time", "BlockTime", defaultBlockTime) + conf.BlockTime = defaultBlockTime + } + exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger) if s.LastBlockHeight+1 == genesis.InitialHeight { res, err := exec.InitChain(genesis) @@ -150,18 +175,23 @@ func NewManager( retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) daHeight: s.DAHeight, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderCh: make(chan *types.SignedHeader, 100), - BlockCh: make(chan *types.Block, 100), - blockInCh: make(chan newBlockEvent, 100), + HeaderCh: make(chan *types.SignedHeader, channelLength), + BlockCh: make(chan *types.Block, channelLength), + blockInCh: make(chan newBlockEvent, channelLength), + blockStoreMtx: new(sync.Mutex), + blockStore: blockStore, retrieveMtx: new(sync.Mutex), lastStateMtx: new(sync.RWMutex), - syncCache: make(map[uint64]*types.Block), + blockCache: NewBlockCache(), logger: logger, txsAvailable: txsAvailableCh, doneBuildingBlock: doneBuildingCh, buildingBlock: false, + pendingBlocks: make([]*types.Block, 0), + pendingBlocksMtx: new(sync.RWMutex), } agg.retrieveCond = sync.NewCond(agg.retrieveMtx) + agg.blockStoreCond = sync.NewCond(agg.blockStoreMtx) return agg, nil } @@ -180,6 +210,11 @@ func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) { m.retriever = dalc.(da.BlockRetriever) } +// GetStoreHeight returns the manager's store height +func (m *Manager) GetStoreHeight() uint64 { + return m.store.Height() +} + // AggregationLoop is responsible for aggregating transactions into 
rollup-blocks. func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { initialHeight := uint64(m.genesis.InitialHeight) @@ -199,7 +234,6 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { time.Sleep(delay) } - //var timer *time.Timer timer := time.NewTimer(0) if !lazy { @@ -208,13 +242,13 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { case <-ctx.Done(): return case <-timer.C: - start := time.Now() - err := m.publishBlock(ctx) - if err != nil { - m.logger.Error("error while publishing block", "error", err) - } - timer.Reset(m.getRemainingSleep(start)) } + start := time.Now() + err := m.publishBlock(ctx) + if err != nil { + m.logger.Error("error while publishing block", "error", err) + } + timer.Reset(m.getRemainingSleep(start)) } } else { for { @@ -245,31 +279,60 @@ func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { } } +// BlockSubmissionLoop is responsible for submitting blocks to the DA layer. +func (m *Manager) BlockSubmissionLoop(ctx context.Context) { + timer := time.NewTicker(m.conf.DABlockTime) + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + } + err := m.submitBlocksToDA(ctx) + if err != nil { + m.logger.Error("error while submitting block to DA", "error", err) + } + } +} + // SyncLoop is responsible for syncing blocks. // -// SyncLoop processes headers gossiped in P2p network to know what's the latest block height, +// SyncLoop processes headers gossiped in P2P network to know what's the latest block height, // block data is retrieved from DA layer. func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) { daTicker := time.NewTicker(m.conf.DABlockTime) + blockTicker := time.NewTicker(m.conf.BlockTime) for { select { case <-daTicker.C: m.retrieveCond.Signal() + case <-blockTicker.C: + m.blockStoreCond.Signal() case blockEvent := <-m.blockInCh: block := blockEvent.block daHeight := blockEvent.daHeight + blockHash := block.Hash().String() + blockHeight := uint64(block.Height()) m.logger.Debug("block body retrieved from DALC", - "height", block.SignedHeader.Header.Height(), + "height", blockHeight, "daHeight", daHeight, - "hash", block.Hash(), + "hash", blockHash, ) - m.syncCache[block.SignedHeader.Header.BaseHeader.Height] = block + if m.blockCache.isSeen(blockHash) { + m.logger.Debug("block already seen", "height", blockHeight, "block hash", blockHash) + continue + } + m.blockCache.setBlock(blockHeight, block) + + m.blockStoreCond.Signal() m.retrieveCond.Signal() err := m.trySyncNextBlock(ctx, daHeight) if err != nil { m.logger.Info("failed to sync next block", "error", err) + continue } + m.blockCache.setSeen(blockHash) case <-ctx.Done(): return } @@ -285,7 +348,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { var commit *types.Commit currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory - b, ok := m.syncCache[currentHeight+1] + b, ok := m.blockCache.getBlock(currentHeight + 1) if !ok { return nil } @@ -296,7 +359,8 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { } if b != nil && commit != nil { - m.logger.Info("Syncing block", "height", b.SignedHeader.Header.Height()) + bHeight := uint64(b.Height()) + m.logger.Info("Syncing block", "height", bHeight) newState, responses, err := m.applyBlock(ctx, b) if err != nil { return fmt.Errorf("failed to ApplyBlock: %w", err) @@ -310,18 +374,18 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { return 
fmt.Errorf("failed to Commit: %w", err) } - err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses) + err = m.store.SaveBlockResponses(uint64(bHeight), responses) if err != nil { return fmt.Errorf("failed to save block responses: %w", err) } // SaveValidators commits the DB tx - err = m.saveValidatorsToStore(uint64(b.SignedHeader.Header.Height())) + err = m.saveValidatorsToStore(bHeight) if err != nil { return err } - m.store.SetHeight(uint64(b.SignedHeader.Header.Height())) + m.store.SetHeight(bHeight) if daHeight > newState.DAHeight { newState.DAHeight = daHeight @@ -330,12 +394,73 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { if err != nil { m.logger.Error("failed to save updated state", "error", err) } - delete(m.syncCache, currentHeight+1) + m.blockCache.deleteBlock(currentHeight + 1) } return nil } +// BlockStoreRetrieveLoop is responsible for retrieving blocks from the Block Store. +func (m *Manager) BlockStoreRetrieveLoop(ctx context.Context) { + // waitCh is used to signal the block store retrieve loop, that it should check block store for new blocks + // blockStoreCond can be signalled in completely async manner, and goroutine below + // works as some kind of "buffer" for those signals + waitCh := make(chan interface{}) + lastBlockStoreHeight := uint64(0) + go func() { + for { + // This infinite loop is expected to be stopped once the context is + // cancelled or throws an error and cleaned up by the GC. This is OK + // because it waits using a conditional which is only signaled periodically. + m.blockStoreMtx.Lock() + m.blockStoreCond.Wait() + waitCh <- nil + m.blockStoreMtx.Unlock() + if ctx.Err() != nil { + return + } + } + }() + for { + select { + case <-ctx.Done(): + return + case <-waitCh: + } + blockStoreHeight := m.blockStore.Height() + if blockStoreHeight > lastBlockStoreHeight { + blocks, err := m.getBlocksFromBlockStore(ctx, lastBlockStoreHeight+1, blockStoreHeight) + if err != nil { + m.logger.Error("failed to get blocks from Block Store", "lastBlockHeight", lastBlockStoreHeight, "blockStoreHeight", blockStoreHeight, "errors", err.Error()) + continue + } + daHeight := atomic.LoadUint64(&m.daHeight) + for _, block := range blocks { + m.blockInCh <- newBlockEvent{block, daHeight} + } + } + lastBlockStoreHeight = blockStoreHeight + } +} + +func (m *Manager) getBlocksFromBlockStore(ctx context.Context, startHeight, endHeight uint64) ([]*types.Block, error) { + if startHeight > endHeight { + return nil, fmt.Errorf("startHeight (%d) is greater than endHeight (%d)", startHeight, endHeight) + } + if startHeight == 0 { + startHeight++ + } + blocks := make([]*types.Block, endHeight-startHeight+1) + for i := startHeight; i <= endHeight; i++ { + block, err := m.blockStore.GetByHeight(ctx, i) + if err != nil { + return nil, err + } + blocks[i-startHeight] = block + } + return blocks, nil +} + // RetrieveLoop is responsible for interacting with DA layer. func (m *Manager) RetrieveLoop(ctx context.Context) { // waitCh is used to signal the retrieve loop, that it should process next blocks @@ -344,6 +469,9 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { waitCh := make(chan interface{}) go func() { for { + // This infinite loop is expected to be stopped once the context is + // cancelled or throws an error and cleaned up by the GC. This is OK + // because it waits using a conditional which is only signaled periodically. 
m.retrieveMtx.Lock() m.retrieveCond.Wait() waitCh <- nil @@ -356,25 +484,17 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { for { select { - case <-waitCh: - for { - select { - case <-ctx.Done(): - return - default: - } - daHeight := atomic.LoadUint64(&m.daHeight) - m.logger.Debug("retrieve", "daHeight", daHeight) - err := m.processNextDABlock(ctx) - if err != nil { - m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error()) - break - } - atomic.AddUint64(&m.daHeight, 1) - } case <-ctx.Done(): return + case <-waitCh: } + daHeight := atomic.LoadUint64(&m.daHeight) + err := m.processNextDABlock(ctx) + if err != nil { + m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error()) + continue + } + atomic.AddUint64(&m.daHeight, 1) } } @@ -394,6 +514,8 @@ func (m *Manager) processNextDABlock(ctx context.Context) error { } m.logger.Debug("retrieved potential blocks", "n", len(blockResp.Blocks), "daHeight", daHeight) for _, block := range blockResp.Blocks { + blockHash := block.Hash().String() + m.blockCache.setHardConfirmed(blockHash) m.blockInCh <- newBlockEvent{block, daHeight} } return nil @@ -526,6 +648,8 @@ func (m *Manager) publishBlock(ctx context.Context) error { if err != nil { return err } + blockHash := block.Hash().String() + m.blockCache.setSeen(blockHash) if commit == nil { commit, err = m.getCommit(block.SignedHeader.Header) @@ -540,11 +664,10 @@ func (m *Manager) publishBlock(ctx context.Context) error { return err } - err = m.submitBlockToDA(ctx, block) - if err != nil { - m.logger.Error("Failed to submit block to DA Layer") - return err - } + // Submit block to be published to the DA layer + m.pendingBlocksMtx.Lock() + m.pendingBlocks = append(m.pendingBlocks, block) + m.pendingBlocksMtx.Unlock() blockHeight := uint64(block.SignedHeader.Header.Height()) @@ -583,20 +706,20 @@ func (m *Manager) publishBlock(ctx context.Context) error { // Publish block to channel so that block exchange service can broadcast m.BlockCh <- block - m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress), "height", block.SignedHeader.Height()) + m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress), "height", blockHeight) return nil } -func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error { - m.logger.Info("submitting block to DA layer", "height", block.SignedHeader.Header.Height()) - +func (m *Manager) submitBlocksToDA(ctx context.Context) error { + m.pendingBlocksMtx.Lock() + defer m.pendingBlocksMtx.Unlock() submitted := false backoff := initialBackoff for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ { - res := m.dalc.SubmitBlocks(ctx, []*types.Block{block}) + res := m.dalc.SubmitBlocks(ctx, m.pendingBlocks) if res.Code == da.StatusSuccess { - m.logger.Info("successfully submitted Rollkit block to DA layer", "rollkitHeight", block.SignedHeader.Header.Height(), "daHeight", res.DAHeight) + m.logger.Info("successfully submitted Rollkit block to DA layer", "daHeight", res.DAHeight) submitted = true } else { m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt) @@ -606,9 +729,9 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error } if !submitted { - return fmt.Errorf("Failed to submit block to DA layer after %d attempts", maxSubmitAttempts) + return fmt.Errorf("failed to 
submit block to DA layer after %d attempts", maxSubmitAttempts) } - + m.pendingBlocks = make([]*types.Block, 0) return nil } diff --git a/block/manager_test.go b/block/manager_test.go index 9b271b8f0ae..50083ff6ca5 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -85,7 +85,7 @@ func TestInitialState(t *testing.T) { require.NoError(t, dalc.Stop()) }() dumbChan := make(chan struct{}) - agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan) + agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan, nil) assert.NoError(err) assert.NotNil(agg) agg.lastStateMtx.RLock() diff --git a/go.mod b/go.mod index 364846f5861..7de644639bf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( cosmossdk.io/math v1.0.1 - github.com/celestiaorg/go-header v0.2.12 + github.com/celestiaorg/go-header v0.2.13 github.com/celestiaorg/nmt v0.17.0 github.com/celestiaorg/rsmt2d v0.9.0 github.com/celestiaorg/utils v0.1.0 diff --git a/go.sum b/go.sum index 6d5aadb7b96..64f85d0caf6 100644 --- a/go.sum +++ b/go.sum @@ -191,8 +191,8 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n github.com/casbin/casbin/v2 v2.37.0/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg= github.com/celestiaorg/go-fraud v0.1.2 h1:Bf7yIN3lZ4IR/Vlu5OtmcVCVNESBKEJ/xwu28rRKGA8= github.com/celestiaorg/go-fraud v0.1.2/go.mod h1:kHZXQY+6gd1kYkoWRFFKgWyrLPWRgDN3vd1Ll9gE/oo= -github.com/celestiaorg/go-header v0.2.12 h1:3H9nir20+MTY1vXbLxOUOV05ZspotR6JOiZGKxACHCQ= -github.com/celestiaorg/go-header v0.2.12/go.mod h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0= +github.com/celestiaorg/go-header v0.2.13 h1:sUJLXYs8ViPpxLXyIIaW3h4tPFgtVYMhzsLC4GHfS8I= +github.com/celestiaorg/go-header v0.2.13/go.mod h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= diff --git a/node/full.go b/node/full.go index e097fd5414c..f3694916147 100644 --- a/node/full.go +++ b/node/full.go @@ -151,12 +151,6 @@ func newFullNode( mpIDs := newMempoolIDs() mp.EnableTxsAvailable() - doneBuildingChannel := make(chan struct{}) - blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel) - if err != nil { - return nil, fmt.Errorf("BlockManager initialization error: %w", err) - } - headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService")) if err != nil { return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) @@ -167,6 +161,12 @@ func newFullNode( return nil, fmt.Errorf("BlockExchangeService initialization error: %w", err) } + doneBuildingChannel := make(chan struct{}) + blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel, blockExchangeService.blockStore) + if err != nil { + return nil, fmt.Errorf("BlockManager initialization error: %w", err) + } + ctx, cancel := context.WithCancel(ctx) node := &FullNode{ @@ -283,10 +283,12 @@ func (n *FullNode) OnStart() error { if 
n.conf.Aggregator { n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime) go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator) + go n.blockManager.BlockSubmissionLoop(n.ctx) go n.headerPublishLoop(n.ctx) go n.blockPublishLoop(n.ctx) } go n.blockManager.RetrieveLoop(n.ctx) + go n.blockManager.BlockStoreRetrieveLoop(n.ctx) go n.blockManager.SyncLoop(n.ctx, n.cancel) return nil } diff --git a/node/full_client_test.go b/node/full_client_test.go index 6a0ede85158..4fcafeaaef5 100644 --- a/node/full_client_test.go +++ b/node/full_client_test.go @@ -970,7 +970,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(node2.Stop()) }() - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 3*time.Second) defer timeoutCancel() diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index 32558f9f964..3a35160c735 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -169,68 +169,120 @@ func TestLazyAggregator(t *testing.T) { }, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) assert.False(node.IsRunning()) assert.NoError(err) - err = node.Start() - assert.NoError(err) + + assert.NoError(node.Start()) defer func() { require.NoError(node.Stop()) }() assert.True(node.IsRunning()) - require.NoError(err) - - require.NoError(waitForFirstBlock(node.(*FullNode), false)) + require.NoError(waitForFirstBlock(node.(*FullNode), Header)) client := node.GetClient() _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 1}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 2, false)) + require.NoError(waitForAtLeastNBlocks(node, 2, Header)) _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 2}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 3, false)) + require.NoError(waitForAtLeastNBlocks(node, 3, Header)) _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 3}) assert.NoError(err) - require.NoError(waitForAtLeastNBlocks(node, 4, false)) + require.NoError(waitForAtLeastNBlocks(node, 4, Header)) +} + +// TestSingleAggregatorTwoFullNodesBlockSyncSpeed tests the scenario where the chain's block time is much faster than the DA's block time. In this case, the full nodes should be able to use block sync to sync blocks much faster than syncing from the DA layer, and the test should conclude within block time +func TestSingleAggregatorTwoFullNodesBlockSyncSpeed(t *testing.T) { + require := require.New(t) + aggCtx, aggCancel := context.WithCancel(context.Background()) + defer aggCancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + clientNodes := 3 + bmConfig := getBMConfig() + bmConfig.BlockTime = 1 * time.Second + bmConfig.DABlockTime = 10 * time.Second + const numberOfBlocksTSyncTill = 5 + + ch := make(chan struct{}) + defer close(ch) + timer := time.NewTimer(numberOfBlocksTSyncTill * bmConfig.BlockTime) + + go func() { + select { + case <-ch: + // Channel closed before timer expired. + return + case <-timer.C: + // Timer expired before channel closed. 
+ t.Error("nodes did not sync before DA Block time") + return + } + }() + nodes, _ := createNodes(aggCtx, ctx, clientNodes, bmConfig, t) + + node1 := nodes[0] + node2 := nodes[1] + node3 := nodes[2] + require.NoError(node1.Start()) + defer func() { + require.NoError(node1.Stop()) + }() + require.NoError(waitForFirstBlock(node1, Store)) + require.NoError(node2.Start()) + defer func() { + require.NoError(node2.Stop()) + }() + require.NoError(node3.Start()) + defer func() { + require.NoError(node3.Stop()) + }() + + require.NoError(waitForAtLeastNBlocks(node2, numberOfBlocksTSyncTill, Store)) + require.NoError(waitForAtLeastNBlocks(node3, numberOfBlocksTSyncTill, Store)) + + require.NoError(verifyNodesSynced(node1, node2, Store)) + require.NoError(verifyNodesSynced(node1, node3, Store)) } func TestBlockExchange(t *testing.T) { t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNode(t, true) + testSingleAggregatorSingleFullNode(t, Block) }) t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorTwoFullNode(t, true) + testSingleAggregatorTwoFullNode(t, Block) }) t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNodeTrustedHash(t, true) + testSingleAggregatorSingleFullNodeTrustedHash(t, Block) }) } func TestHeaderExchange(t *testing.T) { t.Run("SingleAggregatorSingleFullNode", func(t *testing.T) { - testSingleAggregatorSingleFullNode(t, false) + testSingleAggregatorSingleFullNode(t, Header) }) t.Run("SingleAggregatorTwoFullNode", func(t *testing.T) { - testSingleAggregatorTwoFullNode(t, false) + testSingleAggregatorTwoFullNode(t, Header) }) t.Run("SingleAggregatorSingleFullNodeTrustedHash", func(t *testing.T) { - testSingleAggregatorSingleFullNodeTrustedHash(t, false) + testSingleAggregatorSingleFullNodeTrustedHash(t, Header) }) t.Run("SingleAggregatorSingleFullNodeSingleLightNode", testSingleAggregatorSingleFullNodeSingleLightNode) } -func testSingleAggregatorSingleFullNode(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorSingleFullNode(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 1 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 2 + nodes, _ := createNodes(aggCtx, ctx, clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -240,26 +292,26 @@ func testSingleAggregatorSingleFullNode(t *testing.T, useBlockExchange bool) { require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) require.NoError(node2.Start()) defer func() { require.NoError(node2.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node2, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node2, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) } -func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorTwoFullNode(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 2 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 3 + nodes, _ := createNodes(aggCtx, ctx, 
clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -269,7 +321,7 @@ func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { defer func() { require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) require.NoError(node2.Start()) defer func() { require.NoError(node2.Stop()) @@ -279,19 +331,21 @@ func testSingleAggregatorTwoFullNode(t *testing.T, useBlockExchange bool) { require.NoError(node3.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node2, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node2, 2, source)) + require.NoError(waitForAtLeastNBlocks(node3, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) + require.NoError(verifyNodesSynced(node1, node3, source)) } -func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchange bool) { +func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, source Source) { require := require.New(t) aggCtx, aggCancel := context.WithCancel(context.Background()) defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientNodes := 1 - nodes, _ := createNodes(aggCtx, ctx, clientNodes+1, t) + clientNodes := 2 + nodes, _ := createNodes(aggCtx, ctx, clientNodes, getBMConfig(), t) node1 := nodes[0] node2 := nodes[1] @@ -301,7 +355,7 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchang require.NoError(node1.Stop()) }() - require.NoError(waitForFirstBlock(node1, useBlockExchange)) + require.NoError(waitForFirstBlock(node1, source)) // Get the trusted hash from node1 and pass it to node2 config trustedHash, err := node1.hExService.headerStore.GetByHeight(aggCtx, 1) @@ -312,8 +366,8 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, useBlockExchang require.NoError(node2.Stop()) }() - require.NoError(waitForAtLeastNBlocks(node1, 2, useBlockExchange)) - require.NoError(verifyNodesSynced(node1, node2, useBlockExchange)) + require.NoError(waitForAtLeastNBlocks(node1, 2, source)) + require.NoError(verifyNodesSynced(node1, node2, source)) } func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { @@ -335,15 +389,16 @@ func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { defer func() { require.NoError(dalc.Stop()) }() - sequencer, _ := createNode(aggCtx, 0, true, false, keys, t) - fullNode, _ := createNode(ctx, 1, false, false, keys, t) + bmConfig := getBMConfig() + sequencer, _ := createNode(aggCtx, 0, true, false, keys, bmConfig, t) + fullNode, _ := createNode(ctx, 1, false, false, keys, bmConfig, t) sequencer.(*FullNode).dalc = dalc sequencer.(*FullNode).blockManager.SetDALC(dalc) fullNode.(*FullNode).dalc = dalc fullNode.(*FullNode).blockManager.SetDALC(dalc) - lightNode, _ := createNode(ctx, 2, false, true, keys, t) + lightNode, _ := createNode(ctx, 2, false, true, keys, bmConfig, t) require.NoError(sequencer.Start()) defer func() { @@ -358,8 +413,9 @@ func testSingleAggregatorSingleFullNodeSingleLightNode(t *testing.T) { require.NoError(lightNode.Stop()) }() - require.NoError(waitForAtLeastNBlocks(sequencer.(*FullNode), 2, false)) - require.NoError(verifyNodesSynced(fullNode, lightNode, false)) + require.NoError(waitForAtLeastNBlocks(sequencer.(*FullNode), 2, Header)) + require.NoError(verifyNodesSynced(sequencer, fullNode, Header)) + require.NoError(verifyNodesSynced(fullNode, lightNode, 
Header)) } // Creates a starts the given number of client nodes along with an aggregator node. Uses the given flag to decide whether to have the aggregator produce malicious blocks. @@ -368,7 +424,7 @@ func createAndStartNodes(clientNodes int, t *testing.T) ([]*FullNode, []*mocks.A defer aggCancel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, t) + nodes, apps := createNodes(aggCtx, ctx, clientNodes+1, getBMConfig(), t) startNodes(nodes, apps, t) defer func() { for _, n := range nodes { @@ -384,13 +440,15 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) { // Wait for aggregator node to publish the first block for full nodes to initialize header exchange service require.NoError(t, nodes[0].Start()) - require.NoError(t, waitForFirstBlock(nodes[0], false)) + require.NoError(t, waitForFirstBlock(nodes[0], Header)) for i := 1; i < len(nodes); i++ { require.NoError(t, nodes[i].Start()) } // wait for nodes to start up and establish connections; 1 second ensures that test pass even on CI. - require.NoError(t, waitForAtLeastNBlocks(nodes[1], 2, false)) + for i := 1; i < len(nodes); i++ { + require.NoError(t, waitForAtLeastNBlocks(nodes[i], 2, Header)) + } for i := 1; i < len(nodes); i++ { data := strconv.Itoa(i) + time.Now().String() @@ -403,7 +461,7 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) { defer close(doneChan) // create a MockTester, to catch the Failed asserts from the Mock package m := MockTester{t: t} - // We don't nedd to check any specific arguments to DeliverTx + // We don't need to check any specific arguments to DeliverTx // so just use a function that returns "true" for matching the args matcher := mock.MatchedBy(func(i interface{}) bool { return true }) err := testutils.Retry(300, 100*time.Millisecond, func() error { @@ -425,7 +483,7 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) { } // Creates the given number of nodes the given nodes using the given wait group to synchornize them -func createNodes(aggCtx, ctx context.Context, num int, t *testing.T) ([]*FullNode, []*mocks.Application) { +func createNodes(aggCtx, ctx context.Context, num int, bmConfig config.BlockManagerConfig, t *testing.T) ([]*FullNode, []*mocks.Application) { t.Helper() if aggCtx == nil { @@ -447,14 +505,14 @@ func createNodes(aggCtx, ctx context.Context, num int, t *testing.T) ([]*FullNod ds, _ := store.NewDefaultInMemoryKVStore() _ = dalc.Init([8]byte{}, nil, ds, test.NewFileLoggerCustom(t, test.TempLogFileName(t, "dalc"))) _ = dalc.Start() - node, app := createNode(aggCtx, 0, true, false, keys, t) + node, app := createNode(aggCtx, 0, true, false, keys, bmConfig, t) apps[0] = app nodes[0] = node.(*FullNode) // use same, common DALC, so nodes can share data nodes[0].dalc = dalc nodes[0].blockManager.SetDALC(dalc) for i := 1; i < num; i++ { - node, apps[i] = createNode(ctx, i, false, false, keys, t) + node, apps[i] = createNode(ctx, i, false, false, keys, bmConfig, t) nodes[i] = node.(*FullNode) nodes[i].dalc = dalc nodes[i].blockManager.SetDALC(dalc) @@ -463,7 +521,7 @@ func createNodes(aggCtx, ctx context.Context, num int, t *testing.T) ([]*FullNod return nodes, apps } -func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys []crypto.PrivKey, t *testing.T) (Node, *mocks.Application) { +func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys []crypto.PrivKey, bmConfig 
config.BlockManagerConfig, t *testing.T) (Node, *mocks.Application) { t.Helper() require := require.New(t) // nodes will listen on consecutive ports on local interface @@ -472,11 +530,6 @@ func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys p2pConfig := config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n), } - bmConfig := config.BlockManagerConfig{ - DABlockTime: 100 * time.Millisecond, - BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly - NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1}, - } for i := 0; i < len(keys); i++ { if i == n { continue diff --git a/node/helpers_test.go b/node/helpers_test.go index 8f515fa8f86..f4972a11de3 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -40,8 +40,9 @@ func TestGetNodeHeight(t *testing.T) { for i := 0; i < num; i++ { keys[i], _, _ = crypto.GenerateEd25519Key(rand.Reader) } - fullNode, _ := createNode(ctx, 0, true, false, keys, t) - lightNode, _ := createNode(ctx, 1, true, true, keys, t) + bmConfig := getBMConfig() + fullNode, _ := createNode(ctx, 0, true, false, keys, bmConfig, t) + lightNode, _ := createNode(ctx, 1, true, true, keys, bmConfig, t) fullNode.(*FullNode).dalc = dalc fullNode.(*FullNode).blockManager.SetDALC(dalc) require.NoError(fullNode.Start()) @@ -55,7 +56,7 @@ func TestGetNodeHeight(t *testing.T) { }() require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { - num, err := getNodeHeight(fullNode, false) + num, err := getNodeHeight(fullNode, Header) if err != nil { return err } @@ -65,7 +66,27 @@ func TestGetNodeHeight(t *testing.T) { return errors.New("expected height > 0") })) require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { - num, err := getNodeHeight(lightNode, false) + num, err := getNodeHeight(fullNode, Block) + if err != nil { + return err + } + if num > 0 { + return nil + } + return errors.New("expected height > 0") + })) + require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { + num, err := getNodeHeight(fullNode, Store) + if err != nil { + return err + } + if num > 0 { + return nil + } + return errors.New("expected height > 0") + })) + require.NoError(testutils.Retry(1000, 100*time.Millisecond, func() error { + num, err := getNodeHeight(lightNode, Header) if err != nil { return err } diff --git a/node/test_helpers.go b/node/test_helpers.go index 4185a49f01b..e2e19d946c3 100644 --- a/node/test_helpers.go +++ b/node/test_helpers.go @@ -12,7 +12,17 @@ import ( cmtypes "github.com/cometbft/cometbft/types" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rollkit/rollkit/config" "github.com/rollkit/rollkit/conv" + "github.com/rollkit/rollkit/types" +) + +type Source int + +const ( + Header Source = iota + Block + Store ) var genesisValidatorKey = ed25519.GenPrivKey() @@ -29,14 +39,32 @@ func (m MockTester) Logf(format string, args ...interface{}) {} func (m MockTester) Errorf(format string, args ...interface{}) {} -func waitForFirstBlock(node *FullNode, useBlockExchange bool) error { - return waitForAtLeastNBlocks(node, 1, useBlockExchange) +func waitForFirstBlock(node Node, source Source) error { + return waitForAtLeastNBlocks(node, 1, source) } -func getNodeHeight(node Node, useBlockExchange bool) (uint64, error) { - if useBlockExchange { +func getBMConfig() config.BlockManagerConfig { + return config.BlockManagerConfig{ + DABlockTime: 100 * time.Millisecond, + BlockTime: 1 * time.Second, // blocks must be at least 
1 sec apart for adjacent headers to get verified correctly + NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1}, + } +} + +func getNodeHeight(node Node, source Source) (uint64, error) { + switch source { + case Header: + return getNodeHeightFromHeader(node) + case Block: return getNodeHeightFromBlock(node) + case Store: + return getNodeHeightFromStore(node) + default: + return 0, errors.New("invalid source") } +} + +func getNodeHeightFromHeader(node Node) (uint64, error) { if fn, ok := node.(*FullNode); ok { return fn.hExService.headerStore.Height(), nil } @@ -53,13 +81,20 @@ func getNodeHeightFromBlock(node Node) (uint64, error) { return 0, errors.New("not a full node") } -func verifyNodesSynced(node1, node2 Node, useBlockExchange bool) error { +func getNodeHeightFromStore(node Node) (uint64, error) { + if fn, ok := node.(*FullNode); ok { + return fn.blockManager.GetStoreHeight(), nil + } + return 0, errors.New("not a full node") +} + +func verifyNodesSynced(node1, node2 Node, source Source) error { return testutils.Retry(300, 100*time.Millisecond, func() error { - n1Height, err := getNodeHeight(node1, useBlockExchange) + n1Height, err := getNodeHeight(node1, source) if err != nil { return err } - n2Height, err := getNodeHeight(node2, useBlockExchange) + n2Height, err := getNodeHeight(node2, source) if err != nil { return err } @@ -70,9 +105,9 @@ func verifyNodesSynced(node1, node2 Node, useBlockExchange bool) error { }) } -func waitForAtLeastNBlocks(node Node, n int, useBlockExchange bool) error { +func waitForAtLeastNBlocks(node Node, n int, source Source) error { return testutils.Retry(300, 100*time.Millisecond, func() error { - nHeight, err := getNodeHeight(node, useBlockExchange) + nHeight, err := getNodeHeight(node, source) if err != nil { return err } diff --git a/types/signed_header.go b/types/signed_header.go index b8226dc07f7..575ed715715 100644 --- a/types/signed_header.go +++ b/types/signed_header.go @@ -30,6 +30,17 @@ func (sH *SignedHeader) Verify(untrst header.Header) error { Reason: err, } } + if err := sH.Header.Verify(&untrstH.Header); err != nil { + return &header.VerifyError{ + Reason: err, + } + } + + // TODO: Accept non-adjacent headers until go-header implements feature to accept non-adjacent + if sH.Height()+1 < untrst.Height() { + return nil + } + sHHash := sH.Header.Hash() if !bytes.Equal(untrstH.LastHeaderHash[:], sHHash) { return &header.VerifyError{ @@ -42,7 +53,7 @@ func (sH *SignedHeader) Verify(untrst header.Header) error { Reason: fmt.Errorf("last commit hash %v does not match hash of previous header %v", untrstH.LastCommitHash[:], sHHash), } } - return sH.Header.Verify(&untrstH.Header) + return nil } var _ header.Header = &SignedHeader{}
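
Reviewer note: below is a minimal, package-internal sketch of how the new `BlockCache` added in this patch is exercised by `SyncLoop`, `processNextDABlock`, and `trySyncNextBlock`. The wrapper function name is hypothetical and only illustrates the intended call sequence under the patch's unexported accessors; it is not part of the change itself.

```go
package block

import (
	"fmt"

	"github.com/rollkit/rollkit/types"
)

// exampleCacheUsage is illustrative only: it mirrors how the sync and retrieve
// loops interact with BlockCache for a single block arriving on blockInCh.
func exampleCacheUsage(b *types.Block) {
	bc := NewBlockCache()

	hash := b.Hash().String()
	height := uint64(b.Height())

	// SyncLoop: skip blocks that were already processed.
	if bc.isSeen(hash) {
		fmt.Println("block already seen at height", height)
		return
	}

	// Buffer the block until store.Height()+1 reaches it.
	bc.setBlock(height, b)

	// processNextDABlock: blocks retrieved from the DA layer are hard confirmed.
	bc.setHardConfirmed(hash)

	// trySyncNextBlock: apply and commit the next block, then evict it from the cache.
	if next, ok := bc.getBlock(height); ok {
		_ = next // ApplyBlock / Commit / SaveBlockResponses happen here in the real manager
		bc.deleteBlock(height)
	}

	// SyncLoop: mark the hash as seen so duplicates arriving later
	// from the block store or the DA layer are ignored.
	bc.setSeen(hash)
}
```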