aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/simulation
diff options
context:
space:
mode:
authorholisticode <holistic.computing@gmail.com>2018-07-31 04:55:25 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-31 04:55:25 +0800
commitd6efa691872efb723ea3177a92da9e9b31c34eba (patch)
tree9c7e85c9cab9a2cf1240db47a8de44162f69353e /swarm/network/simulation
parent3ea8ac6a9ab9e56164707119e9142f06fae4c316 (diff)
downloadgo-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.gz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.bz2
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.lz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.xz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.zst
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.zip
Merge netsim mig to master (#17241)
* swarm: merged stream-tests migration to develop * swarm/network: expose simulation RandomUpNode to use in stream tests * swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest * swarm: enforce waitkademlia for snapshot tests * swarm: fixed syncer tests and snapshot_sync_test * swarm: linting of simulation package * swarm: address review comments * swarm/network/stream: fix delivery_test bugs and refactor * swarm/network/stream: addressed PR comments @janos * swarm/network/stream: enforce waitKademlia, improve TestIntervals * swarm/network/stream: TestIntervals not waiting for chunk to be stored
Diffstat (limited to 'swarm/network/simulation')
-rw-r--r--swarm/network/simulation/bucket.go2
-rw-r--r--swarm/network/simulation/connect.go4
-rw-r--r--swarm/network/simulation/events.go11
-rw-r--r--swarm/network/simulation/http.go11
-rw-r--r--swarm/network/simulation/http_test.go7
-rw-r--r--swarm/network/simulation/node.go19
-rw-r--r--swarm/network/simulation/service.go2
7 files changed, 41 insertions, 15 deletions
diff --git a/swarm/network/simulation/bucket.go b/swarm/network/simulation/bucket.go
index b37afaaa4..ddbedb521 100644
--- a/swarm/network/simulation/bucket.go
+++ b/swarm/network/simulation/bucket.go
@@ -43,7 +43,7 @@ func (s *Simulation) SetNodeItem(id discover.NodeID, key interface{}, value inte
s.buckets[id].Store(key, value)
}
-// NodeItems returns a map of items from all nodes that are all set under the
+// NodesItems returns a map of items from all nodes that are all set under the
// same BucketKey.
func (s *Simulation) NodesItems(key interface{}) (values map[discover.NodeID]interface{}) {
s.mu.RLock()
diff --git a/swarm/network/simulation/connect.go b/swarm/network/simulation/connect.go
index 3fe82052b..3d0f6cb3f 100644
--- a/swarm/network/simulation/connect.go
+++ b/swarm/network/simulation/connect.go
@@ -54,7 +54,7 @@ func (s *Simulation) ConnectToLastNode(id discover.NodeID) (err error) {
// ConnectToRandomNode connects the node with provieded NodeID
// to a random node that is up.
func (s *Simulation) ConnectToRandomNode(id discover.NodeID) (err error) {
- n := s.randomUpNode(id)
+ n := s.RandomUpNode(id)
if n == nil {
return ErrNodeNotFound
}
@@ -135,7 +135,7 @@ func (s *Simulation) ConnectNodesStar(id discover.NodeID, ids []discover.NodeID)
return nil
}
-// ConnectNodesStar connects all nodes in a star topology
+// ConnectNodesStarPivot connects all nodes in a star topology
// with the center at already set pivot node.
// If ids argument is nil, all nodes that are up will be connected.
func (s *Simulation) ConnectNodesStarPivot(ids []discover.NodeID) (err error) {
diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go
index f9cfadb73..980a9a756 100644
--- a/swarm/network/simulation/events.go
+++ b/swarm/network/simulation/events.go
@@ -18,6 +18,7 @@ package simulation
import (
"context"
+ "sync"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent {
eventC := make(chan PeerEvent)
+ // wait group to make sure all subscriptions to admin peerEvents are established
+ // before this function returns.
+ var subsWG sync.WaitGroup
for _, id := range ids {
s.shutdownWG.Add(1)
+ subsWG.Add(1)
go func(id discover.NodeID) {
defer s.shutdownWG.Done()
client, err := s.Net.GetNode(id).Client()
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
defer sub.Unsubscribe()
+ subsWG.Done()
+
for {
select {
case <-ctx.Done():
@@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt
}(id)
}
+ // wait all subscriptions
+ subsWG.Wait()
return eventC
}
diff --git a/swarm/network/simulation/http.go b/swarm/network/simulation/http.go
index 40f13f32d..69ae3baec 100644
--- a/swarm/network/simulation/http.go
+++ b/swarm/network/simulation/http.go
@@ -29,7 +29,7 @@ var (
DefaultHTTPSimAddr = ":8888"
)
-//`With`(builder) pattern constructor for Simulation to
+//WithServer implements the builder pattern constructor for Simulation to
//start with a HTTP server
func (s *Simulation) WithServer(addr string) *Simulation {
//assign default addr if nothing provided
@@ -46,7 +46,12 @@ func (s *Simulation) WithServer(addr string) *Simulation {
Addr: addr,
Handler: s.handler,
}
- go s.httpSrv.ListenAndServe()
+ go func() {
+ err := s.httpSrv.ListenAndServe()
+ if err != nil {
+ log.Error("Error starting the HTTP server", "error", err)
+ }
+ }()
return s
}
@@ -55,7 +60,7 @@ func (s *Simulation) addSimulationRoutes() {
s.handler.POST("/runsim", s.RunSimulation)
}
-// StartNetwork starts all nodes in the network
+// RunSimulation is the actual POST endpoint runner
func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request) {
log.Debug("RunSimulation endpoint running")
s.runC <- struct{}{}
diff --git a/swarm/network/simulation/http_test.go b/swarm/network/simulation/http_test.go
index 4d8bf9946..775cf9219 100644
--- a/swarm/network/simulation/http_test.go
+++ b/swarm/network/simulation/http_test.go
@@ -96,7 +96,12 @@ func sendRunSignal(t *testing.T) {
if err != nil {
t.Fatalf("Request failed: %v", err)
}
- defer resp.Body.Close()
+ defer func() {
+ err := resp.Body.Close()
+ if err != nil {
+ log.Error("Error closing response body", "err", err)
+ }
+ }()
log.Debug("Signal sent")
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go
index bc433cfd8..784588fa6 100644
--- a/swarm/network/simulation/node.go
+++ b/swarm/network/simulation/node.go
@@ -195,7 +195,7 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
return ids, nil
}
-//Upload a snapshot
+//UploadSnapshot uploads a snapshot to the simulation
//This method tries to open the json file provided, applies the config to all nodes
//and then loads the snapshot into the Simulation network
func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error {
@@ -203,7 +203,12 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
if err != nil {
return err
}
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ log.Error("Error closing snapshot file", "err", err)
+ }
+ }()
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
return err
@@ -294,7 +299,7 @@ func (s *Simulation) StopNode(id discover.NodeID) (err error) {
// StopRandomNode stops a random node.
func (s *Simulation) StopRandomNode() (id discover.NodeID, err error) {
- n := s.randomUpNode()
+ n := s.RandomUpNode()
if n == nil {
return id, ErrNodeNotFound
}
@@ -324,18 +329,18 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// randomUpNode returns a random SimNode that is up.
+// RandomUpNode returns a random SimNode that is up.
// Arguments are NodeIDs for nodes that should not be returned.
-func (s *Simulation) randomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
+func (s *Simulation) RandomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.UpNodeIDs(), exclude...)
}
-// randomUpNode returns a random SimNode that is not up.
+// randomDownNode returns a random SimNode that is not up.
func (s *Simulation) randomDownNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.DownNodeIDs(), exclude...)
}
-// randomUpNode returns a random SimNode from the slice of NodeIDs.
+// randomNode returns a random SimNode from the slice of NodeIDs.
func (s *Simulation) randomNode(ids []discover.NodeID, exclude ...discover.NodeID) *adapters.SimNode {
for _, e := range exclude {
var i int
diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go
index d1cbf1f8b..02e7ad0cc 100644
--- a/swarm/network/simulation/service.go
+++ b/swarm/network/simulation/service.go
@@ -39,7 +39,7 @@ func (s *Simulation) Service(name string, id discover.NodeID) node.Service {
// RandomService returns a single Service by name on a
// randomly chosen node that is up.
func (s *Simulation) RandomService(name string) node.Service {
- n := s.randomUpNode()
+ n := s.RandomUpNode()
if n == nil {
return nil
}