Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Fix T.Fatal inside a goroutine in tests #18409

Merged
merged 5 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions swarm/network/simulation/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func TestPeerEventsTimeout(t *testing.T) {
events := sim.PeerEvents(ctx, sim.NodeIDs())

done := make(chan struct{})
errC := make(chan error)
go func() {
for e := range events {
if e.Error == context.Canceled {
Expand All @@ -90,14 +91,16 @@ func TestPeerEventsTimeout(t *testing.T) {
close(done)
return
} else {
t.Fatal(e.Error)
errC <- e.Error
}
}
}()

select {
case <-time.After(time.Second):
t.Error("no context deadline received")
t.Fatal("no context deadline received")
case err := <-errC:
t.Fatal(err)
case <-done:
// all good, context deadline detected
}
Expand Down
21 changes: 11 additions & 10 deletions swarm/network/simulation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func TestSimulationWithHTTPServer(t *testing.T) {
//this time the timeout should be long enough so that it doesn't kick in too early
ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
go sendRunSignal(t)
errC := make(chan error, 1)
go triggerSimulationRun(t, errC)
result = sim.Run(ctx, func(ctx context.Context, sim *Simulation) error {
log.Debug("This run waits for the run signal from `frontend`...")
//ensure with a Sleep that simulation doesn't terminate before the signal is received
Expand All @@ -83,27 +84,27 @@ func TestSimulationWithHTTPServer(t *testing.T) {
if result.Error != nil {
t.Fatal(result.Error)
}
if err := <-errC; err != nil {
t.Fatal(err)
}
log.Debug("Test terminated successfully")
}

func sendRunSignal(t *testing.T) {
func triggerSimulationRun(t *testing.T, errC chan error) {
//We need to first wait for the sim HTTP server to start running...
time.Sleep(2 * time.Second)
//then we can send the signal

log.Debug("Sending run signal to simulation: POST /runsim...")
resp, err := http.Post(fmt.Sprintf("http://localhost%s/runsim", DefaultHTTPSimAddr), "application/json", nil)
if err != nil {
t.Fatalf("Request failed: %v", err)
errC <- fmt.Errorf("Request failed: %v", err)
return
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.Error("Error closing response body", "err", err)
}
}()
log.Debug("Signal sent")
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
errC <- fmt.Errorf("err %s", resp.Status)
return
}
errC <- resp.Body.Close()
}
34 changes: 27 additions & 7 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package stream
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -500,7 +502,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)

log.Info("Starting simulation")
ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
pivot := nodeIDs[0]
Expand Down Expand Up @@ -553,14 +555,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
retErrC := make(chan error)
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
if err != nil {
t.Fatalf("requesting chunks action error: %v", err)
}
retErrC <- err
}()

log.Debug("Watching for disconnections")
Expand All @@ -570,11 +571,19 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
simulation.NewPeerEventsFilter().Drop(),
)

var disconnected atomic.Value
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error)
disconnected.Store(true)
}
}
}()
defer func() {
if err != nil {
if yes, ok := disconnected.Load().(bool); ok && yes {
err = errors.New("disconnect events received")
}
}
}()
Expand All @@ -595,6 +604,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
if err := <-retErrC; err != nil {
t.Fatalf("requesting chunks: %v", err)
}
log.Debug("Test terminated successfully")
return nil
})
Expand Down Expand Up @@ -675,7 +687,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
}

ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]

Expand All @@ -702,11 +714,19 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
simulation.NewPeerEventsFilter().Drop(),
)

var disconnected atomic.Value
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
b.Fatal(d.Error)
disconnected.Store(true)
}
}
}()
defer func() {
if err != nil {
if yes, ok := disconnected.Load().(bool); ok && yes {
err = errors.New("disconnect events received")
}
}
}()
Expand Down
14 changes: 12 additions & 2 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package stream
import (
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -117,7 +119,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}

result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
checker := nodeIDs[1]
Expand Down Expand Up @@ -162,11 +164,19 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return err
}

var disconnected atomic.Value
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error)
disconnected.Store(true)
}
}
}()
defer func() {
if err != nil {
if yes, ok := disconnected.Load().(bool); ok && yes {
err = errors.New("disconnect events received")
}
}
}()
Expand Down
23 changes: 17 additions & 6 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -213,11 +214,13 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
simulation.NewPeerEventsFilter().Drop(),
)

var disconnected atomic.Value
go func() {
for d := range disconnections {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal("unexpected disconnect")
cancelSimRun()
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
disconnected.Store(true)
}
}
}()

Expand All @@ -226,6 +229,9 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
if result.Error != nil {
t.Fatal(result.Error)
}
if yes, ok := disconnected.Load().(bool); ok && yes {
t.Fatal("disconnect events received")
}
log.Info("Simulation ended")
}

Expand Down Expand Up @@ -395,11 +401,13 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
simulation.NewPeerEventsFilter().Drop(),
)

var disconnected atomic.Value
go func() {
for d := range disconnections {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal("unexpected disconnect")
cancelSimRun()
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
disconnected.Store(true)
}
}
}()

Expand Down Expand Up @@ -514,6 +522,9 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
return result.Error
}

if yes, ok := disconnected.Load().(bool); ok && yes {
t.Fatal("disconnect events received")
}
log.Info("Simulation ended")
return nil
}
Expand Down
14 changes: 12 additions & 2 deletions swarm/network/stream/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package stream

import (
"context"
"errors"
"fmt"
"io/ioutil"
"math"
"os"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -129,7 +131,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
if err != nil {
t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()

nodeIndex := make(map[enode.ID]int)
Expand All @@ -143,11 +145,19 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
simulation.NewPeerEventsFilter().Drop(),
)

var disconnected atomic.Value
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error)
disconnected.Store(true)
}
}
}()
defer func() {
if err != nil {
if yes, ok := disconnected.Load().(bool); ok && yes {
err = errors.New("disconnect events received")
}
}
}()
Expand Down
Loading