diff options
author | holisticode <holistic.computing@gmail.com> | 2018-07-31 04:55:25 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-31 04:55:25 +0800 |
commit | d6efa691872efb723ea3177a92da9e9b31c34eba (patch) | |
tree | 9c7e85c9cab9a2cf1240db47a8de44162f69353e /swarm/network/simulation/events.go | |
parent | 3ea8ac6a9ab9e56164707119e9142f06fae4c316 (diff) | |
download | go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.gz go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.bz2 go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.lz go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.xz go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.zst go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.zip |
Merge netsim mig to master (#17241)
* swarm: merged stream-tests migration to develop
* swarm/network: expose simulation RandomUpNode to use in stream tests
* swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest
* swarm: enforce waitkademlia for snapshot tests
* swarm: fixed syncer tests and snapshot_sync_test
* swarm: linting of simulation package
* swarm: address review comments
* swarm/network/stream: fix delivery_test bugs and refactor
* swarm/network/stream: addressed PR comments @janos
* swarm/network/stream: enforce waitKademlia, improve TestIntervals
* swarm/network/stream: TestIntervals not waiting for chunk to be stored
Diffstat (limited to 'swarm/network/simulation/events.go')
-rw-r--r-- | swarm/network/simulation/events.go | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go index f9cfadb73..980a9a756 100644 --- a/swarm/network/simulation/events.go +++ b/swarm/network/simulation/events.go @@ -18,6 +18,7 @@ package simulation import ( "context" + "sync" "github.com/ethereum/go-ethereum/p2p/discover" @@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter { func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent { eventC := make(chan PeerEvent) + // wait group to make sure all subscriptions to admin peerEvents are established + // before this function returns. + var subsWG sync.WaitGroup for _, id := range ids { s.shutdownWG.Add(1) + subsWG.Add(1) go func(id discover.NodeID) { defer s.shutdownWG.Done() client, err := s.Net.GetNode(id).Client() if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } events := make(chan *p2p.PeerEvent) sub, err := client.Subscribe(ctx, "admin", events, "peerEvents") if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } defer sub.Unsubscribe() + subsWG.Done() + for { select { case <-ctx.Done(): @@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt }(id) } + // wait all subscriptions + subsWG.Wait() return eventC } |