diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 45 |
1 files changed, 36 insertions, 9 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 65bcce8b9..622b46e4c 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -95,6 +95,7 @@ type Registry struct { spec *protocols.Spec //this protocol's spec balance protocols.Balance //implements protocols.Balance, for accounting prices protocols.Prices //implements protocols.Prices, provides prices to accounting + quit chan struct{} // terminates registry goroutines } // RegistryOptions holds optional values for NewRegistry constructor. @@ -117,6 +118,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy // check if retrieval has been disabled retrieval := options.Retrieval != RetrievalDisabled + quit := make(chan struct{}) + streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, @@ -128,6 +131,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, balance: balance, + quit: quit, } streamer.setupSpec() @@ -172,25 +176,41 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy go func() { defer close(out) - for i := range in { + for { select { - case <-out: - default: + case i, ok := <-in: + if !ok { + return + } + select { + case <-out: + default: + } + out <- i + case <-quit: + return } - out <- i } }() return out } + kad := streamer.delivery.kad + // get notification channels from Kademlia before returning + // from this function to avoid race with Close method and + // the goroutine created below + depthC := latestIntC(kad.NeighbourhoodDepthC()) + addressBookSizeC := latestIntC(kad.AddrCountC()) + go func() { // wait for kademlia table to be healthy - time.Sleep(options.SyncUpdateDelay) - - kad := streamer.delivery.kad - depthC := latestIntC(kad.NeighbourhoodDepthC()) - addressBookSizeC := latestIntC(kad.AddrCountC()) + // but return if Registry is closed before + select { + case <-time.After(options.SyncUpdateDelay): + case <-quit: + return + } // initial requests for syncing subscription to peers streamer.updateSyncing() @@ -229,6 +249,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy <-timer.C } timer.Reset(options.SyncUpdateDelay) + case <-quit: + break loop } } timer.Stop() @@ -398,6 +420,11 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { } func (r *Registry) Close() error { + // Stop sending neighborhood depth change and address count + // change from Kademlia that were initiated in NewRegistry constructor. + r.delivery.kad.CloseNeighbourhoodDepthC() + r.delivery.kad.CloseAddrCountC() + close(r.quit) return r.intervalsStore.Close() } |