aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r--swarm/network/stream/delivery.go179
1 files changed, 57 insertions, 122 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index bc4f1f665..1b4a14ea2 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -20,9 +20,11 @@ import (
"context"
"errors"
"fmt"
+ "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -32,11 +34,6 @@ import (
olog "github.com/opentracing/opentracing-go/log"
)
-const (
- swarmChunkServerStreamName = "RETRIEVE_REQUEST"
- deliveryCap = 32
-)
-
var (
processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil)
handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
@@ -44,93 +41,25 @@ var (
requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
+
+ lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
)
type Delivery struct {
- chunkStore storage.SyncChunkStore
- kad *network.Kademlia
- getPeer func(enode.ID) *Peer
+ netStore *storage.NetStore
+ kad *network.Kademlia
+ getPeer func(enode.ID) *Peer
+ quit chan struct{}
}
-func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
return &Delivery{
- chunkStore: chunkStore,
- kad: kad,
+ netStore: netStore,
+ kad: kad,
+ quit: make(chan struct{}),
}
}
-// SwarmChunkServer implements Server
-type SwarmChunkServer struct {
- deliveryC chan []byte
- batchC chan []byte
- chunkStore storage.ChunkStore
- currentLen uint64
- quit chan struct{}
-}
-
-// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
- s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- chunkStore: chunkStore,
- quit: make(chan struct{}),
- }
- go s.processDeliveries()
- return s
-}
-
-// processDeliveries handles delivered chunk hashes
-func (s *SwarmChunkServer) processDeliveries() {
- var hashes []byte
- var batchC chan []byte
- for {
- select {
- case <-s.quit:
- return
- case hash := <-s.deliveryC:
- hashes = append(hashes, hash...)
- batchC = s.batchC
- case batchC <- hashes:
- hashes = nil
- batchC = nil
- }
- }
-}
-
-// SessionIndex returns zero in all cases for SwarmChunkServer.
-func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
- return 0, nil
-}
-
-// SetNextBatch
-func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
- select {
- case hashes = <-s.batchC:
- case <-s.quit:
- return
- }
-
- from = s.currentLen
- s.currentLen += uint64(len(hashes))
- to = s.currentLen
- return
-}
-
-// Close needs to be called on a stream server
-func (s *SwarmChunkServer) Close() {
- close(s.quit)
-}
-
-// GetData retrives chunk data from db store
-func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
- if err != nil {
- return nil, err
- }
- return chunk.Data(), nil
-}
-
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type RetrieveRequestMsg struct {
Addr storage.Address
@@ -149,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
osp.LogFields(olog.String("ref", req.Addr.String()))
- s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
- if err != nil {
- return err
- }
- streamer := s.Server.(*SwarmChunkServer)
-
var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx = context.WithValue(ctx, "peer", sp.ID().String())
@@ -164,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
go func() {
select {
case <-ctx.Done():
- case <-streamer.quit:
+ case <-d.quit:
}
cancel()
}()
go func() {
defer osp.Finish()
- chunk, err := d.chunkStore.Get(ctx, req.Addr)
+ ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
return
}
- if req.SkipCheck {
- syncing := false
- osp.LogFields(olog.Bool("skipCheck", true))
+ syncing := false
- err = sp.Deliver(ctx, chunk, s.priority, syncing)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
- }
- osp.LogFields(olog.Bool("delivered", true))
- return
- }
- osp.LogFields(olog.Bool("skipCheck", false))
- select {
- case streamer.deliveryC <- chunk.Address()[:]:
- case <-streamer.quit:
+ err = sp.Deliver(ctx, ch, Top, syncing)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
-
+ osp.LogFields(olog.Bool("delivered", true))
}()
return nil
@@ -216,7 +129,7 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
// chunk delivery msg is response to retrieverequest msg
-func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
+func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req interface{}) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
@@ -224,36 +137,58 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
processReceivedChunksCount.Inc(1)
- // retrieve the span for the originating retrieverequest
- spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
- span := tracing.ShiftSpanByKey(spanId)
+ // record the last time we received a chunk delivery message
+ lastReceivedChunksMsg.Update(time.Now().UnixNano())
+
+ var msg *ChunkDeliveryMsg
+ var mode chunk.ModePut
+ switch r := req.(type) {
+ case *ChunkDeliveryMsgRetrieval:
+ msg = (*ChunkDeliveryMsg)(r)
+ peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr)
+ po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
+ depth := d.kad.NeighbourhoodDepth()
+ // chunks within the area of responsibility should always sync
+ // https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125
+ if po >= depth || peerPO < po {
+ mode = chunk.ModePutSync
+ } else {
+ // do not sync if peer that is sending us a chunk is closer to the chunk then we are
+ mode = chunk.ModePutRequest
+ }
+ case *ChunkDeliveryMsgSyncing:
+ msg = (*ChunkDeliveryMsg)(r)
+ mode = chunk.ModePutSync
+ case *ChunkDeliveryMsg:
+ msg = r
+ mode = chunk.ModePutSync
+ }
- log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())
+ log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
go func() {
defer osp.Finish()
- if span != nil {
- span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
- defer span.Finish()
- }
-
- req.peer = sp
- log.Trace("handle.chunk.delivery", "put", req.Addr)
- err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+ msg.peer = sp
+ log.Trace("handle.chunk.delivery", "put", msg.Addr)
+ _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
// TODO: Enable this log line
- // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
- req.peer.Drop(err)
+ // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
+ msg.peer.Drop()
}
}
- log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
+ log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
}()
return nil
}
+func (d *Delivery) Close() {
+ close(d.quit)
+}
+
// RequestFromPeers sends a chunk retrieve request to a peer
// The most eligible peer that hasn't already been sent to is chosen
// TODO: define "eligible"