aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/simulation/events.go
diff options
context:
space:
mode:
authorholisticode <holistic.computing@gmail.com>2018-07-31 04:55:25 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-31 04:55:25 +0800
commitd6efa691872efb723ea3177a92da9e9b31c34eba (patch)
tree9c7e85c9cab9a2cf1240db47a8de44162f69353e /swarm/network/simulation/events.go
parent3ea8ac6a9ab9e56164707119e9142f06fae4c316 (diff)
downloadgo-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.gz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.bz2
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.lz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.xz
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.zst
go-tangerine-d6efa691872efb723ea3177a92da9e9b31c34eba.zip
Merge netsim mig to master (#17241)
* swarm: merged stream-tests migration to develop * swarm/network: expose simulation RandomUpNode to use in stream tests * swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest * swarm: enforce waitkademlia for snapshot tests * swarm: fixed syncer tests and snapshot_sync_test * swarm: linting of simulation package * swarm: address review comments * swarm/network/stream: fix delivery_test bugs and refactor * swarm/network/stream: addressed PR comments @janos * swarm/network/stream: enforce waitKademlia, improve TestIntervals * swarm/network/stream: TestIntervals not waiting for chunk to be stored
Diffstat (limited to 'swarm/network/simulation/events.go')
-rw-r--r--swarm/network/simulation/events.go11
1 files changed, 11 insertions, 0 deletions
diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go
index f9cfadb73..980a9a756 100644
--- a/swarm/network/simulation/events.go
+++ b/swarm/network/simulation/events.go
@@ -18,6 +18,7 @@ package simulation
import (
"context"
+ "sync"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent {
eventC := make(chan PeerEvent)
+ // wait group to make sure all subscriptions to admin peerEvents are established
+ // before this function returns.
+ var subsWG sync.WaitGroup
for _, id := range ids {
s.shutdownWG.Add(1)
+ subsWG.Add(1)
go func(id discover.NodeID) {
defer s.shutdownWG.Done()
client, err := s.Net.GetNode(id).Client()
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
defer sub.Unsubscribe()
+ subsWG.Done()
+
for {
select {
case <-ctx.Done():
@@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt
}(id)
}
+ // wait all subscriptions
+ subsWG.Wait()
return eventC
}