diff options
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/discovery.go | 9 | ||||
-rw-r--r-- | swarm/network/protocol.go | 4 | ||||
-rw-r--r-- | swarm/network/stream/common_test.go | 14 | ||||
-rw-r--r-- | swarm/network/stream/delivery.go | 47 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 24 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 29 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 40 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 10 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 42 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 5 | ||||
-rw-r--r-- | swarm/network/stream/syncer.go | 9 | ||||
-rw-r--r-- | swarm/network/stream/syncer_test.go | 2 |
13 files changed, 152 insertions, 85 deletions
diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index c0868410c..55bf7c033 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -17,6 +17,7 @@ package network import ( + "context" "fmt" "sync" @@ -48,7 +49,7 @@ func newDiscovery(p *BzzPeer, o Overlay) *discPeer { } // HandleMsg is the message handler that delegates incoming messages -func (d *discPeer) HandleMsg(msg interface{}) error { +func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *peersMsg: @@ -99,14 +100,14 @@ func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) { resp := &peersMsg{ Peers: []*BzzAddr{ToAddr(a)}, } - go d.Send(resp) + go d.Send(context.TODO(), resp) } // NotifyDepth sends a subPeers Msg to the receiver notifying them about // a change in the depth of saturation func (d *discPeer) NotifyDepth(po uint8) { // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po)) - go d.Send(&subPeersMsg{Depth: po}) + go d.Send(context.TODO(), &subPeersMsg{Depth: po}) } /* @@ -178,7 +179,7 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { }) if len(peers) > 0 { // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d)) - go d.Send(&peersMsg{Peers: peers}) + go d.Send(context.TODO(), &peersMsg{Peers: peers}) } } d.sentPeers = true diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 39673f5a1..8bf81fde6 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -82,9 +82,9 @@ type Peer interface { type Conn interface { ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages - Send(interface{}) error // can send messages + Send(context.Context, interface{}) error // can send messages Drop(error) // disconnect this peer - Run(func(interface{}) error) error // the run function to run a protocol + Run(func(context.Context, interface{}) error) error // the run function to run a protocol Off() OverlayAddr } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 6a2c27401..4d55c6ee3 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -126,7 +126,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { return testRegistry, nil } -func defaultRetrieveFunc(id discover.NodeID) func(chunk *storage.Chunk) error { +func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { return nil } @@ -217,14 +217,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(chunk) + rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { @@ -369,8 +369,8 @@ func newTestExternalClient(db *storage.DBAPI) *testExternalClient { } } -func (c *testExternalClient) NeedData(hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(hash) +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { + chunk, _ := c.db.GetOrCreateRequest(ctx, hash) if chunk.ReqC == nil { return nil } @@ -429,7 +429,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6 return b, from, to, nil, nil } -func (s *testExternalServer) GetData([]byte) ([]byte, error) { +func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { return make([]byte, 4096), nil } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 75aabad6c..fa210e300 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "time" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -118,8 +121,8 @@ func (s *SwarmChunkServer) Close() { } // GetData retrives chunk data from db store -func (s *SwarmChunkServer) GetData(key []byte) ([]byte, error) { - chunk, err := s.db.Get(storage.Address(key)) +func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { + chunk, err := s.db.Get(ctx, storage.Address(key)) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { @@ -134,25 +137,37 @@ type RetrieveRequestMsg struct { SkipCheck bool } -func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error { +func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) handleRetrieveRequestMsgCount.Inc(1) + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "retrieve.request") + defer osp.Finish() + s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false)) if err != nil { return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(req.Addr) + chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) if chunk.ReqC != nil { if created { - if err := d.RequestFromPeers(chunk.Addr[:], true, sp.ID()); err != nil { + if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) chunk.SetErrored(storage.ErrChunkForward) return nil } } go func() { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "waiting.delivery") + defer osp.Finish() + t := time.NewTimer(10 * time.Minute) defer t.Stop() @@ -169,7 +184,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e chunk.SetErrored(nil) if req.SkipCheck { - err := sp.Deliver(chunk, s.priority) + err := sp.Deliver(ctx, chunk, s.priority) if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) sp.Drop(err) @@ -185,7 +200,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - return sp.Deliver(chunk, s.priority) + return sp.Deliver(ctx, chunk, s.priority) } streamer.deliveryC <- chunk.Addr[:] return nil @@ -197,7 +212,13 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } -func (d *Delivery) handleChunkDeliveryMsg(sp *Peer, req *ChunkDeliveryMsg) error { +func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "chunk.delivery") + defer osp.Finish() + req.peer = sp d.receiveC <- req return nil @@ -209,7 +230,7 @@ R: processReceivedChunksCount.Inc(1) // this should be has locally - chunk, err := d.db.Get(req.Addr) + chunk, err := d.db.Get(context.TODO(), req.Addr) if err == nil { continue R } @@ -224,7 +245,7 @@ R: default: } chunk.SData = req.SData - d.db.Put(chunk) + d.db.Put(context.TODO(), chunk) go func(req *ChunkDeliveryMsg) { err := chunk.WaitToStore() @@ -236,10 +257,11 @@ R: } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { +func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { var success bool var err error requestFromPeersCount.Inc(1) + d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool { spId := p.(network.Peer).ID() for _, p := range peersToSkip { @@ -253,8 +275,7 @@ func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ... log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) return true } - // TODO: skip light nodes that do not accept retrieve requests - err = sp.SendPriority(&RetrieveRequestMsg{ + err = sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: hash, SkipCheck: skipCheck, }, Top) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index cd87557b1..f3da893a2 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -46,7 +46,7 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(hash0[:], true) + streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -80,7 +80,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(peerID) - peer.handleSubscribeMsg(&SubscribeMsg{ + peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ Stream: NewStream(swarmChunkServerStreamName, "", false), History: nil, Priority: Top, @@ -131,7 +131,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { stream := NewStream(swarmChunkServerStreamName, "", false) - peer.handleSubscribeMsg(&SubscribeMsg{ + peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ Stream: stream, History: nil, Priority: Top, @@ -140,7 +140,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash := storage.Address(hash0[:]) chunk := storage.NewChunk(hash, nil) chunk.SData = hash - localStore.Put(chunk) + localStore.Put(context.TODO(), chunk) chunk.WaitToStore() err = tester.TestExchanges(p2ptest.Exchange{ @@ -179,7 +179,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash = storage.Address(hash1[:]) chunk = storage.NewChunk(hash, nil) chunk.SData = hash1[:] - localStore.Put(chunk) + localStore.Put(context.TODO(), chunk) chunk.WaitToStore() err = tester.TestExchanges(p2ptest.Exchange{ @@ -234,7 +234,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(chunkKey) + chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) if !created { t.Fatal("chunk already exists") @@ -285,7 +285,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { case <-chunk.ReqC: } - storedChunk, err := localStore.Get(chunkKey) + storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -401,8 +401,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } // create a retriever FileStore for the pivot node delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(chunk *storage.Chunk) error { - return delivery.RequestFromPeers(chunk.Addr[:], skipCheck) + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) @@ -617,8 +617,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip // create a retriever FileStore for the pivot node // by now deliveries are set for each node by the streamer service delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(chunk *storage.Chunk) error { - return delivery.RequestFromPeers(chunk.Addr[:], skipCheck) + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) @@ -650,7 +650,7 @@ Loop: errs := make(chan error) for _, hash := range hashes { go func(h storage.Address) { - _, err := netStore.Get(h) + _, err := netStore.Get(ctx, h) log.Warn("test check netstore get", "hash", h, "err", err) errs <- err }(hash) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 5668a73e9..a19f63589 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "fmt" "sync" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Stream defines a unique stream identifier. @@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct { Priority uint8 // delivered on priority channel } -func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) { +func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) { log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream)) return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority) } -func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) { +func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) { metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1) defer func() { if err != nil { - if e := p.Send(SubscribeErrorMsg{ + if e := p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), }); e != nil { log.Error("send stream subscribe error message", "err", err) @@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string { // handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface // Filter method -func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { +func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "handle.offered.hashes") + defer sp.Finish() + c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] - if wait := c.NeedData(hash); wait != nil { + if wait := c.NeedData(ctx, hash); wait != nil { want.Set(i/HashSize, true) wg.Add(1) // create request and wait until the chunk data arrives and is stored @@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(msg, c.priority) + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority err, so dropping peer", "err", err) p.Drop(err) @@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string { // handleWantedHashesMsg protocol msg handler // * sends the next batch of unsynced keys // * sends the actual data chunks as per WantedHashesMsg -func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { +func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1) log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) @@ -314,7 +323,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1) hash := hashes[i*HashSize : (i+1)*HashSize] - data, err := s.GetData(hash) + data, err := s.GetData(ctx, hash) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } @@ -323,7 +332,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - if err := p.Deliver(chunk, s.priority); err != nil { + if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } } @@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string { return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig) } -func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error { +func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error { _, err := p.getServer(req.Stream) // store the strongest takeoverproof for the stream in streamer return err diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 29984a911..80b9ab711 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -27,8 +27,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) var sendTimeout = 30 * time.Second @@ -62,6 +64,11 @@ type Peer struct { quit chan struct{} } +type WrappedPriorityMsg struct { + Context context.Context + Msg interface{} +} + // NewPeer is the constructor for Peer func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { p := &Peer{ @@ -74,7 +81,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { quit: make(chan struct{}), } ctx, cancel := context.WithCancel(context.Background()) - go p.pq.Run(ctx, func(i interface{}) { p.Send(i) }) + go p.pq.Run(ctx, func(i interface{}) { + wmsg := i.(WrappedPriorityMsg) + p.Send(wmsg.Context, wmsg.Msg) + }) go func() { <-p.quit cancel() @@ -83,25 +93,41 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "send.chunk.delivery") + defer sp.Finish() + msg := &ChunkDeliveryMsg{ Addr: chunk.Addr, SData: chunk.SData, } - return p.SendPriority(msg, priority) + return p.SendPriority(ctx, msg, priority) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(msg interface{}, priority uint8) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) defer cancel() - return p.pq.Push(ctx, msg, int(priority)) + wmsg := WrappedPriorityMsg{ + Context: ctx, + Msg: msg, + } + return p.pq.Push(cctx, wmsg, int(priority)) } // SendOfferedHashes sends OfferedHashesMsg protocol msg func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { + var sp opentracing.Span + ctx, sp := spancontext.StartSpan( + context.TODO(), + "send.offered.hashes") + defer sp.Finish() + hashes, from, to, proof, err := s.SetNextBatch(f, t) if err != nil { return err @@ -124,7 +150,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(msg, s.priority) + return p.SendPriority(ctx, msg, s.priority) } func (p *Peer) getServer(s Stream) (*server, error) { diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index da5253e8a..9961a0bc7 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -55,10 +55,10 @@ func initRetrievalTest() { //deliveries for each node deliveries = make(map[discover.NodeID]*Delivery) //global retrieve func - getRetrieveFunc = func(id discover.NodeID) func(chunk *storage.Chunk) error { - return func(chunk *storage.Chunk) error { + getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { + return func(ctx context.Context, chunk *storage.Chunk) error { skipCheck := true - return deliveries[id].RequestFromPeers(chunk.Addr[:], skipCheck) + return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } } //registries, map of discover.NodeID to its streamer @@ -412,7 +412,7 @@ func runFileRetrievalTest(nodeCount int) error { for i, hash := range conf.hashes { reader, _ := fileStore.Retrieve(context.TODO(), hash) //check that we can read the file size and that it corresponds to the generated file size - if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) { + if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) { allSuccess = false log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) } else { @@ -699,7 +699,7 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { for _, chnk := range conf.hashes { reader, _ := fileStore.Retrieve(context.TODO(), chnk) //assuming that reading the Size of the chunk is enough to know we found it - if s, err := reader.Size(nil); err != nil || s != chunkSize { + if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize { allSuccess = false log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id) } else { diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index fd8863d43..0b5257c60 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -437,7 +437,7 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error { } else { //use the actual localstore lstore := stores[id] - _, err = lstore.Get(chunk) + _, err = lstore.Get(context.TODO(), chunk) } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 9b4658c51..56f242e91 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,8 +32,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -235,7 +237,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang if e, ok := err.(*notFoundError); ok && e.t == "server" { // request subscription only if the server for this stream is not created log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h) - return peer.Send(&RequestSubscriptionMsg{ + return peer.Send(context.TODO(), &RequestSubscriptionMsg{ Stream: s, History: h, Priority: prio, @@ -285,7 +287,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(msg, priority) + return peer.SendPriority(context.TODO(), msg, priority) } func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { @@ -299,7 +301,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { } log.Debug("Unsubscribe ", "peer", peerId, "stream", s) - if err := peer.Send(msg); err != nil { + if err := peer.Send(context.TODO(), msg); err != nil { return err } return peer.removeClient(s) @@ -320,11 +322,17 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { } log.Debug("Quit ", "peer", peerId, "stream", s) - return peer.Send(msg) + return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(chunk *storage.Chunk) error { - return r.delivery.RequestFromPeers(chunk.Addr[:], r.skipCheck) +func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "registry.retrieve") + defer sp.Finish() + + return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) } func (r *Registry) NodeInfo() interface{} { @@ -460,11 +468,11 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { } // HandleMsg is the message handler that delegates incoming messages -func (p *Peer) HandleMsg(msg interface{}) error { +func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *SubscribeMsg: - return p.handleSubscribeMsg(msg) + return p.handleSubscribeMsg(ctx, msg) case *SubscribeErrorMsg: return p.handleSubscribeErrorMsg(msg) @@ -473,22 +481,22 @@ func (p *Peer) HandleMsg(msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(msg) + return p.handleOfferedHashesMsg(ctx, msg) case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(msg) + return p.handleTakeoverProofMsg(ctx, msg) case *WantedHashesMsg: - return p.handleWantedHashesMsg(msg) + return p.handleWantedHashesMsg(ctx, msg) case *ChunkDeliveryMsg: - return p.streamer.delivery.handleChunkDeliveryMsg(p, msg) + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(p, msg) + return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) case *RequestSubscriptionMsg: - return p.handleRequestSubscription(msg) + return p.handleRequestSubscription(ctx, msg) case *QuitMsg: return p.handleQuitMsg(msg) @@ -508,7 +516,7 @@ type server struct { // Server interface for outgoing peer Streamer type Server interface { SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) - GetData([]byte) ([]byte, error) + GetData(context.Context, []byte) ([]byte, error) Close() } @@ -551,7 +559,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData([]byte) func() + NeedData(context.Context, []byte) func() BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } @@ -588,7 +596,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(tp, c.priority); err != nil { + if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 44622c995..7523860c9 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -18,6 +18,7 @@ package stream import ( "bytes" + "context" "testing" "time" @@ -79,7 +80,7 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(hash []byte) func() { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func() { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { return func() { @@ -114,7 +115,7 @@ func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, ui return make([]byte, HashSize), from + 1, to + 1, nil, nil } -func (self *testServer) GetData([]byte) ([]byte, error) { +func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { return nil, nil } diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 5510b2409..d7febe4a3 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -17,6 +17,7 @@ package stream import ( + "context" "math" "strconv" "time" @@ -78,8 +79,8 @@ func (s *SwarmSyncerServer) Close() { } // GetSection retrieves the actual chunk from localstore -func (s *SwarmSyncerServer) GetData(key []byte) ([]byte, error) { - chunk, err := s.db.Get(storage.Address(key)) +func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { + chunk, err := s.db.Get(ctx, storage.Address(key)) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { @@ -210,8 +211,8 @@ func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { } // NeedData -func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(key) +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { + chunk, _ := s.db.GetOrCreateRequest(ctx, key) // TODO: we may want to request from this peer anyway even if the request exists // ignoreExistingRequest is temporary commented out until its functionality is verified. diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 5fea7befe..a3d53e648 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -231,7 +231,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck for j := i; j < nodes; j++ { total += len(hashes[j]) for _, key := range hashes[j] { - chunk, err := dbs[i].Get(key) + chunk, err := dbs[i].Get(ctx, key) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { |