From 03e10a06768f3bcdc89aeb9ea45bfb0d354b08ee Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 24 Jan 2019 15:40:43 -0800 Subject: [PATCH] fix(tests): stabilize session tests Improve stability of tests for Session and SessionPeerManager fix #61 --- session/session_test.go | 90 ++++++++++++++----- sessionpeermanager/sessionpeermanager_test.go | 82 ++++++++++++++--- 2 files changed, 137 insertions(+), 35 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index d578f7a7..9f6aef54 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -26,11 +26,17 @@ type fakeWantManager struct { } func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) { - fwm.wantReqs <- wantReq{cids, peers} + select { + case fwm.wantReqs <- wantReq{cids, peers}: + case <-ctx.Done(): + } } func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) { - fwm.cancelReqs <- wantReq{cids, peers} + select { + case fwm.cancelReqs <- wantReq{cids, peers}: + case <-ctx.Done(): + } } type fakePeerManager struct { @@ -39,8 +45,11 @@ type fakePeerManager struct { findMorePeersRequested chan struct{} } -func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) { - fpm.findMorePeersRequested <- struct{}{} +func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) { + select { + case fpm.findMorePeersRequested <- struct{}{}: + case <-ctx.Done(): + } } func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID { @@ -105,10 +114,20 @@ func TestSessionGetBlocks(t *testing.T) { var receivedBlocks []blocks.Block for i, p := range peers { session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]) - receivedBlock := <-getBlocksCh - receivedBlocks = append(receivedBlocks, receivedBlock) - cancelBlock := <-cancelReqs - newCancelReqs = append(newCancelReqs, cancelBlock) + select { + case cancelBlock := <-cancelReqs: + newCancelReqs = append(newCancelReqs, cancelBlock) + case <-ctx.Done(): + t.Fatal("did not cancel block want") + } + + select { + case receivedBlock := <-getBlocksCh: + receivedBlocks = append(receivedBlocks, receivedBlock) + case <-ctx.Done(): + t.Fatal("Did not receive block!") + } + select { case wantBlock := <-wantReqs: newBlockReqs = append(newBlockReqs, wantBlock) @@ -169,7 +188,7 @@ func TestSessionGetBlocks(t *testing.T) { func TestSessionFindMorePeers(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond) defer cancel() wantReqs := make(chan wantReq, 1) cancelReqs := make(chan wantReq, 1) @@ -191,26 +210,51 @@ func TestSessionFindMorePeers(t *testing.T) { } // clear the initial block of wants - <-wantReqs + select { + case <-wantReqs: + case <-ctx.Done(): + t.Fatal("Did not make first want request ") + } // receive a block to trigger a tick reset - time.Sleep(200 * time.Microsecond) + time.Sleep(20 * time.Millisecond) // need to make sure some latency registers + // or there will be no tick set -- time precision on Windows in go is in the + // millisecond range p := testutil.GeneratePeers(1)[0] session.ReceiveBlockFrom(p, blks[0]) - <-getBlocksCh - <-wantReqs - <-cancelReqs - - // wait for a request to get more peers to occur - <-fpm.findMorePeersRequested + select { + case <-cancelReqs: + case <-ctx.Done(): + t.Fatal("Did not cancel block") + } + select { + case <-getBlocksCh: + case <-ctx.Done(): + t.Fatal("Did not get block") + } + select { + case <-wantReqs: + case <-ctx.Done(): + t.Fatal("Did not make second want request ") + } // verify a broadcast was made - receivedWantReq := <-wantReqs - if len(receivedWantReq.cids) < broadcastLiveWantsLimit { - t.Fatal("did not rebroadcast whole live list") + select { + case receivedWantReq := <-wantReqs: + if len(receivedWantReq.cids) < broadcastLiveWantsLimit { + t.Fatal("did not rebroadcast whole live list") + } + if receivedWantReq.peers != nil { + t.Fatal("did not make a broadcast") + } + case <-ctx.Done(): + t.Fatal("Never rebroadcast want list") } - if receivedWantReq.peers != nil { - t.Fatal("did not make a broadcast") + + // wait for a request to get more peers to occur + select { + case <-fpm.findMorePeersRequested: + case <-ctx.Done(): + t.Fatal("Did not find more peers") } - <-ctx.Done() } diff --git a/sessionpeermanager/sessionpeermanager_test.go b/sessionpeermanager/sessionpeermanager_test.go index b4e723b1..2ec38f0a 100644 --- a/sessionpeermanager/sessionpeermanager_test.go +++ b/sessionpeermanager/sessionpeermanager_test.go @@ -2,6 +2,7 @@ package sessionpeermanager import ( "context" + "errors" "math/rand" "sync" "testing" @@ -18,27 +19,40 @@ import ( type fakePeerNetwork struct { peers []peer.ID connManager ifconnmgr.ConnManager + completed chan struct{} + connect chan struct{} } func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager { return fpn.connManager } -func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error { - return nil +func (fpn *fakePeerNetwork) ConnectTo(ctx context.Context, p peer.ID) error { + select { + case fpn.connect <- struct{}{}: + return nil + case <-ctx.Done(): + return errors.New("Timeout Occurred") + } } func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID { peerCh := make(chan peer.ID) go func() { - defer close(peerCh) for _, p := range fpn.peers { select { case peerCh <- p: case <-ctx.Done(): + close(peerCh) return } } + close(peerCh) + + select { + case fpn.completed <- struct{}{}: + case <-ctx.Done(): + } }() return peerCh } @@ -55,7 +69,6 @@ func (fcm *fakeConnManager) TagPeer(p peer.ID, tag string, n int) { func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) { defer fcm.wait.Done() - for i := 0; i < len(fcm.taggedPeers); i++ { if fcm.taggedPeers[i] == p { fcm.taggedPeers[i] = fcm.taggedPeers[len(fcm.taggedPeers)-1] @@ -63,7 +76,6 @@ func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) { return } } - } func (*fakeConnManager) GetTagInfo(p peer.ID) *ifconnmgr.TagInfo { return nil } @@ -74,9 +86,12 @@ func TestFindingMorePeers(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() + completed := make(chan struct{}) + connect := make(chan struct{}) + peers := testutil.GeneratePeers(5) fcm := &fakeConnManager{} - fpn := &fakePeerNetwork{peers, fcm} + fpn := &fakePeerNetwork{peers, fcm, completed, connect} c := testutil.GenerateCids(1)[0] id := testutil.GenerateSessionID() @@ -85,7 +100,20 @@ func TestFindingMorePeers(t *testing.T) { findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond) defer findCancel() sessionPeerManager.FindMorePeers(ctx, c) - <-findCtx.Done() + select { + case <-completed: + case <-findCtx.Done(): + t.Fatal("Did not finish finding providers") + } + for range peers { + select { + case <-connect: + case <-findCtx.Done(): + t.Fatal("Did not connect to peer") + } + } + time.Sleep(2 * time.Millisecond) + sessionPeers := sessionPeerManager.GetOptimizedPeers() if len(sessionPeers) != len(peers) { t.Fatal("incorrect number of peers found") @@ -106,7 +134,7 @@ func TestRecordingReceivedBlocks(t *testing.T) { defer cancel() p := testutil.GeneratePeers(1)[0] fcm := &fakeConnManager{} - fpn := &fakePeerNetwork{nil, fcm} + fpn := &fakePeerNetwork{nil, fcm, nil, nil} c := testutil.GenerateCids(1)[0] id := testutil.GenerateSessionID() @@ -127,17 +155,32 @@ func TestRecordingReceivedBlocks(t *testing.T) { func TestOrderingPeers(t *testing.T) { ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond) defer cancel() peers := testutil.GeneratePeers(100) + completed := make(chan struct{}) + connect := make(chan struct{}) fcm := &fakeConnManager{} - fpn := &fakePeerNetwork{peers, fcm} + fpn := &fakePeerNetwork{peers, fcm, completed, connect} c := testutil.GenerateCids(1) id := testutil.GenerateSessionID() sessionPeerManager := New(ctx, id, fpn) // add all peers to session sessionPeerManager.FindMorePeers(ctx, c[0]) + select { + case <-completed: + case <-ctx.Done(): + t.Fatal("Did not finish finding providers") + } + for range peers { + select { + case <-connect: + case <-ctx.Done(): + t.Fatal("Did not connect to peer") + } + } + time.Sleep(2 * time.Millisecond) // record broadcast sessionPeerManager.RecordPeerRequests(nil, c) @@ -193,15 +236,30 @@ func TestUntaggingPeers(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() peers := testutil.GeneratePeers(5) + completed := make(chan struct{}) + connect := make(chan struct{}) fcm := &fakeConnManager{} - fpn := &fakePeerNetwork{peers, fcm} + fpn := &fakePeerNetwork{peers, fcm, completed, connect} c := testutil.GenerateCids(1)[0] id := testutil.GenerateSessionID() sessionPeerManager := New(ctx, id, fpn) sessionPeerManager.FindMorePeers(ctx, c) - time.Sleep(5 * time.Millisecond) + select { + case <-completed: + case <-ctx.Done(): + t.Fatal("Did not finish finding providers") + } + for range peers { + select { + case <-connect: + case <-ctx.Done(): + t.Fatal("Did not connect to peer") + } + } + time.Sleep(2 * time.Millisecond) + if len(fcm.taggedPeers) != len(peers) { t.Fatal("Peers were not tagged!") }