aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r--swarm/network/stream/peer.go184
1 files changed, 175 insertions, 9 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 152814bd4..28fd06e4d 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -24,8 +24,10 @@ import (
"time"
"github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/network"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -54,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
- *protocols.Peer
+ *network.BzzPeer
streamer *Registry
pq *pq.PriorityQueue
serverMu sync.RWMutex
@@ -74,9 +76,9 @@ type WrappedPriorityMsg struct {
}
// NewPeer is the constructor for Peer
-func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
+func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer {
p := &Peer{
- Peer: peer,
+ BzzPeer: peer,
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: streamer,
servers: make(map[Stream]*server),
@@ -90,7 +92,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
- p.Drop(err)
+ p.Drop()
}
})
@@ -134,7 +136,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
var msg interface{}
- spanName := "send.chunk.delivery"
+ metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1)
//we send different types of messages if delivery is for syncing or retrievals,
//even if handling and content of the message are the same,
@@ -144,16 +146,13 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
Addr: chunk.Address(),
SData: chunk.Data(),
}
- spanName += ".syncing"
} else {
msg = &ChunkDeliveryMsgRetrieval{
Addr: chunk.Address(),
SData: chunk.Data(),
}
- spanName += ".retrieval"
}
- ctx = context.WithValue(ctx, "stream_send_tag", nil)
return p.SendPriority(ctx, msg, priority)
}
@@ -416,7 +415,174 @@ func (p *Peer) removeClientParams(s Stream) error {
}
func (p *Peer) close() {
+ p.serverMu.Lock()
+ defer p.serverMu.Unlock()
+
for _, s := range p.servers {
s.Close()
}
+
+ p.servers = nil
+}
+
+// runUpdateSyncing is a long running function that creates the initial
+// syncing subscriptions to the peer and waits for neighbourhood depth change
+// to create new ones or quit existing ones based on the new neighbourhood depth
+// and if peer enters or leaves nearest neighbourhood by using
+// syncSubscriptionsDiff and updateSyncSubscriptions functions.
+func (p *Peer) runUpdateSyncing() {
+ timer := time.NewTimer(p.streamer.syncUpdateDelay)
+ defer timer.Stop()
+
+ select {
+ case <-timer.C:
+ case <-p.streamer.quit:
+ return
+ }
+
+ kad := p.streamer.delivery.kad
+ po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr())
+
+ depth := kad.NeighbourhoodDepth()
+
+ log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth)
+
+ // initial subscriptions
+ p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay))
+
+ depthChangeSignal, unsubscribeDepthChangeSignal := kad.SubscribeToNeighbourhoodDepthChange()
+ defer unsubscribeDepthChangeSignal()
+
+ prevDepth := depth
+ for {
+ select {
+ case _, ok := <-depthChangeSignal:
+ if !ok {
+ return
+ }
+ // update subscriptions for this peer when depth changes
+ depth := kad.NeighbourhoodDepth()
+ log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth)
+ p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay))
+ prevDepth = depth
+ case <-p.streamer.quit:
+ return
+ }
+ }
+ log.Debug("update syncing subscriptions: exiting", "peer", p.ID())
+}
+
+// updateSyncSubscriptions accepts two slices of integers, the first one
+// representing proximity order bins for required syncing subscriptions
+// and the second one representing bins for syncing subscriptions that
+// need to be removed. This function sends request for subscription
+// messages and quit messages for provided bins.
+func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) {
+ if p.streamer.getPeer(p.ID()) == nil {
+ log.Debug("update syncing subscriptions", "peer not found", p.ID())
+ return
+ }
+ log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins)
+ for _, po := range subBins {
+ p.subscribeSync(po)
+ }
+ for _, po := range quitBins {
+ p.quitSync(po)
+ }
+}
+
+// subscribeSync send the request for syncing subscriptions to the peer
+// using subscriptionFunc. This function is used to request syncing subscriptions
+// when new peer is added to the registry and on neighbourhood depth change.
+func (p *Peer) subscribeSync(po int) {
+ err := subscriptionFunc(p.streamer, p.ID(), uint8(po))
+ if err != nil {
+ log.Error("subscription", "err", err)
+ }
+}
+
+// quitSync sends the quit message for live and history syncing streams to the peer.
+// This function is used in runUpdateSyncing indirectly over updateSyncSubscriptions
+// to remove unneeded syncing subscriptions on neighbourhood depth change.
+func (p *Peer) quitSync(po int) {
+ live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
+ history := getHistoryStream(live)
+ err := p.streamer.Quit(p.ID(), live)
+ if err != nil && err != p2p.ErrShuttingDown {
+ log.Error("quit", "err", err, "peer", p.ID(), "stream", live)
+ }
+ err = p.streamer.Quit(p.ID(), history)
+ if err != nil && err != p2p.ErrShuttingDown {
+ log.Error("quit", "err", err, "peer", p.ID(), "stream", history)
+ }
+
+ err = p.removeServer(live)
+ if err != nil {
+ log.Error("remove server", "err", err, "peer", p.ID(), "stream", live)
+ }
+ err = p.removeServer(history)
+ if err != nil {
+ log.Error("remove server", "err", err, "peer", p.ID(), "stream", live)
+ }
+}
+
+// syncSubscriptionsDiff calculates to which proximity order bins a peer
+// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth
+// change from prevDepth to newDepth. Max argument limits the number of
+// proximity order bins. Returned values are slices of integers which represent
+// proximity order bins, the first one to which additional subscriptions need to
+// be requested and the second one which subscriptions need to be quit. Argument
+// prevDepth with value less then 0 represents no previous depth, used for
+// initial syncing subscriptions.
+func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) {
+ newStart, newEnd := syncBins(peerPO, newDepth, max)
+ if prevDepth < 0 {
+ // no previous depth, return the complete range
+ // for subscriptions requests and nothing for quitting
+ return intRange(newStart, newEnd), nil
+ }
+
+ prevStart, prevEnd := syncBins(peerPO, prevDepth, max)
+
+ if newStart < prevStart {
+ subBins = append(subBins, intRange(newStart, prevStart)...)
+ }
+
+ if prevStart < newStart {
+ quitBins = append(quitBins, intRange(prevStart, newStart)...)
+ }
+
+ if newEnd < prevEnd {
+ quitBins = append(quitBins, intRange(newEnd, prevEnd)...)
+ }
+
+ if prevEnd < newEnd {
+ subBins = append(subBins, intRange(prevEnd, newEnd)...)
+ }
+
+ return subBins, quitBins
+}
+
+// syncBins returns the range to which proximity order bins syncing
+// subscriptions need to be requested, based on peer proximity and
+// kademlia neighbourhood depth. Returned range is [start,end), inclusive for
+// start and exclusive for end.
+func syncBins(peerPO, depth, max int) (start, end int) {
+ if peerPO < depth {
+ // subscribe only to peerPO bin if it is not
+ // in the nearest neighbourhood
+ return peerPO, peerPO + 1
+ }
+ // subscribe from depth to max bin if the peer
+ // is in the nearest neighbourhood
+ return depth, max + 1
+}
+
+// intRange returns the slice of integers [start,end). The start
+// is inclusive and the end is not.
+func intRange(start, end int) (r []int) {
+ for i := start; i < end; i++ {
+ r = append(r, i)
+ }
+ return r
}