Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

network/simulation: Add ExecAdapter capability to swarm simulations #1503

Merged
merged 4 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/swarm-snapshot/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func create(ctx *cli.Context) error {
func createSnapshot(filename string, nodes int, services []string) (err error) {
log.Debug("create snapshot", "filename", filename, "nodes", nodes, "services", services)

sim := simulation.New(map[string]simulation.ServiceFunc{
sim := simulation.NewInProc(map[string]simulation.ServiceFunc{
"bzz": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
Expand Down
2 changes: 1 addition & 1 deletion network/simulation/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestServiceBucket(t *testing.T) {
testKey := "Key"
testValue := "Value"

sim := New(map[string]ServiceFunc{
sim := NewInProc(map[string]ServiceFunc{
"noop": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
b.Store(testKey, testValue+ctx.Config.ID.String())
return newNoopService(), nil, nil
Expand Down
4 changes: 2 additions & 2 deletions network/simulation/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// and waits for the number of connection events to
// be received.
func TestPeerEvents(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

_, err := sim.AddNodes(2)
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestPeerEvents(t *testing.T) {
}

func TestPeerEventsTimeout(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

_, err := sim.AddNodes(2)
Expand Down
8 changes: 4 additions & 4 deletions network/simulation/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// all nodes have the their Kademlias healthy.
func ExampleSimulation_WaitTillHealthy() {

sim := simulation.New(map[string]simulation.ServiceFunc{
sim := simulation.NewInProc(map[string]simulation.ServiceFunc{
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
hp := network.NewHiveParams()
Expand Down Expand Up @@ -77,7 +77,7 @@ func ExampleSimulation_WaitTillHealthy() {

// Watch all peer events in the simulation network, buy receiving from a channel.
func ExampleSimulation_PeerEvents() {
sim := simulation.New(nil)
sim := simulation.NewInProc(nil)
defer sim.Close()

events := sim.PeerEvents(context.Background(), sim.NodeIDs())
Expand All @@ -95,7 +95,7 @@ func ExampleSimulation_PeerEvents() {

// Detect when a nodes drop a peer.
func ExampleSimulation_PeerEvents_disconnections() {
sim := simulation.New(nil)
sim := simulation.NewInProc(nil)
defer sim.Close()

disconnections := sim.PeerEvents(
Expand All @@ -118,7 +118,7 @@ func ExampleSimulation_PeerEvents_disconnections() {
// Watch multiple types of events or messages. In this case, they differ only
// by MsgCode, but filters can be set for different types or protocols, too.
func ExampleSimulation_PeerEvents_multipleFilters() {
sim := simulation.New(nil)
sim := simulation.NewInProc(nil)
defer sim.Close()

msgs := sim.PeerEvents(
Expand Down
2 changes: 1 addition & 1 deletion network/simulation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSimulationWithHTTPServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

sim := New(
sim := NewInProc(
map[string]ServiceFunc{
"noop": func(_ *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
return newNoopService(), nil, nil
Expand Down
8 changes: 4 additions & 4 deletions network/simulation/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestWaitTillHealthy(t *testing.T) {
testNodesNum := 10

// create the first simulation
sim := New(createSimServiceMap(true))
sim := NewInProc(createSimServiceMap(true))

// connect and...
nodeIDs, err := sim.AddNodesAndConnectRing(testNodesNum)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestWaitTillHealthy(t *testing.T) {
// close the initial simulation
sim.Close()
// create a control simulation
controlSim := New(createSimServiceMap(false))
controlSim := NewInProc(createSimServiceMap(false))
defer controlSim.Close()

// load the snapshot into this control simulation
Expand Down Expand Up @@ -158,7 +158,7 @@ func createSimServiceMap(discovery bool) map[string]ServiceFunc {
func TestWaitTillSnapshotRecreated(t *testing.T) {
t.Skip("test is flaky. disabling until underlying problem is addressed")
var err error
sim := New(createSimServiceMap(true))
sim := NewInProc(createSimServiceMap(true))
_, err = sim.AddNodesAndConnectRing(16)
if err != nil {
t.Fatal(err)
Expand All @@ -177,7 +177,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) {
t.Fatal(err)
}

controlSim := New(createSimServiceMap(false))
controlSim := NewInProc(createSimServiceMap(false))
defer controlSim.Close()
err = controlSim.Net.Load(snap)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions network/simulation/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

func TestUpDownNodeIDs(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

ids, err := sim.AddNodes(10)
Expand Down Expand Up @@ -99,7 +99,7 @@ func equalNodeIDs(one, other []enode.ID) bool {
}

func TestAddNode(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

id, err := sim.AddNode()
Expand All @@ -118,7 +118,7 @@ func TestAddNode(t *testing.T) {
}

func TestAddNodeWithMsgEvents(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

id, err := sim.AddNode(AddNodeWithMsgEvents(true))
Expand All @@ -141,7 +141,7 @@ func TestAddNodeWithMsgEvents(t *testing.T) {
}

func TestAddNodeWithService(t *testing.T) {
sim := New(map[string]ServiceFunc{
sim := NewInProc(map[string]ServiceFunc{
"noop1": noopServiceFunc,
"noop2": noopServiceFunc,
})
Expand All @@ -162,7 +162,7 @@ func TestAddNodeWithService(t *testing.T) {
}

func TestAddNodeMultipleServices(t *testing.T) {
sim := New(map[string]ServiceFunc{
sim := NewInProc(map[string]ServiceFunc{
"noop1": noopServiceFunc,
"noop2": noopService2Func,
})
Expand All @@ -183,7 +183,7 @@ func TestAddNodeMultipleServices(t *testing.T) {
}

func TestAddNodeDuplicateServiceError(t *testing.T) {
sim := New(map[string]ServiceFunc{
sim := NewInProc(map[string]ServiceFunc{
"noop1": noopServiceFunc,
"noop2": noopServiceFunc,
})
Expand All @@ -197,7 +197,7 @@ func TestAddNodeDuplicateServiceError(t *testing.T) {
}

func TestAddNodes(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

nodesCount := 12
Expand All @@ -219,7 +219,7 @@ func TestAddNodes(t *testing.T) {
}

func TestAddNodesAndConnectFull(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

n := 12
Expand All @@ -233,7 +233,7 @@ func TestAddNodesAndConnectFull(t *testing.T) {
}

func TestAddNodesAndConnectChain(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

_, err := sim.AddNodesAndConnectChain(12)
Expand All @@ -252,7 +252,7 @@ func TestAddNodesAndConnectChain(t *testing.T) {
}

func TestAddNodesAndConnectRing(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

ids, err := sim.AddNodesAndConnectRing(12)
Expand All @@ -264,7 +264,7 @@ func TestAddNodesAndConnectRing(t *testing.T) {
}

func TestAddNodesAndConnectStar(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

ids, err := sim.AddNodesAndConnectStar(12)
Expand All @@ -278,7 +278,7 @@ func TestAddNodesAndConnectStar(t *testing.T) {
//To test that uploading a snapshot works
func TestUploadSnapshot(t *testing.T) {
log.Debug("Creating simulation")
s := New(map[string]ServiceFunc{
s := NewInProc(map[string]ServiceFunc{
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
hp := network.NewHiveParams()
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestUploadSnapshot(t *testing.T) {
}

func TestStartStopNode(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

id, err := sim.AddNode()
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestStartStopNode(t *testing.T) {
}

func TestStartStopRandomNode(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

_, err := sim.AddNodes(3)
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestStartStopRandomNode(t *testing.T) {
}

func TestStartStopRandomNodes(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

_, err := sim.AddNodes(10)
Expand Down
2 changes: 1 addition & 1 deletion network/simulation/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func TestService(t *testing.T) {
sim := New(noopServiceFuncMap)
sim := NewInProc(noopServiceFuncMap)
defer sim.Close()

id, err := sim.AddNode()
Expand Down
71 changes: 62 additions & 9 deletions network/simulation/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package simulation
import (
"context"
"errors"
"io/ioutil"
"net/http"
"os"
"sync"
"time"

Expand All @@ -31,6 +33,11 @@ import (
"github.com/ethersphere/swarm/network"
)

const (
SimulationTypeInproc = iota
SimulationTypeExec
)

// Common errors that are returned by functions in this package.
var (
ErrNodeNotFound = errors.New("node not found")
Expand All @@ -50,6 +57,8 @@ type Simulation struct {
done chan struct{}
mu sync.RWMutex
neighbourhoodSize int
baseDir string
typ int

httpSrv *http.Server //attach a HTTP server via SimulationOptions
handler *simulations.Server //HTTP handler for the server
Expand All @@ -71,19 +80,66 @@ type ServiceFunc func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Se
// every ServiceFunc must return a node.Service of the unique type.
// This restriction is required by node.Node.Start() function
// which is used to start node.Service returned by ServiceFunc.
func New(services map[string]ServiceFunc) (s *Simulation) {
func NewInProc(services map[string]ServiceFunc) (s *Simulation) {
s = &Simulation{
buckets: make(map[enode.ID]*sync.Map),
done: make(chan struct{}),
neighbourhoodSize: network.NewKadParams().NeighbourhoodSize,
typ: SimulationTypeInproc,
}

s.addServices(services)
adapterServices := s.toAdapterServices(services)

s.Net = simulations.NewNetwork(
adapters.NewTCPAdapter(adapterServices),
&simulations.NetworkConfig{ID: "0"},
)

return s
}

// NewExec does the same as New but lets the caller specify the adapter to use
func NewExec(services map[string]ServiceFunc) (s *Simulation, err error) {
nolash marked this conversation as resolved.
Show resolved Hide resolved
s = &Simulation{
buckets: make(map[enode.ID]*sync.Map),
done: make(chan struct{}),
neighbourhoodSize: network.NewKadParams().NeighbourhoodSize,
typ: SimulationTypeExec,
}

s.addServices(services)
adapterServices := s.toAdapterServices(services)

// exec adapters register services up front, not at node creation time
adapters.RegisterServices(adapterServices)
Copy link
Contributor

@skylenet skylenet Jun 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nolash We have a problem with this RegisterServices() function. It should be called within an init() function as described in the documentation: https://github.com/ethereum/go-ethereum/tree/master/p2p/simulations#services

So basically, to make the current exec/docker adapter work. we would have to register all services upfront.
In our case it gets a bit harder, because we attached this to (s *Simulation)... so unless we define all our simulations inside an init(), this won't work.

I've been thinking now about this for a while and I don't find a decent solution.... any idea?

fyi @nonsense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had completely forgotten about this requirement, I remember reading about it a year or so ago...

I also don't have any solutions :(

Copy link
Contributor Author

@nolash nolash Jun 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our case it gets a bit harder

Why exactly do we need to put it in an init (except for the doc claiming that we do)?


s.baseDir, err = ioutil.TempDir("", "swarm-sim")
if err != nil {
return nil, err
}
s.Net = simulations.NewNetwork(
adapters.NewExecAdapter(s.baseDir),
&simulations.NetworkConfig{ID: "0"},
)

return s, nil
}

// add names of available services to simulation
func (s *Simulation) addServices(services map[string]ServiceFunc) {
for name := range services {
s.serviceNames = append(s.serviceNames, name)
}
}

// convert services array for use with adapters.RegisterServices
func (s *Simulation) toAdapterServices(services map[string]ServiceFunc) map[string]adapters.ServiceFunc {
adapterServices := make(map[string]adapters.ServiceFunc, len(services))
for name, serviceFunc := range services {
// Scope this variables correctly
// as they will be in the adapterServices[name] function accessed later.
name, serviceFunc := name, serviceFunc
s.serviceNames = append(s.serviceNames, name)
adapterServices[name] = func(ctx *adapters.ServiceContext) (node.Service, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -102,13 +158,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) {
return service, nil
}
}

s.Net = simulations.NewNetwork(
adapters.NewTCPAdapter(adapterServices),
&simulations.NetworkConfig{ID: "0"},
)

return s
return adapterServices
}

// RunFunc is the function that will be called
Expand Down Expand Up @@ -208,6 +258,9 @@ func (s *Simulation) Close() {

s.shutdownWG.Wait()
s.Net.Shutdown()
if s.baseDir != "" {
os.RemoveAll(s.baseDir)
}
}

// Done returns a channel that is closed when the simulation
Expand Down
Loading