diff --git a/cmd/swarm-snapshot/create.go b/cmd/swarm-snapshot/create.go index e3591b81fe..aaa63bf473 100644 --- a/cmd/swarm-snapshot/create.go +++ b/cmd/swarm-snapshot/create.go @@ -58,7 +58,7 @@ func create(ctx *cli.Context) error { func createSnapshot(filename string, nodes int, services []string) (err error) { log.Debug("create snapshot", "filename", filename, "nodes", nodes, "services", services) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) { addr := network.NewAddr(ctx.Config.Node()) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) diff --git a/network/simulation/bucket_test.go b/network/simulation/bucket_test.go index 2273d35a29..16df52e651 100644 --- a/network/simulation/bucket_test.go +++ b/network/simulation/bucket_test.go @@ -32,7 +32,7 @@ func TestServiceBucket(t *testing.T) { testKey := "Key" testValue := "Value" - sim := New(map[string]ServiceFunc{ + sim := NewInProc(map[string]ServiceFunc{ "noop": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { b.Store(testKey, testValue+ctx.Config.ID.String()) return newNoopService(), nil, nil diff --git a/network/simulation/events_test.go b/network/simulation/events_test.go index 529844816f..6a164d6a15 100644 --- a/network/simulation/events_test.go +++ b/network/simulation/events_test.go @@ -28,7 +28,7 @@ import ( // and waits for the number of connection events to // be received. func TestPeerEvents(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() _, err := sim.AddNodes(2) @@ -68,7 +68,7 @@ func TestPeerEvents(t *testing.T) { } func TestPeerEventsTimeout(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() _, err := sim.AddNodes(2) diff --git a/network/simulation/example_test.go b/network/simulation/example_test.go index e0db2ec954..9e2601219f 100644 --- a/network/simulation/example_test.go +++ b/network/simulation/example_test.go @@ -34,7 +34,7 @@ import ( // all nodes have the their Kademlias healthy. func ExampleSimulation_WaitTillHealthy() { - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { addr := network.NewAddr(ctx.Config.Node()) hp := network.NewHiveParams() @@ -77,7 +77,7 @@ func ExampleSimulation_WaitTillHealthy() { // Watch all peer events in the simulation network, buy receiving from a channel. func ExampleSimulation_PeerEvents() { - sim := simulation.New(nil) + sim := simulation.NewInProc(nil) defer sim.Close() events := sim.PeerEvents(context.Background(), sim.NodeIDs()) @@ -95,7 +95,7 @@ func ExampleSimulation_PeerEvents() { // Detect when a nodes drop a peer. func ExampleSimulation_PeerEvents_disconnections() { - sim := simulation.New(nil) + sim := simulation.NewInProc(nil) defer sim.Close() disconnections := sim.PeerEvents( @@ -118,7 +118,7 @@ func ExampleSimulation_PeerEvents_disconnections() { // Watch multiple types of events or messages. In this case, they differ only // by MsgCode, but filters can be set for different types or protocols, too. func ExampleSimulation_PeerEvents_multipleFilters() { - sim := simulation.New(nil) + sim := simulation.NewInProc(nil) defer sim.Close() msgs := sim.PeerEvents( diff --git a/network/simulation/http_test.go b/network/simulation/http_test.go index dffd03a032..7a32da03e6 100644 --- a/network/simulation/http_test.go +++ b/network/simulation/http_test.go @@ -35,7 +35,7 @@ func TestSimulationWithHTTPServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - sim := New( + sim := NewInProc( map[string]ServiceFunc{ "noop": func(_ *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { return newNoopService(), nil, nil diff --git a/network/simulation/kademlia_test.go b/network/simulation/kademlia_test.go index f6839ba43f..c69832d29e 100644 --- a/network/simulation/kademlia_test.go +++ b/network/simulation/kademlia_test.go @@ -46,7 +46,7 @@ func TestWaitTillHealthy(t *testing.T) { testNodesNum := 10 // create the first simulation - sim := New(createSimServiceMap(true)) + sim := NewInProc(createSimServiceMap(true)) // connect and... nodeIDs, err := sim.AddNodesAndConnectRing(testNodesNum) @@ -88,7 +88,7 @@ func TestWaitTillHealthy(t *testing.T) { // close the initial simulation sim.Close() // create a control simulation - controlSim := New(createSimServiceMap(false)) + controlSim := NewInProc(createSimServiceMap(false)) defer controlSim.Close() // load the snapshot into this control simulation @@ -158,7 +158,7 @@ func createSimServiceMap(discovery bool) map[string]ServiceFunc { func TestWaitTillSnapshotRecreated(t *testing.T) { t.Skip("test is flaky. disabling until underlying problem is addressed") var err error - sim := New(createSimServiceMap(true)) + sim := NewInProc(createSimServiceMap(true)) _, err = sim.AddNodesAndConnectRing(16) if err != nil { t.Fatal(err) @@ -177,7 +177,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) { t.Fatal(err) } - controlSim := New(createSimServiceMap(false)) + controlSim := NewInProc(createSimServiceMap(false)) defer controlSim.Close() err = controlSim.Net.Load(snap) if err != nil { diff --git a/network/simulation/node_test.go b/network/simulation/node_test.go index 51fcf5d132..1435eae6cd 100644 --- a/network/simulation/node_test.go +++ b/network/simulation/node_test.go @@ -32,7 +32,7 @@ import ( ) func TestUpDownNodeIDs(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() ids, err := sim.AddNodes(10) @@ -99,7 +99,7 @@ func equalNodeIDs(one, other []enode.ID) bool { } func TestAddNode(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() id, err := sim.AddNode() @@ -118,7 +118,7 @@ func TestAddNode(t *testing.T) { } func TestAddNodeWithMsgEvents(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() id, err := sim.AddNode(AddNodeWithMsgEvents(true)) @@ -141,7 +141,7 @@ func TestAddNodeWithMsgEvents(t *testing.T) { } func TestAddNodeWithService(t *testing.T) { - sim := New(map[string]ServiceFunc{ + sim := NewInProc(map[string]ServiceFunc{ "noop1": noopServiceFunc, "noop2": noopServiceFunc, }) @@ -162,7 +162,7 @@ func TestAddNodeWithService(t *testing.T) { } func TestAddNodeMultipleServices(t *testing.T) { - sim := New(map[string]ServiceFunc{ + sim := NewInProc(map[string]ServiceFunc{ "noop1": noopServiceFunc, "noop2": noopService2Func, }) @@ -183,7 +183,7 @@ func TestAddNodeMultipleServices(t *testing.T) { } func TestAddNodeDuplicateServiceError(t *testing.T) { - sim := New(map[string]ServiceFunc{ + sim := NewInProc(map[string]ServiceFunc{ "noop1": noopServiceFunc, "noop2": noopServiceFunc, }) @@ -197,7 +197,7 @@ func TestAddNodeDuplicateServiceError(t *testing.T) { } func TestAddNodes(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() nodesCount := 12 @@ -219,7 +219,7 @@ func TestAddNodes(t *testing.T) { } func TestAddNodesAndConnectFull(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() n := 12 @@ -233,7 +233,7 @@ func TestAddNodesAndConnectFull(t *testing.T) { } func TestAddNodesAndConnectChain(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() _, err := sim.AddNodesAndConnectChain(12) @@ -252,7 +252,7 @@ func TestAddNodesAndConnectChain(t *testing.T) { } func TestAddNodesAndConnectRing(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() ids, err := sim.AddNodesAndConnectRing(12) @@ -264,7 +264,7 @@ func TestAddNodesAndConnectRing(t *testing.T) { } func TestAddNodesAndConnectStar(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() ids, err := sim.AddNodesAndConnectStar(12) @@ -278,7 +278,7 @@ func TestAddNodesAndConnectStar(t *testing.T) { //To test that uploading a snapshot works func TestUploadSnapshot(t *testing.T) { log.Debug("Creating simulation") - s := New(map[string]ServiceFunc{ + s := NewInProc(map[string]ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { addr := network.NewAddr(ctx.Config.Node()) hp := network.NewHiveParams() @@ -317,7 +317,7 @@ func TestUploadSnapshot(t *testing.T) { } func TestStartStopNode(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() id, err := sim.AddNode() @@ -353,7 +353,7 @@ func TestStartStopNode(t *testing.T) { } func TestStartStopRandomNode(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() _, err := sim.AddNodes(3) @@ -392,7 +392,7 @@ func TestStartStopRandomNode(t *testing.T) { } func TestStartStopRandomNodes(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() _, err := sim.AddNodes(10) diff --git a/network/simulation/service_test.go b/network/simulation/service_test.go index 23b0d86f24..4f42ec7a60 100644 --- a/network/simulation/service_test.go +++ b/network/simulation/service_test.go @@ -21,7 +21,7 @@ import ( ) func TestService(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() id, err := sim.AddNode() diff --git a/network/simulation/simulation.go b/network/simulation/simulation.go index 04439bd3b5..64ee44c52b 100644 --- a/network/simulation/simulation.go +++ b/network/simulation/simulation.go @@ -19,7 +19,9 @@ package simulation import ( "context" "errors" + "io/ioutil" "net/http" + "os" "sync" "time" @@ -31,6 +33,11 @@ import ( "github.com/ethersphere/swarm/network" ) +const ( + SimulationTypeInproc = iota + SimulationTypeExec +) + // Common errors that are returned by functions in this package. var ( ErrNodeNotFound = errors.New("node not found") @@ -50,6 +57,8 @@ type Simulation struct { done chan struct{} mu sync.RWMutex neighbourhoodSize int + baseDir string + typ int httpSrv *http.Server //attach a HTTP server via SimulationOptions handler *simulations.Server //HTTP handler for the server @@ -71,19 +80,66 @@ type ServiceFunc func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Se // every ServiceFunc must return a node.Service of the unique type. // This restriction is required by node.Node.Start() function // which is used to start node.Service returned by ServiceFunc. -func New(services map[string]ServiceFunc) (s *Simulation) { +func NewInProc(services map[string]ServiceFunc) (s *Simulation) { + s = &Simulation{ + buckets: make(map[enode.ID]*sync.Map), + done: make(chan struct{}), + neighbourhoodSize: network.NewKadParams().NeighbourhoodSize, + typ: SimulationTypeInproc, + } + + s.addServices(services) + adapterServices := s.toAdapterServices(services) + + s.Net = simulations.NewNetwork( + adapters.NewTCPAdapter(adapterServices), + &simulations.NetworkConfig{ID: "0"}, + ) + + return s +} + +// NewExec does the same as New but lets the caller specify the adapter to use +func NewExec(services map[string]ServiceFunc) (s *Simulation, err error) { s = &Simulation{ buckets: make(map[enode.ID]*sync.Map), done: make(chan struct{}), neighbourhoodSize: network.NewKadParams().NeighbourhoodSize, + typ: SimulationTypeExec, } + s.addServices(services) + adapterServices := s.toAdapterServices(services) + + // exec adapters register services up front, not at node creation time + adapters.RegisterServices(adapterServices) + + s.baseDir, err = ioutil.TempDir("", "swarm-sim") + if err != nil { + return nil, err + } + s.Net = simulations.NewNetwork( + adapters.NewExecAdapter(s.baseDir), + &simulations.NetworkConfig{ID: "0"}, + ) + + return s, nil +} + +// add names of available services to simulation +func (s *Simulation) addServices(services map[string]ServiceFunc) { + for name := range services { + s.serviceNames = append(s.serviceNames, name) + } +} + +// convert services array for use with adapters.RegisterServices +func (s *Simulation) toAdapterServices(services map[string]ServiceFunc) map[string]adapters.ServiceFunc { adapterServices := make(map[string]adapters.ServiceFunc, len(services)) for name, serviceFunc := range services { // Scope this variables correctly // as they will be in the adapterServices[name] function accessed later. name, serviceFunc := name, serviceFunc - s.serviceNames = append(s.serviceNames, name) adapterServices[name] = func(ctx *adapters.ServiceContext) (node.Service, error) { s.mu.Lock() defer s.mu.Unlock() @@ -102,13 +158,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) { return service, nil } } - - s.Net = simulations.NewNetwork( - adapters.NewTCPAdapter(adapterServices), - &simulations.NetworkConfig{ID: "0"}, - ) - - return s + return adapterServices } // RunFunc is the function that will be called @@ -208,6 +258,9 @@ func (s *Simulation) Close() { s.shutdownWG.Wait() s.Net.Shutdown() + if s.baseDir != "" { + os.RemoveAll(s.baseDir) + } } // Done returns a channel that is closed when the simulation diff --git a/network/simulation/simulation_test.go b/network/simulation/simulation_test.go index 1d0338f593..6f5e309f9b 100644 --- a/network/simulation/simulation_test.go +++ b/network/simulation/simulation_test.go @@ -43,7 +43,7 @@ func init() { // TestRun tests if Run method calls RunFunc and if it handles context properly. func TestRun(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) defer sim.Close() t.Run("call", func(t *testing.T) { @@ -104,7 +104,7 @@ func TestClose(t *testing.T) { sleep := 50 * time.Millisecond - sim := New(map[string]ServiceFunc{ + sim := NewInProc(map[string]ServiceFunc{ "noop": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { return newNoopService(), func() { time.Sleep(sleep) @@ -151,7 +151,7 @@ func TestClose(t *testing.T) { // TestDone checks if Close method triggers the closing of done channel. func TestDone(t *testing.T) { - sim := New(noopServiceFuncMap) + sim := NewInProc(noopServiceFuncMap) sleep := 50 * time.Millisecond timeout := 2 * time.Second diff --git a/network/stream/intervals_test.go b/network/stream/intervals_test.go index 49972e54e6..96a1efd4eb 100644 --- a/network/stream/intervals_test.go +++ b/network/stream/intervals_test.go @@ -59,7 +59,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { externalStreamSessionAt := uint64(50) externalStreamMaxKeys := uint64(100) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) { addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { diff --git a/network/stream/peer_test.go b/network/stream/peer_test.go index d6ed4524b1..deaec6afb6 100644 --- a/network/stream/peer_test.go +++ b/network/stream/peer_test.go @@ -130,7 +130,7 @@ func TestSyncSubscriptionsDiff(t *testing.T) { // made on initial node connections and that subscriptions are correctly changed // when kademlia neighbourhood depth is changed by connecting more nodes. func TestUpdateSyncingSubscriptions(t *testing.T) { - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { diff --git a/network/stream/snapshot_retrieval_test.go b/network/stream/snapshot_retrieval_test.go index d0fbd0c629..6e86f6188f 100644 --- a/network/stream/snapshot_retrieval_test.go +++ b/network/stream/snapshot_retrieval_test.go @@ -171,7 +171,7 @@ func runPureRetrievalTest(t *testing.T, nodeCount int, chunkCount int) { t.Helper() // the pure retrieval test needs a different service map, as we want // syncing disabled and we don't need to set the syncUpdateDelay - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { @@ -316,7 +316,7 @@ func runFileRetrievalTest(t *testing.T, nodeCount int) { t.Helper() - sim := simulation.New(retrievalSimServiceMap) + sim := simulation.NewInProc(retrievalSimServiceMap) defer sim.Close() log.Info("Initializing test config", "node count", nodeCount) @@ -404,7 +404,7 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) { t.Helper() - sim := simulation.New(retrievalSimServiceMap) + sim := simulation.NewInProc(retrievalSimServiceMap) defer sim.Close() conf := &synctestConfig{} diff --git a/network/stream/snapshot_sync_test.go b/network/stream/snapshot_sync_test.go index 39d5920273..6d393a3a6c 100644 --- a/network/stream/snapshot_sync_test.go +++ b/network/stream/snapshot_sync_test.go @@ -134,7 +134,7 @@ var simServiceMap = map[string]simulation.ServiceFunc{ } func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { - sim := simulation.New(simServiceMap) + sim := simulation.NewInProc(simServiceMap) defer sim.Close() log.Info("Initializing test config") diff --git a/network/stream/streamer_test.go b/network/stream/streamer_test.go index 4e01056204..dc080916a1 100644 --- a/network/stream/streamer_test.go +++ b/network/stream/streamer_test.go @@ -998,7 +998,7 @@ func TestGetServerSubscriptionsRPC(t *testing.T) { return nil } // create a standard sim - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) if err != nil { diff --git a/network/stream/syncer_test.go b/network/stream/syncer_test.go index ee4cee995f..b46b623510 100644 --- a/network/stream/syncer_test.go +++ b/network/stream/syncer_test.go @@ -54,7 +54,7 @@ func TestTwoNodesFullSync(t *testing.T) { // chunkCount = 1000 //~4mb syncTime = 5 * time.Second ) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr := network.NewAddr(ctx.Config.Node()) @@ -253,7 +253,7 @@ func TestStarNetworkSync(t *testing.T) { syncTime = 30 * time.Second filesize = chunkCount * chunkSize ) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr := network.NewAddr(ctx.Config.Node()) @@ -466,7 +466,7 @@ type chunkProxData struct { func TestSameVersionID(t *testing.T) { //test version ID v := uint(1) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { @@ -529,7 +529,7 @@ func TestSameVersionID(t *testing.T) { func TestDifferentVersionID(t *testing.T) { //create a variable to hold the version ID v := uint(0) - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { diff --git a/network_test.go b/network_test.go index 128757cf75..13ee49a398 100644 --- a/network_test.go +++ b/network_test.go @@ -292,7 +292,7 @@ func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwa o = new(testSwarmNetworkOptions) } - sim := simulation.New(map[string]simulation.ServiceFunc{ + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { config := api.NewConfig() diff --git a/pss/prox_test.go b/pss/prox_test.go index 6366130d42..6436035a65 100644 --- a/pss/prox_test.go +++ b/pss/prox_test.go @@ -227,7 +227,7 @@ func testProxNetwork(t *testing.T, nodeCount int, msgCount int, timeout time.Dur handlerContextFuncs := make(map[Topic]handlerContextFunc) handlerContextFuncs[topic] = nodeMsgHandler services := newProxServices(td, true, handlerContextFuncs, td.kademlias) - td.sim = simulation.New(services) + td.sim = simulation.NewInProc(services) defer td.sim.Close() ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel()