From d6efa691872efb723ea3177a92da9e9b31c34eba Mon Sep 17 00:00:00 2001 From: holisticode Date: Mon, 30 Jul 2018 15:55:25 -0500 Subject: Merge netsim mig to master (#17241) * swarm: merged stream-tests migration to develop * swarm/network: expose simulation RandomUpNode to use in stream tests * swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest * swarm: enforce waitkademlia for snapshot tests * swarm: fixed syncer tests and snapshot_sync_test * swarm: linting of simulation package * swarm: address review comments * swarm/network/stream: fix delivery_test bugs and refactor * swarm/network/stream: addressed PR comments @janos * swarm/network/stream: enforce waitKademlia, improve TestIntervals * swarm/network/stream: TestIntervals not waiting for chunk to be stored --- swarm/network/simulation/bucket.go | 2 +- swarm/network/simulation/connect.go | 4 ++-- swarm/network/simulation/events.go | 11 +++++++++++ swarm/network/simulation/http.go | 11 ++++++++--- swarm/network/simulation/http_test.go | 7 ++++++- swarm/network/simulation/node.go | 19 ++++++++++++------- swarm/network/simulation/service.go | 2 +- 7 files changed, 41 insertions(+), 15 deletions(-) (limited to 'swarm/network/simulation') diff --git a/swarm/network/simulation/bucket.go b/swarm/network/simulation/bucket.go index b37afaaa4..ddbedb521 100644 --- a/swarm/network/simulation/bucket.go +++ b/swarm/network/simulation/bucket.go @@ -43,7 +43,7 @@ func (s *Simulation) SetNodeItem(id discover.NodeID, key interface{}, value inte s.buckets[id].Store(key, value) } -// NodeItems returns a map of items from all nodes that are all set under the +// NodesItems returns a map of items from all nodes that are all set under the // same BucketKey. func (s *Simulation) NodesItems(key interface{}) (values map[discover.NodeID]interface{}) { s.mu.RLock() diff --git a/swarm/network/simulation/connect.go b/swarm/network/simulation/connect.go index 3fe82052b..3d0f6cb3f 100644 --- a/swarm/network/simulation/connect.go +++ b/swarm/network/simulation/connect.go @@ -54,7 +54,7 @@ func (s *Simulation) ConnectToLastNode(id discover.NodeID) (err error) { // ConnectToRandomNode connects the node with provieded NodeID // to a random node that is up. func (s *Simulation) ConnectToRandomNode(id discover.NodeID) (err error) { - n := s.randomUpNode(id) + n := s.RandomUpNode(id) if n == nil { return ErrNodeNotFound } @@ -135,7 +135,7 @@ func (s *Simulation) ConnectNodesStar(id discover.NodeID, ids []discover.NodeID) return nil } -// ConnectNodesStar connects all nodes in a star topology +// ConnectNodesStarPivot connects all nodes in a star topology // with the center at already set pivot node. // If ids argument is nil, all nodes that are up will be connected. func (s *Simulation) ConnectNodesStarPivot(ids []discover.NodeID) (err error) { diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go index f9cfadb73..980a9a756 100644 --- a/swarm/network/simulation/events.go +++ b/swarm/network/simulation/events.go @@ -18,6 +18,7 @@ package simulation import ( "context" + "sync" "github.com/ethereum/go-ethereum/p2p/discover" @@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter { func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent { eventC := make(chan PeerEvent) + // wait group to make sure all subscriptions to admin peerEvents are established + // before this function returns. + var subsWG sync.WaitGroup for _, id := range ids { s.shutdownWG.Add(1) + subsWG.Add(1) go func(id discover.NodeID) { defer s.shutdownWG.Done() client, err := s.Net.GetNode(id).Client() if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } events := make(chan *p2p.PeerEvent) sub, err := client.Subscribe(ctx, "admin", events, "peerEvents") if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } defer sub.Unsubscribe() + subsWG.Done() + for { select { case <-ctx.Done(): @@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt }(id) } + // wait all subscriptions + subsWG.Wait() return eventC } diff --git a/swarm/network/simulation/http.go b/swarm/network/simulation/http.go index 40f13f32d..69ae3baec 100644 --- a/swarm/network/simulation/http.go +++ b/swarm/network/simulation/http.go @@ -29,7 +29,7 @@ var ( DefaultHTTPSimAddr = ":8888" ) -//`With`(builder) pattern constructor for Simulation to +//WithServer implements the builder pattern constructor for Simulation to //start with a HTTP server func (s *Simulation) WithServer(addr string) *Simulation { //assign default addr if nothing provided @@ -46,7 +46,12 @@ func (s *Simulation) WithServer(addr string) *Simulation { Addr: addr, Handler: s.handler, } - go s.httpSrv.ListenAndServe() + go func() { + err := s.httpSrv.ListenAndServe() + if err != nil { + log.Error("Error starting the HTTP server", "error", err) + } + }() return s } @@ -55,7 +60,7 @@ func (s *Simulation) addSimulationRoutes() { s.handler.POST("/runsim", s.RunSimulation) } -// StartNetwork starts all nodes in the network +// RunSimulation is the actual POST endpoint runner func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request) { log.Debug("RunSimulation endpoint running") s.runC <- struct{}{} diff --git a/swarm/network/simulation/http_test.go b/swarm/network/simulation/http_test.go index 4d8bf9946..775cf9219 100644 --- a/swarm/network/simulation/http_test.go +++ b/swarm/network/simulation/http_test.go @@ -96,7 +96,12 @@ func sendRunSignal(t *testing.T) { if err != nil { t.Fatalf("Request failed: %v", err) } - defer resp.Body.Close() + defer func() { + err := resp.Body.Close() + if err != nil { + log.Error("Error closing response body", "err", err) + } + }() log.Debug("Signal sent") if resp.StatusCode != http.StatusOK { t.Fatalf("err %s", resp.Status) diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go index bc433cfd8..784588fa6 100644 --- a/swarm/network/simulation/node.go +++ b/swarm/network/simulation/node.go @@ -195,7 +195,7 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i return ids, nil } -//Upload a snapshot +//UploadSnapshot uploads a snapshot to the simulation //This method tries to open the json file provided, applies the config to all nodes //and then loads the snapshot into the Simulation network func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error { @@ -203,7 +203,12 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) if err != nil { return err } - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + log.Error("Error closing snapshot file", "err", err) + } + }() jsonbyte, err := ioutil.ReadAll(f) if err != nil { return err @@ -294,7 +299,7 @@ func (s *Simulation) StopNode(id discover.NodeID) (err error) { // StopRandomNode stops a random node. func (s *Simulation) StopRandomNode() (id discover.NodeID, err error) { - n := s.randomUpNode() + n := s.RandomUpNode() if n == nil { return id, ErrNodeNotFound } @@ -324,18 +329,18 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// randomUpNode returns a random SimNode that is up. +// RandomUpNode returns a random SimNode that is up. // Arguments are NodeIDs for nodes that should not be returned. -func (s *Simulation) randomUpNode(exclude ...discover.NodeID) *adapters.SimNode { +func (s *Simulation) RandomUpNode(exclude ...discover.NodeID) *adapters.SimNode { return s.randomNode(s.UpNodeIDs(), exclude...) } -// randomUpNode returns a random SimNode that is not up. +// randomDownNode returns a random SimNode that is not up. func (s *Simulation) randomDownNode(exclude ...discover.NodeID) *adapters.SimNode { return s.randomNode(s.DownNodeIDs(), exclude...) } -// randomUpNode returns a random SimNode from the slice of NodeIDs. +// randomNode returns a random SimNode from the slice of NodeIDs. func (s *Simulation) randomNode(ids []discover.NodeID, exclude ...discover.NodeID) *adapters.SimNode { for _, e := range exclude { var i int diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go index d1cbf1f8b..02e7ad0cc 100644 --- a/swarm/network/simulation/service.go +++ b/swarm/network/simulation/service.go @@ -39,7 +39,7 @@ func (s *Simulation) Service(name string, id discover.NodeID) node.Service { // RandomService returns a single Service by name on a // randomly chosen node that is up. func (s *Simulation) RandomService(name string) node.Service { - n := s.randomUpNode() + n := s.RandomUpNode() if n == nil { return nil } -- cgit v1.2.3