aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--swarm/network/stream/delivery.go13
-rw-r--r--swarm/network/stream/messages.go3
-rw-r--r--swarm/network/stream/peer.go29
-rw-r--r--swarm/network/stream/stream.go14
4 files changed, 47 insertions, 12 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 0429c4dff..9092ffe3e 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -173,7 +173,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return
}
if req.SkipCheck {
- err = sp.Deliver(ctx, chunk, s.priority)
+ syncing := false
+ err = sp.Deliver(ctx, chunk, s.priority, syncing)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
@@ -189,12 +190,22 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return nil
}
+//Chunk delivery always uses the same message type....
type ChunkDeliveryMsg struct {
Addr storage.Address
SData []byte // the stored chunk Data (incl size)
peer *Peer // set in handleChunkDeliveryMsg
}
+//...but swap accounting needs to disambiguate if it is a delivery for syncing or for retrieval
+//as it decides based on message type if it needs to account for this message or not
+
+//defines a chunk delivery for retrieval (with accounting)
+type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
+
+//defines a chunk delivery for syncing (without accounting)
+type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
+
// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 0fe3e5eb4..eb1b2983e 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -357,7 +357,8 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
chunk := storage.NewChunk(hash, data)
- if err := p.Deliver(ctx, chunk, s.priority); err != nil {
+ syncing := true
+ if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil {
return err
}
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 89d135ad5..4bccf56f5 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -128,17 +128,34 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
+// Depending on the `syncing` parameter we send different message types
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
var sp opentracing.Span
+ var msg interface{}
+
+ spanName := "send.chunk.delivery"
+
+ //we send different types of messages if delivery is for syncing or retrievals,
+ //even if handling and content of the message are the same,
+ //because swap accounting decides which messages need accounting based on the message type
+ if syncing {
+ msg = &ChunkDeliveryMsgSyncing{
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
+ }
+ spanName += ".syncing"
+ } else {
+ msg = &ChunkDeliveryMsgRetrieval{
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
+ }
+ spanName += ".retrieval"
+ }
ctx, sp = spancontext.StartSpan(
ctx,
- "send.chunk.delivery")
+ spanName)
defer sp.Finish()
- msg := &ChunkDeliveryMsg{
- Addr: chunk.Address(),
- SData: chunk.Data(),
- }
return p.SendPriority(ctx, msg, priority)
}
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 9d0e6c68b..0ac374def 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -489,8 +489,13 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
case *WantedHashesMsg:
return p.handleWantedHashesMsg(ctx, msg)
- case *ChunkDeliveryMsg:
- return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
+ case *ChunkDeliveryMsgRetrieval:
+ //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+
+ case *ChunkDeliveryMsgSyncing:
+ //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *RetrieveRequestMsg:
return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
@@ -681,7 +686,7 @@ func (c *clientParams) clientCreated() {
// Spec is the spec of the streamer protocol
var Spec = &protocols.Spec{
Name: "stream",
- Version: 7,
+ Version: 8,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
UnsubscribeMsg{},
@@ -690,10 +695,11 @@ var Spec = &protocols.Spec{
TakeoverProofMsg{},
SubscribeMsg{},
RetrieveRequestMsg{},
- ChunkDeliveryMsg{},
+ ChunkDeliveryMsgRetrieval{},
SubscribeErrorMsg{},
RequestSubscriptionMsg{},
QuitMsg{},
+ ChunkDeliveryMsgSyncing{},
},
}