aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/fetcher.go
diff options
context:
space:
mode:
authorBalint Gabor <balint.g@gmail.com>2018-09-25 23:35:54 +0800
committerJanos Guljas <janos@resenje.org>2018-09-26 17:34:40 +0800
commit3f7acbbeb929bc3a2a3073bae15977ec69761bab (patch)
treea1d371d0a8d043e51dff796ff04d0eb84599fa80 /swarm/network/fetcher.go
parentd3441ebb563439bac0837d70591f92e2c6080303 (diff)
downloaddexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.gz
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.bz2
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.lz
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.xz
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.tar.zst
dexon-3f7acbbeb929bc3a2a3073bae15977ec69761bab.zip
swarm: prevent forever running retrieve request loops
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r--swarm/network/fetcher.go24
1 files changed, 17 insertions, 7 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 5b4b61c7e..6aed57e22 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -32,6 +32,8 @@ var searchTimeout = 1 * time.Second
// Also used in stream delivery.
var RequestTimeout = 10 * time.Second
+var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
+
type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
@@ -44,7 +46,7 @@ type Fetcher struct {
protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
addr storage.Address // the address of the chunk to be fetched
offerC chan *enode.ID // channel of sources (peer node id strings)
- requestC chan struct{}
+ requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
skipCheck bool
}
@@ -53,6 +55,7 @@ type Request struct {
Source *enode.ID // nodeID of peer to request from (can be nil)
SkipCheck bool // whether to offer the chunk first or deliver directly
peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil)
+ HopCount uint8 // number of forwarded requests (hops)
}
// NewRequest returns a new instance of Request based on chunk address skip check and
@@ -113,7 +116,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
addr: addr,
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
- requestC: make(chan struct{}),
+ requestC: make(chan uint8),
skipCheck: skipCheck,
}
}
@@ -136,7 +139,7 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context) {
+func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
case <-ctx.Done():
@@ -144,10 +147,15 @@ func (f *Fetcher) Request(ctx context.Context) {
default:
}
+ if hopCount >= maxHopCount {
+ log.Debug("fetcher request hop count limit reached", "hops", hopCount)
+ return
+ }
+
// This select alone would not guarantee that we return of context is done, it could potentially
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
- case f.requestC <- struct{}{}:
+ case f.requestC <- hopCount + 1:
case <-ctx.Done():
}
}
@@ -161,6 +169,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
waitC <-chan time.Time // timer channel
sources []*enode.ID // known sources, ie. peers that offered the chunk
requested bool // true if the chunk was actually requested
+ hopCount uint8
)
gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected
@@ -183,7 +192,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested
// incoming request
- case <-f.requestC:
+ case hopCount = <-f.requestC:
log.Trace("new request", "request addr", f.addr)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
@@ -213,7 +222,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if doRequest {
var err error
- sources, err = f.doRequest(ctx, gone, peers, sources)
+ sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
@@ -251,7 +260,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
@@ -260,6 +269,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
Addr: f.addr,
SkipCheck: f.skipCheck,
peersToSkip: peersToSkip,
+ HopCount: hopCount,
}
foundSource := false