Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r-- | swarm/network/stream/delivery.go | 179
1 file changed, 57 insertions(+), 122 deletions(-)
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index bc4f1f665..1b4a14ea2 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -20,9 +20,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/log"
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -32,11 +34,6 @@ import (
 	olog "github.com/opentracing/opentracing-go/log"
 )
 
-const (
-	swarmChunkServerStreamName = "RETRIEVE_REQUEST"
-	deliveryCap                = 32
-)
-
 var (
 	processReceivedChunksCount    = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil)
 	handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
@@ -44,93 +41,25 @@ var (
 	requestFromPeersCount     = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
 	requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
+
+	lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
 )
 
 type Delivery struct {
-	chunkStore storage.SyncChunkStore
-	kad        *network.Kademlia
-	getPeer    func(enode.ID) *Peer
+	netStore *storage.NetStore
+	kad      *network.Kademlia
+	getPeer  func(enode.ID) *Peer
+	quit     chan struct{}
 }
 
-func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
 	return &Delivery{
-		chunkStore: chunkStore,
-		kad:        kad,
+		netStore: netStore,
+		kad:      kad,
+		quit:     make(chan struct{}),
 	}
 }
 
-// SwarmChunkServer implements Server
-type SwarmChunkServer struct {
-	deliveryC  chan []byte
-	batchC     chan []byte
-	chunkStore storage.ChunkStore
-	currentLen uint64
-	quit       chan struct{}
-}
-
-// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
-	s := &SwarmChunkServer{
-		deliveryC:  make(chan []byte, deliveryCap),
-		batchC:     make(chan []byte),
-		chunkStore: chunkStore,
-		quit:       make(chan struct{}),
-	}
-	go s.processDeliveries()
-	return s
-}
-
-// processDeliveries handles delivered chunk hashes
-func (s *SwarmChunkServer) processDeliveries() {
-	var hashes []byte
-	var batchC chan []byte
-	for {
-		select {
-		case <-s.quit:
-			return
-		case hash := <-s.deliveryC:
-			hashes = append(hashes, hash...)
-			batchC = s.batchC
-		case batchC <- hashes:
-			hashes = nil
-			batchC = nil
-		}
-	}
-}
-
-// SessionIndex returns zero in all cases for SwarmChunkServer.
-func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
-	return 0, nil
-}
-
-// SetNextBatch
-func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
-	select {
-	case hashes = <-s.batchC:
-	case <-s.quit:
-		return
-	}
-	from = s.currentLen
-	s.currentLen += uint64(len(hashes))
-	to = s.currentLen
-	return
-}
-
-// Close needs to be called on a stream server
-func (s *SwarmChunkServer) Close() {
-	close(s.quit)
-}
-
-// GetData retrieves chunk data from db store
-func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-	chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
-	if err != nil {
-		return nil, err
-	}
-	return chunk.Data(), nil
-}
-
 // RetrieveRequestMsg is the protocol msg for chunk retrieve requests
 type RetrieveRequestMsg struct {
 	Addr      storage.Address
@@ -149,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 
 	osp.LogFields(olog.String("ref", req.Addr.String()))
 
-	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
-	if err != nil {
-		return err
-	}
-	streamer := s.Server.(*SwarmChunkServer)
-
 	var cancel func()
 	// TODO: do something with this hardcoded timeout, maybe use TTL in the future
 	ctx = context.WithValue(ctx, "peer", sp.ID().String())
@@ -164,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	go func() {
 		select {
 		case <-ctx.Done():
-		case <-streamer.quit:
+		case <-d.quit:
 		}
 		cancel()
 	}()
 
 	go func() {
 		defer osp.Finish()
-		chunk, err := d.chunkStore.Get(ctx, req.Addr)
+		ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
 		if err != nil {
 			retrieveChunkFail.Inc(1)
 			log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
 			return
 		}
-		if req.SkipCheck {
-			syncing := false
-			osp.LogFields(olog.Bool("skipCheck", true))
+		syncing := false
-			err = sp.Deliver(ctx, chunk, s.priority, syncing)
-			if err != nil {
-				log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
-			}
-			osp.LogFields(olog.Bool("delivered", true))
-			return
-		}
-		osp.LogFields(olog.Bool("skipCheck", false))
-		select {
-		case streamer.deliveryC <- chunk.Address()[:]:
-		case <-streamer.quit:
+		err = sp.Deliver(ctx, ch, Top, syncing)
+		if err != nil {
+			log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
 		}
-
+		osp.LogFields(olog.Bool("delivered", true))
 	}()
 
 	return nil
@@ -216,7 +129,7 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
 type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
 
 // chunk delivery msg is a response to a retrieve request msg
-func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
+func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req interface{}) error {
 	var osp opentracing.Span
 	ctx, osp = spancontext.StartSpan(
 		ctx,
@@ -224,36 +137,58 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
 
 	processReceivedChunksCount.Inc(1)
 
-	// retrieve the span for the originating retrieverequest
-	spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
-	span := tracing.ShiftSpanByKey(spanId)
+	// record the last time we received a chunk delivery message
+	lastReceivedChunksMsg.Update(time.Now().UnixNano())
+
+	var msg *ChunkDeliveryMsg
+	var mode chunk.ModePut
+	switch r := req.(type) {
+	case *ChunkDeliveryMsgRetrieval:
+		msg = (*ChunkDeliveryMsg)(r)
+		peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr)
+		po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
+		depth := d.kad.NeighbourhoodDepth()
+		// chunks within the area of responsibility should always sync
+		// https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125
+		if po >= depth || peerPO < po {
+			mode = chunk.ModePutSync
+		} else {
+			// do not sync if the peer that is sending us the chunk is closer to it than we are
+			mode = chunk.ModePutRequest
+		}
+	case *ChunkDeliveryMsgSyncing:
+		msg = (*ChunkDeliveryMsg)(r)
+		mode = chunk.ModePutSync
+	case *ChunkDeliveryMsg:
+		msg = r
+		mode = chunk.ModePutSync
+	}
 
-	log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())
+	log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
 
 	go func() {
 		defer osp.Finish()
 
-		if span != nil {
-			span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
-			defer span.Finish()
-		}
-
-		req.peer = sp
-		log.Trace("handle.chunk.delivery", "put", req.Addr)
-		err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+		msg.peer = sp
+		log.Trace("handle.chunk.delivery", "put", msg.Addr)
+		_, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
 		if err != nil {
 			if err == storage.ErrChunkInvalid {
 				// we removed this log because it spams the logs
 				// TODO: Enable this log line
-				// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
-				req.peer.Drop(err)
+				// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
+				msg.peer.Drop()
 			}
 		}
-		log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
+		log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
 	}()
 
 	return nil
 }
 
+func (d *Delivery) Close() {
+	close(d.quit)
+}
+
 // RequestFromPeers sends a chunk retrieve request to a peer
 // The most eligible peer that hasn't already been sent to is chosen
 // TODO: define "eligible"
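Note on the new put-mode selection: for retrieval deliveries, a chunk is stored in sync mode when it falls within the node's area of responsibility (po >= depth), or when the node is closer to the chunk than the peer that delivered it; otherwise it is stored as a request-time cache entry. The standalone sketch below illustrates that branch with toy 1-byte addresses. It is a minimal sketch, not the library code: proximity and putModeForRetrieval are hypothetical local stand-ins for chunk.Proximity and the in-handler switch, assuming the usual leading-common-bits definition of proximity order.

package main

import "fmt"

// proximity returns the proximity order of two equal-length addresses:
// the number of leading bits on which they agree. Identical addresses
// yield the maximum order, len(a)*8. Local stand-in for chunk.Proximity.
func proximity(a, b []byte) int {
	for i := range a {
		if x := a[i] ^ b[i]; x != 0 {
			po := i * 8
			for mask := byte(0x80); x&mask == 0; mask >>= 1 {
				po++
			}
			return po
		}
	}
	return len(a) * 8
}

// putModeForRetrieval mirrors the branch added in handleChunkDeliveryMsg:
// sync mode when the chunk is within our area of responsibility (po >= depth)
// or when we are closer to the chunk than the delivering peer (peerPO < po);
// otherwise request-time caching.
func putModeForRetrieval(base, peer, addr []byte, depth int) string {
	peerPO := proximity(peer, addr)
	po := proximity(base, addr)
	if po >= depth || peerPO < po {
		return "ModePutSync"
	}
	return "ModePutRequest"
}

func main() {
	base := []byte{0x00}   // our overlay address (toy 8-bit addresses)
	addr := []byte{0x10}   // chunk address: po(base, addr) = 3
	far := []byte{0x30}    // delivering peer farther from the chunk: po = 2
	closer := []byte{0x18} // delivering peer closer to the chunk: po = 4

	fmt.Println(putModeForRetrieval(base, far, addr, 2))    // ModePutSync: chunk within our depth
	fmt.Println(putModeForRetrieval(base, far, addr, 6))    // ModePutSync: we are closer than the peer
	fmt.Println(putModeForRetrieval(base, closer, addr, 6)) // ModePutRequest: peer is closer, chunk outside our depth
}

The net effect, per the comments in the diff, is that chunks a node is responsible for are always persisted and offered for syncing, while chunks merely passing through on a retrieval path are only cached.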