diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-25 23:35:54 +0800 |
---|---|---|
committer | Janos Guljas <janos@resenje.org> | 2018-09-26 17:34:40 +0800 |
commit | 3f7acbbeb929bc3a2a3073bae15977ec69761bab (patch) | |
tree | a1d371d0a8d043e51dff796ff04d0eb84599fa80 /swarm/network/fetcher.go | |
parent | d3441ebb563439bac0837d70591f92e2c6080303 (diff) | |
download | dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.gz dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.bz2 dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.lz dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.xz dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.zst dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.zip |
swarm: prevent forever running retrieve request loops
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r-- | swarm/network/fetcher.go | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index 5b4b61c7e..6aed57e22 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -32,6 +32,8 @@ var searchTimeout = 1 * time.Second // Also used in stream delivery. var RequestTimeout = 10 * time.Second +var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops + type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error) // Fetcher is created when a chunk is not found locally. It starts a request handler loop once and @@ -44,7 +46,7 @@ type Fetcher struct { protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk addr storage.Address // the address of the chunk to be fetched offerC chan *enode.ID // channel of sources (peer node id strings) - requestC chan struct{} + requestC chan uint8 // channel for incoming requests (with the hopCount value in it) skipCheck bool } @@ -53,6 +55,7 @@ type Request struct { Source *enode.ID // nodeID of peer to request from (can be nil) SkipCheck bool // whether to offer the chunk first or deliver directly peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil) + HopCount uint8 // number of forwarded requests (hops) } // NewRequest returns a new instance of Request based on chunk address skip check and @@ -113,7 +116,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { addr: addr, protoRequestFunc: rf, offerC: make(chan *enode.ID), - requestC: make(chan struct{}), + requestC: make(chan uint8), skipCheck: skipCheck, } } @@ -136,7 +139,7 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) { } // Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. -func (f *Fetcher) Request(ctx context.Context) { +func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { // First we need to have this select to make sure that we return if context is done select { case <-ctx.Done(): @@ -144,10 +147,15 @@ func (f *Fetcher) Request(ctx context.Context) { default: } + if hopCount >= maxHopCount { + log.Debug("fetcher request hop count limit reached", "hops", hopCount) + return + } + // This select alone would not guarantee that we return of context is done, it could potentially // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) select { - case f.requestC <- struct{}{}: + case f.requestC <- hopCount + 1: case <-ctx.Done(): } } @@ -161,6 +169,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { waitC <-chan time.Time // timer channel sources []*enode.ID // known sources, ie. peers that offered the chunk requested bool // true if the chunk was actually requested + hopCount uint8 ) gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected @@ -183,7 +192,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { doRequest = requested // incoming request - case <-f.requestC: + case hopCount = <-f.requestC: log.Trace("new request", "request addr", f.addr) // 2) chunk is requested, set requested flag // launch a request iff none been launched yet @@ -213,7 +222,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { // need to issue a new request if doRequest { var err error - sources, err = f.doRequest(ctx, gone, peers, sources) + sources, err = f.doRequest(ctx, gone, peers, sources, hopCount) if err != nil { log.Info("unable to request", "request addr", f.addr, "err", err) } @@ -251,7 +260,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { // * the peer's address is added to the set of peers to skip // * the peer's address is removed from prospective sources, and // * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) -func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID) ([]*enode.ID, error) { +func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) { var i int var sourceID *enode.ID var quit chan struct{} @@ -260,6 +269,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki Addr: f.addr, SkipCheck: f.skipCheck, peersToSkip: peersToSkip, + HopCount: hopCount, } foundSource := false |