Skip to content

Commit

Permalink
Merge pull request cosmos#487 from celestiaorg/tzdybal/use_last_commit
Browse files Browse the repository at this point in the history
feat: use gossiped commit in sync
  • Loading branch information
tzdybal authored Sep 8, 2022
2 parents 41d5cc3 + 358bf20 commit 3420af7
Show file tree
Hide file tree
Showing 10 changed files with 469 additions and 113 deletions.
151 changes: 93 additions & 58 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ type Manager struct {
// daHeight is the height of the latest processed DA block
daHeight uint64

HeaderOutCh chan *types.Header
HeaderInCh chan *types.Header
HeaderOutCh chan *types.SignedHeader
HeaderInCh chan *types.SignedHeader

CommitInCh chan *types.Commit
CommitOutCh chan *types.Commit
lastCommit *types.Commit
CommitInCh chan *types.Commit
lastCommit atomic.Value

syncTarget uint64
blockInCh chan newBlockEvent
Expand Down Expand Up @@ -138,10 +137,9 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.Header, 100),
HeaderInCh: make(chan *types.Header, 100),
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
CommitInCh: make(chan *types.Commit, 100),
CommitOutCh: make(chan *types.Commit, 100),
blockInCh: make(chan newBlockEvent, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
Expand Down Expand Up @@ -195,8 +193,8 @@ func (m *Manager) SyncLoop(ctx context.Context) {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash())
newHeight := header.Height
m.logger.Debug("block header received", "height", header.Header.Height, "hash", header.Header.Hash())
newHeight := header.Header.Height
currentHeight := m.store.Height()
// in case of client reconnecting after being offline
// newHeight may be significantly larger than currentHeight
Expand All @@ -205,9 +203,16 @@ func (m *Manager) SyncLoop(ctx context.Context) {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCond.Signal()
}
m.CommitInCh <- &header.Commit
case commit := <-m.CommitInCh:
// TODO(tzdybal): check if it's from right aggregator
m.lastCommit = commit
m.lastCommit.Store(commit)
err := m.trySyncNextBlock(ctx, 0)
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
} else {
m.logger.Debug("synced using gossiped commit", "height", commit.Height)
}
case blockEvent := <-m.blockInCh:
block := blockEvent.block
daHeight := blockEvent.daHeight
Expand All @@ -218,49 +223,86 @@ func (m *Manager) SyncLoop(ctx context.Context) {
)
m.syncCache[block.Header.Height] = block
m.retrieveCond.Signal()
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory
b1, ok1 := m.syncCache[currentHeight+1]
b2, ok2 := m.syncCache[currentHeight+2]
if ok1 && ok2 {
m.logger.Info("Syncing block", "height", b1.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if err != nil {
m.logger.Error("failed to ApplyBlock", "error", err)
continue
}
err = m.store.SaveBlock(b1, &b2.LastCommit)
if err != nil {
m.logger.Error("failed to save block", "error", err)
continue
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
if err != nil {
m.logger.Error("failed to Commit", "error", err)
continue
}
m.store.SetHeight(b1.Header.Height)

err = m.store.SaveBlockResponses(b1.Header.Height, responses)
if err != nil {
m.logger.Error("failed to save block responses", "error", err)
continue
}

newState.DAHeight = daHeight
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
continue
}
delete(m.syncCache, currentHeight+1)
err := m.trySyncNextBlock(ctx, daHeight)
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
}
case <-ctx.Done():
return
}
}
}

// trySyncNextBlock tries to progress one step (one block) in sync process.
//
// To be able to apply block and height h, we need to have its Commit. It is contained in block at height h+1.
// If block at height h+1 is not available, value of last gossiped commit is checked.
// If commit for block h is available, we proceed with sync process, and remove synced block from sync cache.
func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
var b1 *types.Block
var commit *types.Commit
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory

b1, ok1 := m.syncCache[currentHeight+1]
if !ok1 {
return nil
}
b2, ok2 := m.syncCache[currentHeight+2]
if ok2 {
m.logger.Debug("using last commit from next block")
commit = &b2.LastCommit
} else {
lastCommit := m.getLastCommit()
if lastCommit != nil && lastCommit.Height == currentHeight+1 {
m.logger.Debug("using gossiped commit")
commit = lastCommit
}
}

if b1 != nil && commit != nil {
m.logger.Info("Syncing block", "height", b1.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
}
err = m.store.SaveBlock(b1, commit)
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(b1.Header.Height)

err = m.store.SaveBlockResponses(b1.Header.Height, responses)
if err != nil {
return fmt.Errorf("failed to save block responses: %w", err)
}

if daHeight > newState.DAHeight {
newState.DAHeight = daHeight
}
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
}
delete(m.syncCache, currentHeight+1)
}

return nil
}

