Skip to content

Commit

Permalink
test: add async reactor test
Browse files Browse the repository at this point in the history
  • Loading branch information
Woosang Son committed Nov 2, 2020
1 parent eb84988 commit fcf090e
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions p2p/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,3 +786,127 @@ func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
delete(book.addrs, addr.String())
}
func (book *addrBookMock) Save() {}

type NormalReactor struct {
BaseReactor
channels []*conn.ChannelDescriptor
msgChan chan []byte
}

func NewNormalReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *NormalReactor {
nr := &NormalReactor{
channels: channels,
}
nr.BaseReactor = *NewBaseReactor("NormalReactor", nr, async, recvBufSize)
nr.msgChan = make(chan []byte)
nr.SetLogger(log.TestingLogger())
return nr
}

func (nr *NormalReactor) GetChannels() []*conn.ChannelDescriptor {
return nr.channels
}

func (nr *NormalReactor) AddPeer(peer Peer) {}

func (nr *NormalReactor) RemovePeer(peer Peer, reason interface{}) {}

func (nr *NormalReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
nr.msgChan <- msgBytes
}

type BlockedReactor struct {
BaseReactor
channels []*conn.ChannelDescriptor
waitChan chan int
}

func NewBlockedReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *BlockedReactor {
br := &BlockedReactor{
channels: channels,
}
br.BaseReactor = *NewBaseReactor("BlockedReactor", br, async, recvBufSize)
br.waitChan = make(chan int, 1)
br.SetLogger(log.TestingLogger())
return br
}

func (br *BlockedReactor) GetChannels() []*conn.ChannelDescriptor {
return br.channels
}

func (br *BlockedReactor) AddPeer(peer Peer) {}

func (br *BlockedReactor) RemovePeer(peer Peer, reason interface{}) {}

func (br *BlockedReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
<-br.waitChan
}

const (
reactorNameNormal = "normal"
reactorNameBlocked = "blocked"
)

func TestSyncReactor(t *testing.T) {
cfg.RecvAsync = false
s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(0))
defer s1.Stop()
defer s2.Stop()

normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor)
blockedReactor := s2.Reactor(reactorNameBlocked).(*BlockedReactor)
s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first
time.Sleep(time.Millisecond * 200) // to make order among messages
s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor

select {
case <-normalReactor.msgChan:
assert.Fail(t, "blocked reactor is not blocked")
case <- time.After(time.Second * 1):
assert.True(t, true, "blocked reactor is blocked: OK")
}

blockedReactor.waitChan <- 1 // release blocked reactor
select {
case msg := <-normalReactor.msgChan:
assert.True(t, bytes.Equal(msg, []byte{0}))
}
}

func TestAsyncReactor(t *testing.T) {
cfg.RecvAsync = true
s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(1))
defer s1.Stop()
defer s2.Stop()

normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor)
s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first
time.Sleep(time.Millisecond * 200) // to make order among messages
s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor

select {
case msg := <-normalReactor.msgChan:
assert.True(t, bytes.Equal(msg, []byte{0}))
case <- time.After(time.Second * 1):
assert.Fail(t, "blocked reactor is blocked")
}
}

func getInitSwitchFunc(bufSize int) func(int, *Switch, *config.P2PConfig) *Switch {
return func(i int, sw *Switch, config *config.P2PConfig) *Switch {
sw.SetAddrBook(&addrBookMock{
addrs: make(map[string]struct{}),
ourAddrs: make(map[string]struct{})})

// Make two reactors of two channels each
sw.AddReactor(reactorNameNormal, NewNormalReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
}, config.RecvAsync, bufSize))
sw.AddReactor(reactorNameBlocked, NewBlockedReactor([]*conn.ChannelDescriptor{
{ID: byte(0x01), Priority: 10},
}, config.RecvAsync, bufSize))

return sw
}
}

0 comments on commit fcf090e

Please sign in to comment.