From 35c752a0337b34c8d8499e13339b4ffdae0c87fa Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 2 Dec 2021 17:45:53 +0530 Subject: [PATCH 01/15] temp --- lib/grandpa/network.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index a959d893a8..089a9c36bd 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -55,12 +55,7 @@ func (hs *GrandpaHandshake) Encode() ([]byte, error) { // Decode the message into a GrandpaHandshake func (hs *GrandpaHandshake) Decode(in []byte) error { - err := scale.Unmarshal(in, hs) - if err != nil { - return err - } - - return nil + return scale.Unmarshal(in, hs) } // Type ... From 12704296093d0e27f29a035b40a4cb8daab8a7bb Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 6 Dec 2021 21:16:21 +0530 Subject: [PATCH 02/15] temp --- dot/network/notifications.go | 1 + lib/grandpa/message_tracker.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index b1b0479f92..9f06b24a87 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -289,6 +289,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } + // this is the place // TODO: ensure grandpa stores *all* previously received votes and discards them // only when they are for already finalised rounds; currently this causes issues // because a vote might be received slightly too early, causing a round mismatch err, diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index d0fceea06b..4857ab6211 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -11,8 +11,9 @@ import ( "github.com/ChainSafe/gossamer/lib/crypto/ed25519" ) -// tracker keeps track of messages that have been received that have failed to validate with ErrBlockDoesNotExist -// these messages may be needed again in the case that we are slightly out of sync with the rest of the network +// tracker keeps track of messages that have been received that have failed to +// validate with ErrBlockDoesNotExist. These messages may be needed again in the +// case that we are slightly out of sync with the rest of the network. type tracker struct { blockState BlockState handler *MessageHandler From d909eea72d91f5caf9a260b78efa9d4c3cd224ae Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 8 Dec 2021 17:43:53 +0530 Subject: [PATCH 03/15] temp --- lib/grandpa/grandpa.go | 5 ++++- lib/grandpa/message_handler.go | 16 ++++++++++++++-- lib/grandpa/message_tracker.go | 25 +++++++++++++++++++++---- lib/grandpa/network.go | 2 ++ lib/grandpa/vote_message.go | 5 +++-- 5 files changed, 44 insertions(+), 9 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 0955dcbb7f..43774c12d5 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -483,7 +483,7 @@ func (s *Service) playGrandpaRound() error { } logger.Debug("receiving pre-vote messages...") - go s.receiveMessages(ctx) + go s.receiveVoteMessages(ctx) time.Sleep(s.interval) if s.paused.Load().(bool) { @@ -507,6 +507,8 @@ func (s *Service) playGrandpaRound() error { logger.Debugf("sending pre-vote message %s...", pv) roundComplete := make(chan struct{}) + // <-roundComplete will receive the default value of channel's type when it gets + // closed, so we don't need to explicitly send a value. defer close(roundComplete) // continue to send prevote messages until round is done @@ -550,6 +552,7 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() + // this for loop might be the place where messages are getting rebroadcasted for { if s.paused.Load().(bool) { return diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 57dd870449..c6618d3008 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -53,11 +53,23 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CommitMessage: return nil, h.handleCommitMessage(msg) case *NeighbourMessage: + // It seems like we can afford to not retry handling neighbour message + // if it errors. return nil, h.handleNeighbourMessage(msg) case *CatchUpRequest: - return h.handleCatchUpRequest(msg) + notificationsMessage, err := h.handleCatchUpRequest(msg) + if err != nil { + // TODO: If I can directly access tracker, why are we using in channel for + // networkVoteMessage + h.grandpa.tracker.addCatchUpRequest(msg) + } + return notificationsMessage, err case *CatchUpResponse: - return nil, h.handleCatchUpResponse(msg) + err := h.handleCatchUpResponse(msg) + if err != nil { + h.grandpa.tracker.addCatchUpResponse(msg) + } + return nil, err default: return nil, ErrInvalidMessageType } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 4857ab6211..1508ee196d 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -11,7 +11,7 @@ import ( "github.com/ChainSafe/gossamer/lib/crypto/ed25519" ) -// tracker keeps track of messages that have been received that have failed to +// tracker keeps track of messages that have been received, but have failed to // validate with ErrBlockDoesNotExist. These messages may be needed again in the // case that we are slightly out of sync with the rest of the network. type tracker struct { @@ -24,18 +24,20 @@ type tracker struct { mapLock sync.Mutex in chan *types.Block // receive imported block from BlockState stopped chan struct{} + // round is used as key + catchUpRequestMessages map[uint64]*CatchUpRequest + // round is used as key + catchUpResponseMessages map[uint64]*CatchUpResponse } func newTracker(bs BlockState, handler *MessageHandler) *tracker { - in := bs.GetImportedBlockNotifierChannel() - return &tracker{ blockState: bs, handler: handler, voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage), commitMessages: make(map[common.Hash]*CommitMessage), mapLock: sync.Mutex{}, - in: in, + in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), } } @@ -72,6 +74,20 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } +func (t *tracker) addCatchUpRequest(cr *CatchUpRequest) { + t.mapLock.Lock() + defer t.mapLock.Unlock() + + t.catchUpRequestMessages[cr.Round] = cr +} + +func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { + t.mapLock.Lock() + defer t.mapLock.Unlock() + + t.catchUpResponseMessages[cr.Round] = cr +} + func (t *tracker) handleBlocks() { for { select { @@ -94,6 +110,7 @@ func (t *tracker) handleBlock(b *types.Block) { h := b.Header.Hash() if vms, has := t.voteMessages[h]; has { for _, v := range vms { + // handleMessage would never error for vote message _, err := t.handler.handleMessage(v.from, v.msg) if err != nil { logger.Warnf("failed to handle vote message %v: %s", v, err) diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 089a9c36bd..718e9ed992 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -154,8 +154,10 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( switch m.(type) { case *NeighbourMessage: + // TODO: why don't we do anything? return false, nil case *CatchUpResponse: + // TODO: why don't we do anything? return false, nil } diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 07f5b798e2..2741958887 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -22,8 +22,8 @@ type networkVoteMessage struct { msg *VoteMessage } -// receiveMessages receives messages from the in channel until the specified condition is met -func (s *Service) receiveMessages(ctx context.Context) { +// receiveVoteMessages receives messages from the in channel until the specified condition is met +func (s *Service) receiveVoteMessages(ctx context.Context) { for { select { case msg, ok := <-s.in: @@ -158,6 +158,7 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) { return nil, ErrSetIDMismatch } + // This is where round mismatch is being checked // check that vote is for current round if m.Round != s.state.round { if m.Round < s.state.round { From e4c791e3aa8b6ee0a9e70eba4af56a35d0e3031b Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 8 Dec 2021 21:56:49 +0530 Subject: [PATCH 04/15] lib/grandpa: ensure messages are stored and re-processed when needed --- dot/network/notifications.go | 8 +++++--- lib/grandpa/grandpa.go | 1 - lib/grandpa/message_handler.go | 13 ++++++------- lib/grandpa/message_tracker.go | 9 --------- lib/grandpa/message_tracker_test.go | 6 +++--- lib/grandpa/network.go | 2 -- lib/grandpa/vote_message.go | 12 +++++++----- lib/grandpa/vote_message_test.go | 12 ++++++------ 8 files changed, 27 insertions(+), 36 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 9f06b24a87..8aef384807 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -289,13 +289,15 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - // this is the place // TODO: ensure grandpa stores *all* previously received votes and discards them // only when they are for already finalised rounds; currently this causes issues // because a vote might be received slightly too early, causing a round mismatch err, // causing grandpa to discard the vote. (#1855) - _, isConsensusMsg := msg.(*ConsensusMessage) - if !added && !isConsensusMsg { + // added means message was already sent. + // for consensus messages we are happy to send them again, but not for other messages + // TODO: Is it bad behaviour to send consensus message multiple times? + // TODO: What to do if vote does not reach because of network related issue? + if !added { return } } diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 43774c12d5..997c6dd972 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -552,7 +552,6 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() - // this for loop might be the place where messages are getting rebroadcasted for { if s.paused.Load().(bool) { return diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index c6618d3008..4d78299947 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -57,16 +57,15 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. // if it errors. return nil, h.handleNeighbourMessage(msg) case *CatchUpRequest: - notificationsMessage, err := h.handleCatchUpRequest(msg) - if err != nil { - // TODO: If I can directly access tracker, why are we using in channel for - // networkVoteMessage - h.grandpa.tracker.addCatchUpRequest(msg) - } - return notificationsMessage, err + // CatchUpRequest seems like something that can be dropped, if we fail + // to process it + return h.handleCatchUpRequest(msg) case *CatchUpResponse: err := h.handleCatchUpResponse(msg) + // TODO: Retry for which errors if err != nil { + // TODO: If I can directly access tracker, why are we using `in` channel for + // networkVoteMessage h.grandpa.tracker.addCatchUpResponse(msg) } return nil, err diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 1508ee196d..f927209cc7 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -25,8 +25,6 @@ type tracker struct { in chan *types.Block // receive imported block from BlockState stopped chan struct{} // round is used as key - catchUpRequestMessages map[uint64]*CatchUpRequest - // round is used as key catchUpResponseMessages map[uint64]*CatchUpResponse } @@ -74,13 +72,6 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } -func (t *tracker) addCatchUpRequest(cr *CatchUpRequest) { - t.mapLock.Lock() - defer t.mapLock.Unlock() - - t.catchUpRequestMessages[cr.Round] = cr -} - func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { t.mapLock.Lock() defer t.mapLock.Unlock() diff --git a/lib/grandpa/message_tracker_test.go b/lib/grandpa/message_tracker_test.go index e29c19eb5b..f2b147d118 100644 --- a/lib/grandpa/message_tracker_test.go +++ b/lib/grandpa/message_tracker_test.go @@ -37,7 +37,7 @@ func TestMessageTracker_ValidateMessage(t *testing.T) { msg: msg, } - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrBlockDoesNotExist) require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) } @@ -69,7 +69,7 @@ func TestMessageTracker_SendMessage(t *testing.T) { msg: msg, } - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrBlockDoesNotExist) require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) @@ -115,7 +115,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) { msg: msg, } - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, ErrBlockDoesNotExist, err) require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 718e9ed992..089a9c36bd 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -154,10 +154,8 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( switch m.(type) { case *NeighbourMessage: - // TODO: why don't we do anything? return false, nil case *CatchUpResponse: - // TODO: why don't we do anything? return false, nil } diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 9901cb0ff4..bcc7d27a3a 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -65,7 +65,7 @@ func (s *Service) receiveVoteMessages(ctx context.Context) { logger.Warnf("unsupported stage %s", vm.Message.Stage.String()) } - v, err := s.validateMessage(msg.from, vm) + v, err := s.validateVoteMessage(msg.from, vm) if err != nil { logger.Debugf("failed to validate vote message %v: %s", vm, err) continue @@ -122,9 +122,9 @@ func (s *Service) createSignedVoteAndVoteMessage(vote *Vote, stage Subround) (*S return pc, vm, nil } -// validateMessage validates a VoteMessage and adds it to the current votes +// validateVoteMessage validates a VoteMessage and adds it to the current votes // it returns the resulting vote if validated, error otherwise -func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) { +func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, error) { // make sure round does not increment while VoteMessage is being validated s.roundLock.Lock() defer s.roundLock.Unlock() @@ -153,12 +153,13 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) { return nil, err } - // check that setIDs match + // TODO: Change in set ID means possible change in voters (authorities). That + // would make me think that I could avoid the message in this case. Is that so? + // It seems the vote is considered invalid if set ID do not match. if m.SetID != s.state.setID { return nil, ErrSetIDMismatch } - // This is where round mismatch is being checked // check that vote is for current round if m.Round != s.state.round { if m.Round < s.state.round { @@ -230,6 +231,7 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) { equivocated := s.checkForEquivocation(voter, just, m.Message.Stage) if equivocated { + // A vote is considered invalid if it is equivocatory return nil, ErrEquivocation } diff --git a/lib/grandpa/vote_message_test.go b/lib/grandpa/vote_message_test.go index 5b05b0476d..c3da00ba21 100644 --- a/lib/grandpa/vote_message_test.go +++ b/lib/grandpa/vote_message_test.go @@ -183,7 +183,7 @@ func TestValidateMessage_Valid(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - vote, err := gs.validateMessage("", msg) + vote, err := gs.validateVoteMessage("", msg) require.NoError(t, err) require.Equal(t, h.Hash(), vote.Hash) } @@ -219,7 +219,7 @@ func TestValidateMessage_InvalidSignature(t *testing.T) { msg.Message.Signature[63] = 0 - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrInvalidSignature) } @@ -253,7 +253,7 @@ func TestValidateMessage_SetIDMismatch(t *testing.T) { gs.state.setID = 1 - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrSetIDMismatch) } @@ -298,7 +298,7 @@ func TestValidateMessage_Equivocation(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, ErrEquivocation, err, gs.prevotes) } @@ -333,7 +333,7 @@ func TestValidateMessage_BlockDoesNotExist(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrBlockDoesNotExist) } @@ -374,6 +374,6 @@ func TestValidateMessage_IsNotDescendant(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - _, err = gs.validateMessage("", msg) + _, err = gs.validateVoteMessage("", msg) require.Equal(t, errInvalidVoteBlock, err, gs.prevotes) } From 82d1248432ddf8de2fae035da4ffc39bda77020f Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Fri, 10 Dec 2021 17:09:39 +0530 Subject: [PATCH 05/15] removing extra comments --- dot/network/notifications.go | 12 ++++-------- lib/grandpa/grandpa.go | 1 + 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 8aef384807..18684c9aa9 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -289,19 +289,15 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - // TODO: ensure grandpa stores *all* previously received votes and discards them - // only when they are for already finalised rounds; currently this causes issues - // because a vote might be received slightly too early, causing a round mismatch err, - // causing grandpa to discard the vote. (#1855) - // added means message was already sent. - // for consensus messages we are happy to send them again, but not for other messages - // TODO: Is it bad behaviour to send consensus message multiple times? - // TODO: What to do if vote does not reach because of network related issue? + // if we could not add message in cache, message was already present in the + // cache and was sent. if !added { return } } + // TODO: Add message to cache after we successfully wrote it to stream? + // we've completed the handshake with the peer, send message directly logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg) if err := s.host.writeToStream(stream, msg); err != nil { diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 997c6dd972..4dab0d57ea 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -552,6 +552,7 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() + // TODO: Send the message just once? for { if s.paused.Load().(bool) { return From b651b6e3cc8ce2094863af82887ae4b51ce7f249 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 16 Dec 2021 13:01:53 +0530 Subject: [PATCH 06/15] Addressed some reviews --- dot/network/notifications.go | 22 ++++++++-------------- lib/grandpa/grandpa.go | 3 ++- lib/grandpa/vote_message.go | 4 +--- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 18684c9aa9..e5a4ba7f7b 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -282,22 +282,11 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - if s.host.messageCache != nil { - added, err := s.host.messageCache.put(peer, msg) - if err != nil { - logger.Errorf("failed to add message to cache for peer %s: %s", peer, err) - return - } - - // if we could not add message in cache, message was already present in the - // cache and was sent. - if !added { - return - } + if (s.host.messageCache != nil) && (s.host.messageCache.exists(peer, msg)) { + // message has already been sent + return } - // TODO: Add message to cache after we successfully wrote it to stream? - // we've completed the handshake with the peer, send message directly logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg) if err := s.host.writeToStream(stream, msg); err != nil { @@ -308,6 +297,11 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc closeOutboundStream(info, peer, stream) } return + } else if s.host.messageCache != nil { + if _, err := s.host.messageCache.put(peer, msg); err != nil { + logger.Errorf("failed to add message to cache for peer %s: %s", peer, err) + return + } } logger.Tracef("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 4dab0d57ea..6f7313464e 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -552,7 +552,8 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() - // TODO: Send the message just once? + // Eventhough, this looks like we are sending messages multiple times, + // caching would make sure that they are being sent only once. for { if s.paused.Load().(bool) { return diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index bcc7d27a3a..6cdfba1f7e 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -153,9 +153,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro return nil, err } - // TODO: Change in set ID means possible change in voters (authorities). That - // would make me think that I could avoid the message in this case. Is that so? - // It seems the vote is considered invalid if set ID do not match. + // vote is considered invalid if set ID do not match. if m.SetID != s.state.setID { return nil, ErrSetIDMismatch } From 0231833d6000581e1722568f90d73fa76fe6e072 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Fri, 17 Dec 2021 19:34:09 +0530 Subject: [PATCH 07/15] addressed more reviews --- lib/grandpa/message_handler.go | 14 ++++++-------- lib/grandpa/message_tracker.go | 24 +++++++++++------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 4d78299947..b473161239 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -53,19 +53,17 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CommitMessage: return nil, h.handleCommitMessage(msg) case *NeighbourMessage: - // It seems like we can afford to not retry handling neighbour message - // if it errors. + // we can afford to not retry handling neighbour message, if it errors. return nil, h.handleNeighbourMessage(msg) case *CatchUpRequest: - // CatchUpRequest seems like something that can be dropped, if we fail - // to process it return h.handleCatchUpRequest(msg) case *CatchUpResponse: err := h.handleCatchUpResponse(msg) - // TODO: Retry for which errors - if err != nil { - // TODO: If I can directly access tracker, why are we using `in` channel for - // networkVoteMessage + if err == blocktree.ErrNodeNotFound { + // TODO: we are adding these messages to reprocess them again, but we + // haven't added code to reprocess them. Do that. + // Also, revisit if we need to add these message in synchronous manner + // or not. If not, change catchUpResponseMessages to a normal map. #1531 h.grandpa.tracker.addCatchUpResponse(msg) } return nil, err diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index f927209cc7..b65c3b9512 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -24,19 +24,20 @@ type tracker struct { mapLock sync.Mutex in chan *types.Block // receive imported block from BlockState stopped chan struct{} - // round is used as key - catchUpResponseMessages map[uint64]*CatchUpResponse + // round(uint64) is used as key and *CatchUpResponse as value + catchUpResponseMessages sync.Map } func newTracker(bs BlockState, handler *MessageHandler) *tracker { return &tracker{ - blockState: bs, - handler: handler, - voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage), - commitMessages: make(map[common.Hash]*CommitMessage), - mapLock: sync.Mutex{}, - in: bs.GetImportedBlockNotifierChannel(), - stopped: make(chan struct{}), + blockState: bs, + handler: handler, + voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage), + commitMessages: make(map[common.Hash]*CommitMessage), + mapLock: sync.Mutex{}, + in: bs.GetImportedBlockNotifierChannel(), + stopped: make(chan struct{}), + catchUpResponseMessages: sync.Map{}, } } @@ -73,10 +74,7 @@ func (t *tracker) addCommit(cm *CommitMessage) { } func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { - t.mapLock.Lock() - defer t.mapLock.Unlock() - - t.catchUpResponseMessages[cr.Round] = cr + t.catchUpResponseMessages.Store(cr.Round, cr) } func (t *tracker) handleBlocks() { From b0973e996e39b611f402f386fb8686e9f76071eb Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Tue, 4 Jan 2022 11:26:49 +0530 Subject: [PATCH 08/15] rephrase the comment --- lib/grandpa/grandpa.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 6f7313464e..b438d8afdf 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -507,8 +507,9 @@ func (s *Service) playGrandpaRound() error { logger.Debugf("sending pre-vote message %s...", pv) roundComplete := make(chan struct{}) - // <-roundComplete will receive the default value of channel's type when it gets - // closed, so we don't need to explicitly send a value. + // roundComplete is a signal channel which is closed when the round completes + // (will receive the default value of channel's type), so we don't need to + // explicitly send a value. defer close(roundComplete) // continue to send prevote messages until round is done From b8d630a90c2ef1648b3b65c12f26b1acafb96d49 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 5 Jan 2022 15:52:05 +0530 Subject: [PATCH 09/15] trying to pass TestSync_SingleBlockProducer --- tests/stress/stress_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stress/stress_test.go b/tests/stress/stress_test.go index 3aa7ece8d4..5f9b81dca8 100644 --- a/tests/stress/stress_test.go +++ b/tests/stress/stress_test.go @@ -111,7 +111,7 @@ func TestSync_SingleBlockProducer(t *testing.T) { numCmps := 10 for i := 0; i < numCmps; i++ { - time.Sleep(time.Second) + time.Sleep(3 * time.Second) t.Log("comparing...", i) hashes, err := compareBlocksByNumberWithRetry(t, nodes, strconv.Itoa(i)) if len(hashes) > 1 || len(hashes) == 0 { From eefa437d7fe397dc1a8cd5a87fe87d13e449b23f Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 5 Jan 2022 17:47:11 +0530 Subject: [PATCH 10/15] Update lib/grandpa/grandpa.go Co-authored-by: Quentin McGaw --- lib/grandpa/grandpa.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index b438d8afdf..0e2eb7a1a5 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -553,7 +553,7 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() - // Eventhough, this looks like we are sending messages multiple times, + // Though, this looks like we are sending messages multiple times, // caching would make sure that they are being sent only once. for { if s.paused.Load().(bool) { From 9be22f63fe63a73cc7a075f61929871952b72cbf Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 5 Jan 2022 17:47:45 +0530 Subject: [PATCH 11/15] Update dot/network/notifications.go Co-authored-by: Quentin McGaw --- dot/network/notifications.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 464b3017b6..660a1d09e4 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -305,7 +305,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } else if s.host.messageCache != nil { if _, err := s.host.messageCache.put(peer, msg); err != nil { - logger.Errorf("failed to add message to cache for peer %s: %s", peer, err) + logger.Errorf("failed to add message to cache for peer %s: %w", peer, err) return } } From 6fdead0d76fd6bc9d8599f08456f0bf1207b032a Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 5 Jan 2022 17:48:53 +0530 Subject: [PATCH 12/15] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index e6b8c609ac..0d0f6620a3 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -59,7 +59,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. return h.handleCatchUpRequest(msg) case *CatchUpResponse: err := h.handleCatchUpResponse(msg) - if err == blocktree.ErrNodeNotFound { + if errors.Is(err, blocktree.ErrNodeNotFound) { // TODO: we are adding these messages to reprocess them again, but we // haven't added code to reprocess them. Do that. // Also, revisit if we need to add these message in synchronous manner From 93417ff6d8c306506fa8874326f72923e8bf2f16 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 5 Jan 2022 17:49:17 +0530 Subject: [PATCH 13/15] Update lib/grandpa/vote_message.go Co-authored-by: Quentin McGaw --- lib/grandpa/vote_message.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 6cdfba1f7e..2cf24d44d8 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -229,7 +229,6 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro equivocated := s.checkForEquivocation(voter, just, m.Message.Stage) if equivocated { - // A vote is considered invalid if it is equivocatory return nil, ErrEquivocation } From 9dd727d54c7da25c371c98db25d8619cf359a129 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 10 Jan 2022 21:46:42 +0530 Subject: [PATCH 14/15] addressed some reviews --- lib/grandpa/grandpa.go | 2 +- lib/grandpa/message_tracker.go | 10 +++++++--- lib/grandpa/vote_message.go | 3 +-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index b438d8afdf..40d7c8cdbc 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -553,7 +553,7 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() - // Eventhough, this looks like we are sending messages multiple times, + // Though this looks like we are sending messages multiple times, // caching would make sure that they are being sent only once. for { if s.paused.Load().(bool) { diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index b65c3b9512..4f540ac0dc 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -24,8 +24,10 @@ type tracker struct { mapLock sync.Mutex in chan *types.Block // receive imported block from BlockState stopped chan struct{} + + catchUpResponseMessageMutex sync.Mutex // round(uint64) is used as key and *CatchUpResponse as value - catchUpResponseMessages sync.Map + catchUpResponseMessages map[uint64]*CatchUpResponse } func newTracker(bs BlockState, handler *MessageHandler) *tracker { @@ -37,7 +39,7 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { mapLock: sync.Mutex{}, in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), - catchUpResponseMessages: sync.Map{}, + catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } } @@ -74,7 +76,9 @@ func (t *tracker) addCommit(cm *CommitMessage) { } func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { - t.catchUpResponseMessages.Store(cr.Round, cr) + t.catchUpResponseMessageMutex.Lock() + defer t.catchUpResponseMessageMutex.Unlock() + t.catchUpResponseMessages[cr.Round] = cr } func (t *tracker) handleBlocks() { diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 6cdfba1f7e..5141ad833d 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -22,7 +22,7 @@ type networkVoteMessage struct { msg *VoteMessage } -// receiveVoteMessages receives messages from the in channel until the specified condition is met +// receiveVoteMessages receives messages from the in channel until a grandpa round finishes. func (s *Service) receiveVoteMessages(ctx context.Context) { for { select { @@ -153,7 +153,6 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro return nil, err } - // vote is considered invalid if set ID do not match. if m.SetID != s.state.setID { return nil, ErrSetIDMismatch } From 2ae41d4a5946bd45130401423762454d24dd6172 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 10 Jan 2022 21:55:24 +0530 Subject: [PATCH 15/15] Update dot/network/notifications.go Co-authored-by: Quentin McGaw --- dot/network/notifications.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 660a1d09e4..281006265f 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -288,7 +288,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - if (s.host.messageCache != nil) && (s.host.messageCache.exists(peer, msg)) { + if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) { // message has already been sent return }