diff options
author | Janoš Guljaš <janos@users.noreply.github.com> | 2019-02-13 20:03:23 +0800 |
---|---|---|
committer | Rafael Matias <rafael@skyle.net> | 2019-02-19 20:11:52 +0800 |
commit | 8ea3d8ad90f90e7233e829ad141acfd9d911658c (patch) | |
tree | ed6e0fcb16c90c460052a62a66d3aa253ea3498d /swarm/network/stream/common_test.go | |
parent | a0127019c3d516e8d8cf83839583bcf71af763e0 (diff) | |
download | dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar.gz dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar.bz2 dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar.lz dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar.xz dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.tar.zst dexon-8ea3d8ad90f90e7233e829ad141acfd9d911658c.zip |
swarm: fix network/stream data races (#19051)
* swarm/network/stream: newStreamerTester cleanup only if err is nil
* swarm/network/stream: raise newStreamerTester waitForPeers timeout
* swarm/network/stream: fix data races in GetPeerSubscriptions
* swarm/storage: prevent data race on LDBStore.batchesC
https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461775049
* swarm/network/stream: fix TestGetSubscriptionsRPC data race
https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461768477
* swarm/network/stream: correctly use Simulation.Run callback
https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461783804
* swarm/network: protect addrCountC in Kademlia.AddrCountC function
https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462273444
* p2p/simulations: fix a deadlock calling getRandomNode with lock
https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462317407
* swarm/network/stream: terminate disconnect goruotines in tests
* swarm/network/stream: reduce memory consumption when testing data races
* swarm/network/stream: add watchDisconnections helper function
* swarm/network/stream: add concurrent counter for tests
* swarm/network/stream: rename race/norace test files and use const
* swarm/network/stream: remove watchSim and its panic
* swarm/network/stream: pass context in watchDisconnections
* swarm/network/stream: add concurrent safe bool for watchDisconnections
* swarm/storage: fix LDBStore.batchesC data race by not closing it
(cherry picked from commit 3fd6db2bf63ce90232de445c7f33943406a5e634)
Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r-- | swarm/network/stream/common_test.go | 62 |
1 files changed, 58 insertions, 4 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 8a7d851fb..afd08d275 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste // temp datadir datadir, err := ioutil.TempDir("", "streamer") if err != nil { - return nil, nil, nil, func() {}, err + return nil, nil, nil, nil, err } removeDataDir := func() { os.RemoveAll(datadir) @@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste localStore, err := storage.NewTestLocalStoreForAddr(params) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } netStore, err := storage.NewNetStore(localStore, nil) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } delivery := NewDelivery(to, netStore) @@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste } protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol) - err = waitForPeers(streamer, 1*time.Second, 1) + err = waitForPeers(streamer, 10*time.Second, 1) if err != nil { + teardown() return nil, nil, nil, nil, errors.New("timeout: peer is not created") } @@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch } return store, datadir, nil } + +// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value +// disconnected to true in case of a disconnect event. +func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) { + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + ctx, + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Drop(), + ) + disconnected = new(boolean) + go func() { + for { + select { + case <-ctx.Done(): + return + case d := <-disconnections: + if d.Error != nil { + log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error) + } else { + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) + } + disconnected.set(true) + } + } + }() + return disconnected +} + +// boolean is used to concurrently set +// and read a boolean value. +type boolean struct { + v bool + mu sync.RWMutex +} + +// set sets the value. +func (b *boolean) set(v bool) { + b.mu.Lock() + defer b.mu.Unlock() + + b.v = v +} + +// bool reads the value. +func (b *boolean) bool() bool { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.v +} |