Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix(tests): stabilize session tests #63

Merged
merged 1 commit into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ type fakeWantManager struct {
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.wantReqs <- wantReq{cids, peers}
select {
case fwm.wantReqs <- wantReq{cids, peers}:
case <-ctx.Done():
}
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.cancelReqs <- wantReq{cids, peers}
select {
case fwm.cancelReqs <- wantReq{cids, peers}:
case <-ctx.Done():
}
}

type fakePeerManager struct {
Expand All @@ -39,8 +45,11 @@ type fakePeerManager struct {
findMorePeersRequested chan struct{}
}

func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
fpm.findMorePeersRequested <- struct{}{}
func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
select {
case fpm.findMorePeersRequested <- struct{}{}:
case <-ctx.Done():
}
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
Expand Down Expand Up @@ -105,10 +114,20 @@ func TestSessionGetBlocks(t *testing.T) {
var receivedBlocks []blocks.Block
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
select {
case cancelBlock := <-cancelReqs:
newCancelReqs = append(newCancelReqs, cancelBlock)
case <-ctx.Done():
t.Fatal("did not cancel block want")
}

select {
case receivedBlock := <-getBlocksCh:
receivedBlocks = append(receivedBlocks, receivedBlock)
case <-ctx.Done():
t.Fatal("Did not receive block!")
}

select {
case wantBlock := <-wantReqs:
newBlockReqs = append(newBlockReqs, wantBlock)
Expand Down Expand Up @@ -169,7 +188,7 @@ func TestSessionGetBlocks(t *testing.T) {

func TestSessionFindMorePeers(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
Expand All @@ -191,26 +210,51 @@ func TestSessionFindMorePeers(t *testing.T) {
}

// clear the initial block of wants
<-wantReqs
select {
case <-wantReqs:
case <-ctx.Done():
t.Fatal("Did not make first want request ")
}

// receive a block to trigger a tick reset
time.Sleep(200 * time.Microsecond)
time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
// or there will be no tick set -- time precision on Windows in go is in the
// millisecond range
p := testutil.GeneratePeers(1)[0]
session.ReceiveBlockFrom(p, blks[0])
<-getBlocksCh
<-wantReqs
<-cancelReqs

// wait for a request to get more peers to occur
<-fpm.findMorePeersRequested
select {
case <-cancelReqs:
case <-ctx.Done():
t.Fatal("Did not cancel block")
}
select {
case <-getBlocksCh:
case <-ctx.Done():
t.Fatal("Did not get block")
}
select {
case <-wantReqs:
case <-ctx.Done():
t.Fatal("Did not make second want request ")
}

// verify a broadcast was made
receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
select {
case receivedWantReq := <-wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
if receivedWantReq.peers != nil {
t.Fatal("did not make a broadcast")
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
if receivedWantReq.peers != nil {
t.Fatal("did not make a broadcast")

// wait for a request to get more peers to occur
select {
case <-fpm.findMorePeersRequested:
case <-ctx.Done():
t.Fatal("Did not find more peers")
}
<-ctx.Done()
}
82 changes: 70 additions & 12 deletions sessionpeermanager/sessionpeermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sessionpeermanager

import (
"context"
"errors"
"math/rand"
"sync"
"testing"
Expand All @@ -18,27 +19,40 @@ import (
type fakePeerNetwork struct {
peers []peer.ID
connManager ifconnmgr.ConnManager
completed chan struct{}
connect chan struct{}
}

func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager {
return fpn.connManager
}

func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error {
return nil
func (fpn *fakePeerNetwork) ConnectTo(ctx context.Context, p peer.ID) error {
select {
case fpn.connect <- struct{}{}:
return nil
case <-ctx.Done():
return errors.New("Timeout Occurred")
}
}

func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
defer close(peerCh)
for _, p := range fpn.peers {
select {
case peerCh <- p:
case <-ctx.Done():
close(peerCh)
return
}
}
close(peerCh)

select {
case fpn.completed <- struct{}{}:
case <-ctx.Done():
}
}()
return peerCh
}
Expand All @@ -55,15 +69,13 @@ func (fcm *fakeConnManager) TagPeer(p peer.ID, tag string, n int) {

func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) {
defer fcm.wait.Done()

for i := 0; i < len(fcm.taggedPeers); i++ {
if fcm.taggedPeers[i] == p {
fcm.taggedPeers[i] = fcm.taggedPeers[len(fcm.taggedPeers)-1]
fcm.taggedPeers = fcm.taggedPeers[:len(fcm.taggedPeers)-1]
return
}
}

}

func (*fakeConnManager) GetTagInfo(p peer.ID) *ifconnmgr.TagInfo { return nil }
Expand All @@ -74,9 +86,12 @@ func TestFindingMorePeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
completed := make(chan struct{})
connect := make(chan struct{})

peers := testutil.GeneratePeers(5)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()

Expand All @@ -85,7 +100,20 @@ func TestFindingMorePeers(t *testing.T) {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer findCancel()
sessionPeerManager.FindMorePeers(ctx, c)
<-findCtx.Done()
select {
case <-completed:
case <-findCtx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-findCtx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)

sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != len(peers) {
t.Fatal("incorrect number of peers found")
Expand All @@ -106,7 +134,7 @@ func TestRecordingReceivedBlocks(t *testing.T) {
defer cancel()
p := testutil.GeneratePeers(1)[0]
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{nil, fcm}
fpn := &fakePeerNetwork{nil, fcm, nil, nil}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()

Expand All @@ -127,17 +155,32 @@ func TestRecordingReceivedBlocks(t *testing.T) {

func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(100)
completed := make(chan struct{})
connect := make(chan struct{})
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)

// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-ctx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)

// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
Expand Down Expand Up @@ -193,15 +236,30 @@ func TestUntaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(5)
completed := make(chan struct{})
connect := make(chan struct{})
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()

sessionPeerManager := New(ctx, id, fpn)

sessionPeerManager.FindMorePeers(ctx, c)
time.Sleep(5 * time.Millisecond)
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-ctx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)

if len(fcm.taggedPeers) != len(peers) {
t.Fatal("Peers were not tagged!")
}
Expand Down