Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task(lib/grandpa): ensure messages are stored and re-processed when needed #2107

Merged
merged 20 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,15 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

// TODO: ensure grandpa stores *all* previously received votes and discards them
// only when they are for already finalised rounds; currently this causes issues
// because a vote might be received slightly too early, causing a round mismatch err,
// causing grandpa to discard the vote. (#1855)
_, isConsensusMsg := msg.(*ConsensusMessage)
if !added && !isConsensusMsg {
// if we could not add message in cache, message was already present in the
// cache and was sent.
if !added {
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return
}
}

// TODO: Add message to cache after we successfully wrote it to stream?
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved

// we've completed the handshake with the peer, send message directly
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
if err := s.host.writeToStream(stream, msg); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (s *Service) playGrandpaRound() error {
}

logger.Debug("receiving pre-vote messages...")
go s.receiveMessages(ctx)
go s.receiveVoteMessages(ctx)
time.Sleep(s.interval)

if s.paused.Load().(bool) {
Expand All @@ -507,6 +507,8 @@ func (s *Service) playGrandpaRound() error {

logger.Debugf("sending pre-vote message %s...", pv)
roundComplete := make(chan struct{})
// <-roundComplete will receive the default value of channel's type when it gets
// closed, so we don't need to explicitly send a value.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
defer close(roundComplete)

// continue to send prevote messages until round is done
Expand Down Expand Up @@ -550,6 +552,7 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet
ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

// TODO: Send the message just once?
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
for {
if s.paused.Load().(bool) {
return
Expand Down
13 changes: 12 additions & 1 deletion lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
case *CommitMessage:
return nil, h.handleCommitMessage(msg)
case *NeighbourMessage:
// It seems like we can afford to not retry handling neighbour message
// if it errors.
return nil, h.handleNeighbourMessage(msg)
case *CatchUpRequest:
// CatchUpRequest seems like something that can be dropped, if we fail
// to process it
return h.handleCatchUpRequest(msg)
case *CatchUpResponse:
return nil, h.handleCatchUpResponse(msg)
err := h.handleCatchUpResponse(msg)
// TODO: Retry for which errors
if err != nil {
// TODO: If I can directly access tracker, why are we using `in` channel for
// networkVoteMessage
h.grandpa.tracker.addCatchUpResponse(msg)
}
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
default:
return nil, ErrInvalidMessageType
}
Expand Down
19 changes: 14 additions & 5 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
)

// tracker keeps track of messages that have been received that have failed to validate with ErrBlockDoesNotExist
// these messages may be needed again in the case that we are slightly out of sync with the rest of the network
// tracker keeps track of messages that have been received, but have failed to
// validate with ErrBlockDoesNotExist. These messages may be needed again in the
// case that we are slightly out of sync with the rest of the network.
type tracker struct {
blockState BlockState
handler *MessageHandler
Expand All @@ -23,18 +24,18 @@ type tracker struct {
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}
// round is used as key
catchUpResponseMessages map[uint64]*CatchUpResponse
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

func newTracker(bs BlockState, handler *MessageHandler) *tracker {
in := bs.GetImportedBlockNotifierChannel()

return &tracker{
blockState: bs,
handler: handler,
voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage),
commitMessages: make(map[common.Hash]*CommitMessage),
mapLock: sync.Mutex{},
in: in,
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),
}
}
Expand Down Expand Up @@ -71,6 +72,13 @@ func (t *tracker) addCommit(cm *CommitMessage) {
t.commitMessages[cm.Vote.Hash] = cm
}

func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
t.mapLock.Lock()
defer t.mapLock.Unlock()
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved

t.catchUpResponseMessages[cr.Round] = cr
}

func (t *tracker) handleBlocks() {
for {
select {
Expand All @@ -93,6 +101,7 @@ func (t *tracker) handleBlock(b *types.Block) {
h := b.Header.Hash()
if vms, has := t.voteMessages[h]; has {
for _, v := range vms {
// handleMessage would never error for vote message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe update the warn log? :p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what you want here? Add this comment in log or remove the log?

_, err := t.handler.handleMessage(v.from, v.msg)
if err != nil {
logger.Warnf("failed to handle vote message %v: %s", v, err)
Expand Down
6 changes: 3 additions & 3 deletions lib/grandpa/message_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestMessageTracker_ValidateMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestMessageTracker_SendMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrBlockDoesNotExist, err)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down
7 changes: 1 addition & 6 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ func (hs *GrandpaHandshake) Encode() ([]byte, error) {

// Decode the message into a GrandpaHandshake
func (hs *GrandpaHandshake) Decode(in []byte) error {
err := scale.Unmarshal(in, hs)
if err != nil {
return err
}

return nil
return scale.Unmarshal(in, hs)
}

// Type ...
Expand Down
15 changes: 9 additions & 6 deletions lib/grandpa/vote_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type networkVoteMessage struct {
msg *VoteMessage
}

// receiveMessages receives messages from the in channel until the specified condition is met
func (s *Service) receiveMessages(ctx context.Context) {
// receiveVoteMessages receives messages from the in channel until the specified condition is met
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
func (s *Service) receiveVoteMessages(ctx context.Context) {
for {
select {
case msg, ok := <-s.in:
Expand Down Expand Up @@ -65,7 +65,7 @@ func (s *Service) receiveMessages(ctx context.Context) {
logger.Warnf("unsupported stage %s", vm.Message.Stage.String())
}

v, err := s.validateMessage(msg.from, vm)
v, err := s.validateVoteMessage(msg.from, vm)
if err != nil {
logger.Debugf("failed to validate vote message %v: %s", vm, err)
continue
Expand Down Expand Up @@ -122,9 +122,9 @@ func (s *Service) createSignedVoteAndVoteMessage(vote *Vote, stage Subround) (*S
return pc, vm, nil
}

// validateMessage validates a VoteMessage and adds it to the current votes
// validateVoteMessage validates a VoteMessage and adds it to the current votes
// it returns the resulting vote if validated, error otherwise
func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
// make sure round does not increment while VoteMessage is being validated
s.roundLock.Lock()
defer s.roundLock.Unlock()
Expand Down Expand Up @@ -153,7 +153,9 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
return nil, err
}

// check that setIDs match
// TODO: Change in set ID means possible change in voters (authorities). That
// would make me think that I could avoid the message in this case. Is that so?
// It seems the vote is considered invalid if set ID do not match.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
if m.SetID != s.state.setID {
return nil, ErrSetIDMismatch
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {

equivocated := s.checkForEquivocation(voter, just, m.Message.Stage)
if equivocated {
// A vote is considered invalid if it is equivocatory
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrEquivocation
}

Expand Down
12 changes: 6 additions & 6 deletions lib/grandpa/vote_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestValidateMessage_Valid(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

vote, err := gs.validateMessage("", msg)
vote, err := gs.validateVoteMessage("", msg)
require.NoError(t, err)
require.Equal(t, h.Hash(), vote.Hash)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestValidateMessage_InvalidSignature(t *testing.T) {

msg.Message.Signature[63] = 0

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrInvalidSignature)
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestValidateMessage_SetIDMismatch(t *testing.T) {

gs.state.setID = 1

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrSetIDMismatch)
}

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestValidateMessage_Equivocation(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrEquivocation, err, gs.prevotes)
}

Expand Down Expand Up @@ -333,7 +333,7 @@ func TestValidateMessage_BlockDoesNotExist(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
}

Expand Down Expand Up @@ -374,6 +374,6 @@ func TestValidateMessage_IsNotDescendant(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, errInvalidVoteBlock, err, gs.prevotes)
}