aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/fetcher.go36
-rw-r--r--swarm/network/fetcher_test.go113
-rw-r--r--swarm/network/stream/delivery.go7
-rw-r--r--swarm/network/stream/messages.go2
-rw-r--r--swarm/network/stream/peer.go26
-rw-r--r--swarm/network/stream/stream.go5
6 files changed, 99 insertions, 90 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 3043f6475..6b2175166 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -52,6 +52,7 @@ type Fetcher struct {
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
+ ctx context.Context
}
type Request struct {
@@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
-func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
- fetcher := NewFetcher(source, f.request, f.skipCheck)
- go fetcher.run(ctx, peersToSkip)
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
+ go fetcher.run(peers)
return fetcher
}
// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
-func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
@@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
+ ctx: ctx,
}
}
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
-func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
+func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
+func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
-func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested
// all Fetcher context closed, can quit
- case <-ctx.Done():
+ case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if doRequest {
var err error
- sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
+ sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource {
req.Source = nil
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select {
case <-quit:
gone <- sourceID
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}()
return sources, nil
diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go
index 563c6cece..4e464f10f 100644
--- a/swarm/network/fetcher_test.go
+++ b/swarm/network/fetcher_test.go
@@ -69,7 +69,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
func TestFetcherSingleRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peers := []string{"a", "b", "c", "d"}
peersToSkip := &sync.Map{}
@@ -77,13 +81,9 @@ func TestFetcherSingleRequest(t *testing.T) {
peersToSkip.Store(p, time.Now())
}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
select {
case request := <-requester.requestC:
@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
func TestFetcherCancelStopsFetcher(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// we start the fetcher, and then we immediately cancel the context
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
cancel()
- rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
- defer rcancel()
// we call Request with an active context
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
func TestFetcherCancelStopsRequest(t *testing.T) {
+ t.Skip("since context is now per fetcher, this test is likely redundant")
+
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- // we start the fetcher with an active context
- go fetcher.run(ctx, peersToSkip)
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
- rctx, rcancel := context.WithCancel(context.Background())
- rcancel()
+ peersToSkip := &sync.Map{}
+
+ // we start the fetcher with an active context
+ go fetcher.run(peersToSkip)
// we call Request with a cancelled context
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
}
// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
- rctx = context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
select {
case <-requester.requestC:
@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
func TestFetcherOfferUsesSource(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
- rctx := context.Background()
// call the Offer function with the source peer
- fetcher.Offer(rctx, &sourcePeerID)
+ fetcher.Offer(&sourcePeerID)
// fetcher should not initiate request
select {
@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
}
// call Request after the Offer
- rctx = context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// there should be exactly 1 request coming from fetcher
var request *Request
@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
// call Request first
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// there should be a request coming from fetcher
var request *Request
@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
}
// after the Request call Offer
- fetcher.Offer(context.Background(), &sourcePeerID)
+ fetcher.Offer(&sourcePeerID)
// there should be a request coming from fetcher
select {
@@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker
fetcher.searchTimeout = 250 * time.Millisecond
peersToSkip := &sync.Map{}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
// call the fetch function with an active context
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// after 100ms the first request should be initiated
time.Sleep(100 * time.Millisecond)
@@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {
fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
- fetcher.Request(context.Background(), 0)
+ fetcher.Request(0)
// check if the created fetchFunction really starts a fetcher and initiates a request
select {
@@ -353,7 +350,11 @@ func TestFetcherFactory(t *testing.T) {
func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// make sure the searchTimeout is long so it is sure the request is not
// retried because of timeout
@@ -361,13 +362,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
peersToSkip := &sync.Map{}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
select {
case <-requester.requestC:
@@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
func TestFetcherMaxHopCount(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- peersToSkip := &sync.Map{}
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
- go fetcher.run(ctx, peersToSkip)
+ peersToSkip := &sync.Map{}
- rctx := context.Background()
- fetcher.Request(rctx, maxHopCount)
+ go fetcher.run(peersToSkip)
// if hopCount is already at max no request should be initiated
select {
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 988afcce8..fae6994f0 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
ctx, osp = spancontext.StartSpan(
ctx,
"retrieve.request")
- defer osp.Finish()
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
}()
go func() {
+ defer osp.Finish()
chunk, err := d.chunkStore.Get(ctx, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
ctx, osp = spancontext.StartSpan(
ctx,
"chunk.delivery")
- defer osp.Finish()
processReceivedChunksCount.Inc(1)
go func() {
+ defer osp.Finish()
+
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
- }, Top)
+ }, Top, "request.from.peers")
if err != nil {
return nil, nil, err
}
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..de4e8a3bb 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(ctx, msg, c.priority)
+ err := p.SendPriority(ctx, msg, c.priority, "")
if err != nil {
log.Warn("SendPriority error", "err", err)
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 4bccf56f5..68da8f44a 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -65,6 +65,7 @@ type Peer struct {
// on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams
quit chan struct{}
+ spans sync.Map
}
type WrappedPriorityMsg struct {
@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
+ spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
+ defer p.spans.Delete(wmsg.Context)
+ sp, ok := p.spans.Load(wmsg.Context)
+ if ok {
+ defer sp.(opentracing.Span).Finish()
+ }
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
- var sp opentracing.Span
var msg interface{}
spanName := "send.chunk.delivery"
@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
}
spanName += ".retrieval"
}
- ctx, sp = spancontext.StartSpan(
- ctx,
- spanName)
- defer sp.Finish()
- return p.SendPriority(ctx, msg, priority)
+ return p.SendPriority(ctx, msg, priority, spanName)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
+ if traceId != "" {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ traceId,
+ )
+ p.spans.Store(ctx, sp)
+ }
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(ctx, msg, s.priority)
+ return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
}
func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index e06048053..ee4f57c1a 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.SendPriority(context.TODO(), msg, priority, "")
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -730,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil {
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+
+ if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {