Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r--  swarm/network/stream/delivery.go  231
1 file changed, 94 insertions, 137 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 627352535..d0f27eebc 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -19,12 +19,11 @@ package stream
import (
"context"
"errors"
- "time"
- "github.com/ethereum/go-ethereum/common"
+ "fmt"
+
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
- cp "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -46,39 +45,34 @@ var (
)
type Delivery struct {
- db *storage.DBAPI
- kad *network.Kademlia
- receiveC chan *ChunkDeliveryMsg
- getPeer func(discover.NodeID) *Peer
+ chunkStore storage.SyncChunkStore
+ kad *network.Kademlia
+ getPeer func(discover.NodeID) *Peer
}
-func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery {
- d := &Delivery{
- db: db,
- kad: kad,
- receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
+func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+ return &Delivery{
+ chunkStore: chunkStore,
+ kad: kad,
}
-
- go d.processReceivedChunks()
- return d
}
// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
- db *storage.DBAPI
+ chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}
// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer {
+func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- db: db,
- quit: make(chan struct{}),
+ deliveryC: make(chan []byte, deliveryCap),
+ batchC: make(chan []byte),
+ chunkStore: chunkStore,
+ quit: make(chan struct{}),
}
go s.processDeliveries()
return s
@@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() {
// GetData retrieves chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
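Note on the GetData hunk above: the ErrFetching/ReqC dance is gone because chunkStore.Get now blocks until the chunk is available or the context is done, so the caller bounds the wait through the context. A sketch under that assumption (the deadline value is illustrative, not from the patch):

package example

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/swarm/network/stream"
)

// getWithDeadline bounds the now-blocking chunk fetch with a context
// deadline instead of waiting on a ReqC channel.
func getWithDeadline(s *stream.SwarmChunkServer, key []byte) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return s.GetData(ctx, key)
}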
@@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
- if chunk.ReqC != nil {
- if created {
- if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
- log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
- chunk.SetErrored(storage.ErrChunkForward)
- return nil
- }
+
+ var cancel func()
+ // TODO: do something with this hardcoded timeout, maybe use TTL in the future
+ ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-streamer.quit:
}
- go func() {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "waiting.delivery")
- defer osp.Finish()
-
- t := time.NewTimer(10 * time.Minute)
- defer t.Stop()
-
- log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created)
- start := time.Now()
- select {
- case <-chunk.ReqC:
- log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start))
- case <-t.C:
- log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr)
- chunk.SetErrored(storage.ErrChunkTimeout)
- return
- }
- chunk.SetErrored(nil)
-
- if req.SkipCheck {
- err := sp.Deliver(ctx, chunk, s.priority)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
- sp.Drop(err)
- }
+ cancel()
+ }()
+
+ go func() {
+ chunk, err := d.chunkStore.Get(ctx, req.Addr)
+ if err != nil {
+ log.Warn("ChunkStore.Get can not retrieve chunk", "err", err)
+ return
+ }
+ if req.SkipCheck {
+ err = sp.Deliver(ctx, chunk, s.priority)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
- streamer.deliveryC <- chunk.Addr[:]
- }()
- return nil
- }
- // TODO: call the retrieve function of the outgoing syncer
- if req.SkipCheck {
- log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr)
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
+ return
}
- return sp.Deliver(ctx, chunk, s.priority)
- }
- streamer.deliveryC <- chunk.Addr[:]
+ select {
+ case streamer.deliveryC <- chunk.Address()[:]:
+ case <-streamer.quit:
+ }
+
+ }()
+
return nil
}
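The retrieve-request hunk above replaces the explicit 10-minute timer with a context deadline whose cancellation is also tied to the server's quit channel. A generic sketch of that pattern, with illustrative names:

package example

import "context"

// cancelOnQuit derives a context that ends when the parent is done or
// when quit closes, so request goroutines unwind on server shutdown.
func cancelOnQuit(parent context.Context, quit chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-ctx.Done():
		case <-quit:
		}
		cancel()
	}()
	return ctx, cancel
}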
@@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
+// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
@@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
"chunk.delivery")
defer osp.Finish()
- req.peer = sp
- d.receiveC <- req
- return nil
-}
+ processReceivedChunksCount.Inc(1)
-func (d *Delivery) processReceivedChunks() {
-R:
- for req := range d.receiveC {
- processReceivedChunksCount.Inc(1)
-
- if len(req.SData) > cp.DefaultSize+8 {
- log.Warn("received chunk is bigger than expected", "len", len(req.SData))
- continue R
- }
-
- // check if the chunk is already stored locally
- chunk, err := d.db.Get(context.TODO(), req.Addr)
- if err == nil {
- continue R
- }
- if err != storage.ErrFetching {
- log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk)
- continue R
- }
- select {
- case <-chunk.ReqC:
- log.Error("someone else delivered?", "hash", chunk.Addr.Hex())
- continue R
- default:
- }
-
- chunk.SData = req.SData
- d.db.Put(context.TODO(), chunk)
-
- go func(req *ChunkDeliveryMsg) {
- err := chunk.WaitToStore()
+ go func() {
+ req.peer = sp
+ err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+ if err != nil {
if err == storage.ErrChunkInvalid {
+ // we removed this log because it spams the logs
+ // TODO: Enable this log line
+ // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
}
- }(req)
- }
+ }
+ }()
+ return nil
}
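In the chunk-delivery hunk above, the old length check and fetch-state bookkeeping collapse into a single Put against the validating chunk store; an invalid chunk surfaces as storage.ErrChunkInvalid and the offending peer is dropped. A condensed sketch of that error path (drop stands in for Peer.Drop):

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// storeDelivered mirrors the patch's handling: the store validates the
// chunk, and an invalid one gets the delivering peer disconnected.
func storeDelivered(ctx context.Context, store storage.SyncChunkStore, addr storage.Address, data []byte, drop func(error)) error {
	err := store.Put(ctx, storage.NewChunk(addr, data))
	if err == storage.ErrChunkInvalid {
		drop(err)
	}
	return err
}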
// RequestFromPeers sends a chunk retrieve request to a peer
-func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
- var success bool
- var err error
+func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
+ var sp *Peer
+ spID := req.Source
- d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool {
- spId := p.ID()
- for _, p := range peersToSkip {
- if p == spId {
- log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
+ if spID != nil {
+ sp = d.getPeer(*spID)
+ if sp == nil {
+ return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
+ }
+ } else {
+ d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
+ id := p.ID()
+ // TODO: skip light nodes that do not accept retrieve requests
+ if req.SkipPeer(id.String()) {
+ log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
- }
- sp := d.getPeer(spId)
+ sp = d.getPeer(id)
+ if sp == nil {
+ log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
+ return true
+ }
+ spID = &id
+ return false
+ })
if sp == nil {
- log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
- return true
+ return nil, nil, errors.New("no peer found")
}
- err = sp.SendPriority(ctx, &RetrieveRequestMsg{
- Addr: hash,
- SkipCheck: skipCheck,
- }, Top)
- if err != nil {
- return true
- }
- requestFromPeersEachCount.Inc(1)
- success = true
- return false
- })
- if success {
- return nil
}
- return errors.New("no peer found")
+
+ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
+ Addr: req.Addr,
+ SkipCheck: req.SkipCheck,
+ }, Top)
+ if err != nil {
+ return nil, nil, err
+ }
+ requestFromPeersEachCount.Inc(1)
+
+ return spID, sp.quit, nil
}
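Finally, a sketch of driving the reworked RequestFromPeers, which now takes a *network.Request and returns the selected peer plus its quit channel instead of looping internally until success. This assumes a Request constructor of the shape network.NewRequest(addr, skipCheck, peersToSkip) from the same refactor; names are illustrative:

package example

import (
	"context"
	"sync"

	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/network/stream"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// requestChunk issues one retrieve request; the caller owns the skip
// list and watches the returned quit channel for peer disconnects.
func requestChunk(ctx context.Context, d *stream.Delivery, addr storage.Address) error {
	peersToSkip := new(sync.Map) // checked via req.SkipPeer for each candidate
	req := network.NewRequest(addr, false, peersToSkip)
	_, quit, err := d.RequestFromPeers(ctx, req)
	if err != nil {
		return err
	}
	select {
	case <-quit: // selected peer disconnected before delivering
	case <-ctx.Done():
	}
	return ctx.Err()
}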