Skip to content

Commit

Permalink
Buffer future messages (NEW) (ethereum#672)
Browse files Browse the repository at this point in the history
Backlog Changes:

 - Decouple Backlog from Core. Make it it's own type
 - Make MsgBacklog a interface + a default implementation
 - Backlog: extract common logic into `extractMessageView()` fn
 - Backlog: separate between `cleanBacklogForSeq` and
 `processBacklogForSeq`
 - `processBacklogForSeq` will stop iteration when process() returns
 false (optimization)
 - Rename `processBacklog()` to `updateState()`. Processing the backlog
 is a sideeffect of changing the currentView + State
 - Clean up backlog  changes. Add inner tests.

 Other changes:

  - cleanup checkMessage with a "first check the view, then the msgCode"
  strategy
  - checkMessage no longer err with errTooFarIntoTheFuture, instead
  backloag will check that, and don't store those messages
  - errTooFarIntoTheFuture is no longer an error
  - checkMessage doesn't have a case for Old commit messages. Instead
  handleCommit will check if checkMEssage failed with oldMessage and do
  the check there.
  - FIX: error when processing RoundChangeMessages
  • Loading branch information
Mariano Cortesi committed Dec 5, 2019
1 parent 530458a commit 17c027b
Show file tree
Hide file tree
Showing 8 changed files with 575 additions and 402 deletions.
397 changes: 264 additions & 133 deletions consensus/istanbul/core/backlog.go

Large diffs are not rendered by default.

514 changes: 271 additions & 243 deletions consensus/istanbul/core/backlog_test.go

Large diffs are not rendered by default.

26 changes: 16 additions & 10 deletions consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,24 @@ func (c *core) handleCommit(msg *istanbul.Message) error {
return errFailedDecodeCommit
}

if err := c.checkMessage(istanbul.MsgCommit, commit.Subject.View); err != nil {
return err
}
err = c.checkMessage(istanbul.MsgCommit, commit.Subject.View)

// Valid commit messages may be for the current, or previous sequence. We compare against our
// current view to find out which.

if commit.Subject.View.Cmp(c.current.View()) == 0 {
return c.handleCheckedCommitForCurrentSequence(msg, commit)
} else {
if err == errOldMessage {
// Discard messages from previous views, unless they are commits from the previous sequence,
// with the same round as what we wound up finalizing, as we would be able to include those
// to create the ParentAggregatedSeal for our next proposal.
lastSubject, err := c.backend.LastSubject()
if err != nil {
return err
} else if commit.Subject.View.Cmp(lastSubject.View) != 0 {
return errOldMessage
}
return c.handleCheckedCommitForPreviousSequence(msg, commit)
} else if err != nil {
return err
}

return c.handleCheckedCommitForCurrentSequence(msg, commit)
}

func (c *core) handleCheckedCommitForPreviousSequence(msg *istanbul.Message, commit *istanbul.CommittedSubject) error {
Expand Down Expand Up @@ -167,7 +173,7 @@ func (c *core) handleCheckedCommitForCurrentSequence(msg *istanbul.Message, comm
return err
}
// Process Backlog Messages
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

logger.Trace("Got quorum prepares or commits", "tag", "stateTransition", "commits", c.current.Commits, "prepares", c.current.Prepares)
c.sendCommit()
Expand Down
18 changes: 11 additions & 7 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
selectProposer: validator.GetProposerSelector(config.ProposerPolicy),
handlerWg: new(sync.WaitGroup),
backend: backend,
backlogs: make(map[istanbul.Validator]*prque.Prque),
backlogsMu: new(sync.Mutex),
pendingRequests: prque.New(nil),
pendingRequestsMu: new(sync.Mutex),
consensusTimestamp: time.Time{},
Expand All @@ -61,6 +59,13 @@ func New(backend istanbul.Backend, config *istanbul.Config) Engine {
sequenceMeter: metrics.NewRegisteredMeter("consensus/istanbul/core/sequence", nil),
consensusTimer: metrics.NewRegisteredTimer("consensus/istanbul/core/consensus", nil),
}
msgBacklog := newMsgBacklog(
func(msg *istanbul.Message) {
c.sendEvent(backlogEvent{
msg: msg,
})
}, c.checkMessage)
c.backlog = msgBacklog
c.validateFn = c.checkValidatorSignature
return c
}
Expand All @@ -81,8 +86,7 @@ type core struct {

validateFn func([]byte, []byte) (common.Address, error)

backlogs map[istanbul.Validator]*prque.Prque
backlogsMu *sync.Mutex
backlog MsgBacklog

rsdb RoundStateDB
current RoundState
Expand Down Expand Up @@ -165,7 +169,7 @@ func (c *core) commit() error {
}

// Process Backlog Messages
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

proposal := c.current.Proposal()
if proposal != nil {
Expand Down Expand Up @@ -355,7 +359,7 @@ func (c *core) startNewRound(round *big.Int) error {

// Process backlog
c.processPendingRequests()
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

if roundChange && c.isProposer() && request != nil {
c.sendPreprepare(request, roundChangeCertificate)
Expand Down Expand Up @@ -393,7 +397,7 @@ func (c *core) waitForDesiredRound(r *big.Int) error {
c.newRoundChangeTimerForView(desiredView)

// Process Backlog Messages
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

// Send round change
c.sendRoundChange(r)
Expand Down
1 change: 0 additions & 1 deletion consensus/istanbul/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

type backlogEvent struct {
src istanbul.Validator
msg *istanbul.Message
}

Expand Down
17 changes: 11 additions & 6 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *core) Start() error {

// Process backlog
c.processPendingRequests()
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

// Tests will handle events itself, so we have to make subscribeEvents()
// be able to call in test.
Expand Down Expand Up @@ -126,9 +126,12 @@ func (c *core) handleEvents() {
c.logger.Debug("Error in handling istanbul message", "err", err)
}
case backlogEvent:
// No need to check signature for internal messages
if err := c.handleCheckedMsg(ev.msg, ev.src); err != nil {
c.logger.Warn("Error in handling istanbul message that was sent from a backlog event", "err", err)
if payload, err := ev.msg.Payload(); err != nil {
c.logger.Error("Error in retrieving payload from istanbul message that was sent from a backlog event", "err", err)
} else {
if err := c.handleMsg(payload); err != nil {
c.logger.Warn("Error in handling istanbul message that was sent from a backlog event", "err", err)
}
}
}
case event, ok := <-c.timeoutSub.Chan():
Expand Down Expand Up @@ -186,9 +189,11 @@ func (c *core) handleCheckedMsg(msg *istanbul.Message, src istanbul.Validator) e
// Store the message if it's a future message
catchFutureMessages := func(err error) error {
if err == errFutureMessage {
c.storeBacklog(msg, src)
// Store in backlog (if it's not from self)
if msg.Address != c.address {
c.backlog.store(msg)
}
}

return err
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (c *core) handlePrepare(msg *istanbul.Message) error {
logger.Trace("Got quorum prepares or commits", "tag", "stateTransition", "commits", c.current.Commits, "prepares", c.current.Prepares)

// Process Backlog Messages
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())

c.sendCommit()

Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *core) handlePreprepare(msg *istanbul.Message) error {
}

// Process Backlog Messages
c.processBacklog()
c.backlog.updateState(c.current.View(), c.current.State())
c.sendPrepare()
}

Expand Down

0 comments on commit 17c027b

Please sign in to comment.