Skip to content

Commit

Permalink
fix: Reactor impls call BaseReactor's OnStart
Browse files Browse the repository at this point in the history
  • Loading branch information
Woosang Son committed Nov 2, 2020
1 parent 3fd9d02 commit ef770b1
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 9 deletions.
3 changes: 3 additions & 0 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

err := bcR.pool.Start()
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

go bcR.poolRoutine()
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func NewReactor(consensusState *State, fastSync bool, async bool, recvBufSize in
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "fastSync", conR.FastSync())

// call BaseReactor's OnStart()
conR.BaseReactor.OnStart()

// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()

Expand Down
3 changes: 3 additions & 0 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func (memR *Reactor) SetLogger(l log.Logger) {

// OnStart implements p2p.BaseReactor.
func (memR *Reactor) OnStart() error {
// call BaseReactor's OnStart()
memR.BaseReactor.OnStart()

if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}
Expand Down
5 changes: 3 additions & 2 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ const (

func TestReactorBroadcastTxMessage(t *testing.T) {
config := cfg.TestConfig()
// Use sync mode because tx sequence must be guaranteed.
config.P2P.RecvAsync = false
// In this test, a reactor receives 1000 tx message from a peer.
// A reactor has 3 peer, so up to 3000 txs can be stacked
config.P2P.MempoolRecvBufSize = 3000
const N = 4
reactors := makeAndConnectReactors(config, N)
defer func() {
Expand Down
8 changes: 8 additions & 0 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

func (br *BaseReactor) OnStart() error {
if br.recvMsgBuf != nil {
// if it is async mode it starts RecvRoutine()
go br.RecvRoutine()
}
return nil
}

func (br *BaseReactor) RecvRoutine() {
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion p2p/mock/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ func NewReactor() *Reactor {
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
6 changes: 0 additions & 6 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,6 @@ func createMConnection(
config tmconn.MConnConfig,
) *tmconn.MConnection {

if config.RecvAsync {
for _, r := range reactorsByCh {
go r.RecvRoutine()
}
}

onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
Expand Down
3 changes: 3 additions & 0 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func NewReactor(b AddrBook, async bool, config *ReactorConfig) *Reactor {

// OnStart implements BaseService
func (r *Reactor) OnStart() error {
// call BaseReactor's OnStart()
r.BaseReactor.OnStart()

err := r.book.Start()
if err != nil && err != service.ErrAlreadyStarted {
return err
Expand Down

0 comments on commit ef770b1

Please sign in to comment.