aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go45
1 files changed, 36 insertions, 9 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 65bcce8b9..622b46e4c 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -95,6 +95,7 @@ type Registry struct {
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
+ quit chan struct{} // terminates registry goroutines
}
// RegistryOptions holds optional values for NewRegistry constructor.
@@ -117,6 +118,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
// check if retrieval has been disabled
retrieval := options.Retrieval != RetrievalDisabled
+ quit := make(chan struct{})
+
streamer := &Registry{
addr: localID,
skipCheck: options.SkipCheck,
@@ -128,6 +131,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
balance: balance,
+ quit: quit,
}
streamer.setupSpec()
@@ -172,25 +176,41 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
go func() {
defer close(out)
- for i := range in {
+ for {
select {
- case <-out:
- default:
+ case i, ok := <-in:
+ if !ok {
+ return
+ }
+ select {
+ case <-out:
+ default:
+ }
+ out <- i
+ case <-quit:
+ return
}
- out <- i
}
}()
return out
}
+ kad := streamer.delivery.kad
+ // get notification channels from Kademlia before returning
+ // from this function to avoid race with Close method and
+ // the goroutine created below
+ depthC := latestIntC(kad.NeighbourhoodDepthC())
+ addressBookSizeC := latestIntC(kad.AddrCountC())
+
go func() {
// wait for kademlia table to be healthy
- time.Sleep(options.SyncUpdateDelay)
-
- kad := streamer.delivery.kad
- depthC := latestIntC(kad.NeighbourhoodDepthC())
- addressBookSizeC := latestIntC(kad.AddrCountC())
+ // but return if Registry is closed before
+ select {
+ case <-time.After(options.SyncUpdateDelay):
+ case <-quit:
+ return
+ }
// initial requests for syncing subscription to peers
streamer.updateSyncing()
@@ -229,6 +249,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
<-timer.C
}
timer.Reset(options.SyncUpdateDelay)
+ case <-quit:
+ break loop
}
}
timer.Stop()
@@ -398,6 +420,11 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error {
}
func (r *Registry) Close() error {
+ // Stop sending neighborhood depth change and address count
+ // change from Kademlia that were initiated in NewRegistry constructor.
+ r.delivery.kad.CloseNeighbourhoodDepthC()
+ r.delivery.kad.CloseAddrCountC()
+ close(r.quit)
return r.intervalsStore.Close()
}