func (m *Manager) getLastCommit() *types.Commit {
ptr := m.lastCommit.Load()
if ptr == nil {
return nil
}
return ptr.(*types.Commit)
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop, that it should process next blocks
Expand Down Expand Up @@ -365,6 +407,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

var block *types.Block
var commit *types.Commit

// Check if there's an already stored block at a newer height
// If there is use that instead of creating a new block
Expand All @@ -385,7 +428,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
if err != nil {
return err
}
commit := &types.Commit{
commit = &types.Commit{
Height: block.Header.Height,
HeaderHash: block.Header.Hash(),
Signatures: []types.Signature{sign},
Expand Down Expand Up @@ -441,8 +484,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(block.Header.Height)

m.publishHeader(block)
m.publishCommit(lastCommit)
m.publishSignedHeader(block, commit)

return nil
}
Expand All @@ -468,8 +510,6 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error
return fmt.Errorf("Failed to submit block to DA layer after %d attempts", maxSubmitAttempts)
}

m.HeaderOutCh <- &block.Header

return nil
}

Expand All @@ -482,13 +522,8 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishHeader(block *types.Block) {
m.HeaderOutCh <- &block.Header
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishCommit(commit *types.Commit) {
m.CommitOutCh <- commit
func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) {
m.HeaderOutCh <- &types.SignedHeader{Header: block.Header, Commit: *commit}
}

func updateState(s *types.State, res *abci.ResponseInitChain) {
Expand Down
7 changes: 4 additions & 3 deletions node/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestTxGossipingAndAggregation(t *testing.T) {
for _, n := range nodes {
require.NoError(n.Stop())
}
time.Sleep(100 * time.Millisecond)
aggApp := apps[0]
apps = apps[1:]

Expand Down Expand Up @@ -150,10 +151,10 @@ func TestTxGossipingAndAggregation(t *testing.T) {

// assert that all blocks known to node are same as produced by aggregator
for h := uint64(1); h <= nodes[i].Store.Height(); h++ {
nodeBlock, err := nodes[i].Store.LoadBlock(h)
require.NoError(err)
aggBlock, err := nodes[0].Store.LoadBlock(h)
require.NoError(err)
nodeBlock, err := nodes[i].Store.LoadBlock(h)
require.NoError(err)
assert.Equal(aggBlock, nodeBlock)
}
}
Expand Down Expand Up @@ -192,7 +193,7 @@ func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, key
ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n),
}
bmConfig := config.BlockManagerConfig{
BlockTime: 1 * time.Second,
BlockTime: 300 * time.Millisecond,
NamespaceID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1},
}
for i := 0; i < len(keys); i++ {
Expand Down
13 changes: 7 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (n *Node) initGenesisChunks() error {
func (n *Node) headerPublishLoop(ctx context.Context) {
for {
select {
case header := <-n.blockManager.HeaderOutCh:
headerBytes, err := header.MarshalBinary()
case signedHeader := <-n.blockManager.HeaderOutCh:
headerBytes, err := signedHeader.MarshalBinary()
if err != nil {
n.Logger.Error("failed to serialize block header", "error", err)
n.Logger.Error("failed to serialize signed block header", "error", err)
}
err = n.P2P.GossipHeader(ctx, headerBytes)
err = n.P2P.GossipSignedHeader(ctx, headerBytes)
if err != nil {
n.Logger.Error("failed to gossip block header", "error", err)
n.Logger.Error("failed to gossip signed block header", "error", err)
}
case <-ctx.Done():
return
Expand Down Expand Up @@ -309,7 +309,7 @@ func (n *Node) newTxValidator() p2p.GossipValidator {
func (n *Node) newHeaderValidator() p2p.GossipValidator {
return func(headerMsg *p2p.GossipMessage) bool {
n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data))
var header types.Header
var header types.SignedHeader
err := header.UnmarshalBinary(headerMsg.Data)
if err != nil {
n.Logger.Error("failed to deserialize header", "error", err)
Expand Down Expand Up @@ -341,6 +341,7 @@ func (n *Node) newCommitValidator() p2p.GossipValidator {
n.Logger.Error("failed to validate commit", "error", err)
return false
}
n.Logger.Debug("commit received", "height", commit.Height)
n.blockManager.CommitInCh <- &commit
return true
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (c *Client) SetTxValidator(val GossipValidator) {
c.txValidator = val
}

// GossipHeader sends the block header to the P2P network.
func (c *Client) GossipHeader(ctx context.Context, headerBytes []byte) error {
// GossipSignedHeader sends the block header to the P2P network.
func (c *Client) GossipSignedHeader(ctx context.Context, headerBytes []byte) error {
c.logger.Debug("Gossiping block header", "len", len(headerBytes))
return c.headerGossiper.Publish(ctx, headerBytes)
}
Expand Down
5 changes: 5 additions & 0 deletions proto/optimint/optimint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ message Commit {
repeated bytes signatures = 3;
}

message SignedHeader {
Header header = 1;
Commit commit = 2;
}

message Data {
repeated bytes txs = 1;
repeated bytes intermediate_state_roots = 2;
Expand Down
10 changes: 9 additions & 1 deletion types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,21 @@ type EvidenceData struct {
Evidence []Evidence
}

// Commit cointains evidence of block creation.
// Commit contains evidence of block creation.
type Commit struct {
Height uint64
HeaderHash [32]byte
Signatures []Signature // most of the time this is a single signature
}

// SignedHeader combines Header and its Commit.
//
// Used mostly for gossiping.
type SignedHeader struct {
Header Header
Commit Commit
}

// Signature represents signature of block creator.
type Signature []byte

Expand Down
6 changes: 3 additions & 3 deletions types/pb/dalc/dalc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3420af7

Please sign in to comment.