aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/common_test.go
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-02-13 20:03:23 +0800
committerViktor Trón <viktor.tron@gmail.com>2019-02-13 20:03:23 +0800
commit3fd6db2bf63ce90232de445c7f33943406a5e634 (patch)
treeaebd00021b52c4b4109feed2af315d2e63fa06ae /swarm/network/stream/common_test.go
parentd596bea2d501d20b92e0fd4baa8bba682157dfa7 (diff)
downloadgo-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar.gz
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar.bz2
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar.lz
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar.xz
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.tar.zst
go-tangerine-3fd6db2bf63ce90232de445c7f33943406a5e634.zip
swarm: fix network/stream data races (#19051)
* swarm/network/stream: newStreamerTester cleanup only if err is nil * swarm/network/stream: raise newStreamerTester waitForPeers timeout * swarm/network/stream: fix data races in GetPeerSubscriptions * swarm/storage: prevent data race on LDBStore.batchesC https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461775049 * swarm/network/stream: fix TestGetSubscriptionsRPC data race https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461768477 * swarm/network/stream: correctly use Simulation.Run callback https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461783804 * swarm/network: protect addrCountC in Kademlia.AddrCountC function https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462273444 * p2p/simulations: fix a deadlock calling getRandomNode with lock https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462317407 * swarm/network/stream: terminate disconnect goruotines in tests * swarm/network/stream: reduce memory consumption when testing data races * swarm/network/stream: add watchDisconnections helper function * swarm/network/stream: add concurrent counter for tests * swarm/network/stream: rename race/norace test files and use const * swarm/network/stream: remove watchSim and its panic * swarm/network/stream: pass context in watchDisconnections * swarm/network/stream: add concurrent safe bool for watchDisconnections * swarm/storage: fix LDBStore.batchesC data race by not closing it
Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r--swarm/network/stream/common_test.go62
1 files changed, 58 insertions, 4 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 8a7d851fb..afd08d275 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
// temp datadir
datadir, err := ioutil.TempDir("", "streamer")
if err != nil {
- return nil, nil, nil, func() {}, err
+ return nil, nil, nil, nil, err
}
removeDataDir := func() {
os.RemoveAll(datadir)
@@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
localStore, err := storage.NewTestLocalStoreForAddr(params)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
delivery := NewDelivery(to, netStore)
@@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
}
protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
- err = waitForPeers(streamer, 1*time.Second, 1)
+ err = waitForPeers(streamer, 10*time.Second, 1)
if err != nil {
+ teardown()
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
}
@@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
}
return store, datadir, nil
}
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value
+// disconnected to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ ctx,
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Drop(),
+ )
+ disconnected = new(boolean)
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-disconnections:
+ if d.Error != nil {
+ log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+ } else {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+ }
+ disconnected.set(true)
+ }
+ }
+ }()
+ return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+ v bool
+ mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+
+ return b.v
+}