From fcf090eed18de2b0eb27e112808e07242d3c633e Mon Sep 17 00:00:00 2001 From: Woosang Son Date: Mon, 2 Nov 2020 16:38:12 +0900 Subject: [PATCH] test: add async reactor test --- p2p/switch_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index abb41ce05..0e4a09e57 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -786,3 +786,127 @@ func (book *addrBookMock) RemoveAddress(addr *NetAddress) { delete(book.addrs, addr.String()) } func (book *addrBookMock) Save() {} + +type NormalReactor struct { + BaseReactor + channels []*conn.ChannelDescriptor + msgChan chan []byte +} + +func NewNormalReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *NormalReactor { + nr := &NormalReactor{ + channels: channels, + } + nr.BaseReactor = *NewBaseReactor("NormalReactor", nr, async, recvBufSize) + nr.msgChan = make(chan []byte) + nr.SetLogger(log.TestingLogger()) + return nr +} + +func (nr *NormalReactor) GetChannels() []*conn.ChannelDescriptor { + return nr.channels +} + +func (nr *NormalReactor) AddPeer(peer Peer) {} + +func (nr *NormalReactor) RemovePeer(peer Peer, reason interface{}) {} + +func (nr *NormalReactor) Receive(chID byte, peer Peer, msgBytes []byte) { + nr.msgChan <- msgBytes +} + +type BlockedReactor struct { + BaseReactor + channels []*conn.ChannelDescriptor + waitChan chan int +} + +func NewBlockedReactor(channels []*conn.ChannelDescriptor, async bool, recvBufSize int) *BlockedReactor { + br := &BlockedReactor{ + channels: channels, + } + br.BaseReactor = *NewBaseReactor("BlockedReactor", br, async, recvBufSize) + br.waitChan = make(chan int, 1) + br.SetLogger(log.TestingLogger()) + return br +} + +func (br *BlockedReactor) GetChannels() []*conn.ChannelDescriptor { + return br.channels +} + +func (br *BlockedReactor) AddPeer(peer Peer) {} + +func (br *BlockedReactor) RemovePeer(peer Peer, reason interface{}) {} + +func (br *BlockedReactor) Receive(chID byte, peer Peer, msgBytes []byte) { + <-br.waitChan +} + +const ( + reactorNameNormal = "normal" + reactorNameBlocked = "blocked" +) + +func TestSyncReactor(t *testing.T) { + cfg.RecvAsync = false + s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(0)) + defer s1.Stop() + defer s2.Stop() + + normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor) + blockedReactor := s2.Reactor(reactorNameBlocked).(*BlockedReactor) + s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first + time.Sleep(time.Millisecond * 200) // to make order among messages + s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor + + select { + case <-normalReactor.msgChan: + assert.Fail(t, "blocked reactor is not blocked") + case <- time.After(time.Second * 1): + assert.True(t, true, "blocked reactor is blocked: OK") + } + + blockedReactor.waitChan <- 1 // release blocked reactor + select { + case msg := <-normalReactor.msgChan: + assert.True(t, bytes.Equal(msg, []byte{0})) + } +} + +func TestAsyncReactor(t *testing.T) { + cfg.RecvAsync = true + s1, s2 := MakeSwitchPair(t, getInitSwitchFunc(1)) + defer s1.Stop() + defer s2.Stop() + + normalReactor := s2.Reactor(reactorNameNormal).(*NormalReactor) + s1.Broadcast(0x01, []byte{1}) // the message for blocked reactor is first + time.Sleep(time.Millisecond * 200) // to make order among messages + s1.Broadcast(0x00, []byte{0}) // and then second message is for normal reactor + + select { + case msg := <-normalReactor.msgChan: + assert.True(t, bytes.Equal(msg, []byte{0})) + case <- time.After(time.Second * 1): + assert.Fail(t, "blocked reactor is blocked") + } +} + +func getInitSwitchFunc(bufSize int) func(int, *Switch, *config.P2PConfig) *Switch { + return func(i int, sw *Switch, config *config.P2PConfig) *Switch { + sw.SetAddrBook(&addrBookMock{ + addrs: make(map[string]struct{}), + ourAddrs: make(map[string]struct{})}) + + // Make two reactors of two channels each + sw.AddReactor(reactorNameNormal, NewNormalReactor([]*conn.ChannelDescriptor{ + {ID: byte(0x00), Priority: 10}, + }, config.RecvAsync, bufSize)) + sw.AddReactor(reactorNameBlocked, NewBlockedReactor([]*conn.ChannelDescriptor{ + {ID: byte(0x01), Priority: 10}, + }, config.RecvAsync, bufSize)) + + return sw + } +} \ No newline at end of file