From 88b41a9e680a764aa079051aa7c71b3c6879d60a Mon Sep 17 00:00:00 2001
From: holisticode <holistic.computing@gmail.com>
Date: Sun, 21 Oct 2018 02:30:41 -0500
Subject: =?UTF-8?q?swarm/network/stream:=20disambiguate=20chunk=20delivery?=
 =?UTF-8?q?=20messages=20(retrieval=E2=80=A6=20(#17920)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* swarm/network/stream: disambiguate chunk delivery messages (retrieval vs syncing)

* swarm/network/stream: addressed PR comments

* swarm/network/stream: stream protocol version change due to new message types in this PR
---
 swarm/network/stream/delivery.go | 13 ++++++++++++-
 swarm/network/stream/messages.go |  3 ++-
 swarm/network/stream/peer.go     | 29 +++++++++++++++++++++++------
 swarm/network/stream/stream.go   | 14 ++++++++++----
 4 files changed, 47 insertions(+), 12 deletions(-)

(limited to 'swarm')

diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 0429c4dff..9092ffe3e 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -173,7 +173,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 			return
 		}
 		if req.SkipCheck {
-			err = sp.Deliver(ctx, chunk, s.priority)
+			syncing := false
+			err = sp.Deliver(ctx, chunk, s.priority, syncing)
 			if err != nil {
 				log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
 			}
@@ -189,12 +190,22 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	return nil
 }
 
+//Chunk delivery always uses the same message type....
 type ChunkDeliveryMsg struct {
 	Addr  storage.Address
 	SData []byte // the stored chunk Data (incl size)
 	peer  *Peer  // set in handleChunkDeliveryMsg
 }
 
+//...but swap accounting needs to disambiguate if it is a delivery for syncing or for retrieval
+//as it decides based on message type if it needs to account for this message or not
+
+//defines a chunk delivery for retrieval (with accounting)
+type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
+
+//defines a chunk delivery for syncing (without accounting)
+type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
+
 // TODO: Fix context SNAFU
 func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
 	var osp opentracing.Span
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 0fe3e5eb4..eb1b2983e 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -357,7 +357,8 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
 				return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
 			}
 			chunk := storage.NewChunk(hash, data)
-			if err := p.Deliver(ctx, chunk, s.priority); err != nil {
+			syncing := true
+			if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil {
 				return err
 			}
 		}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 89d135ad5..4bccf56f5 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -128,17 +128,34 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 }
 
 // Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
+// Depending on the `syncing` parameter we send different message types
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
 	var sp opentracing.Span
+	var msg interface{}
+
+	spanName := "send.chunk.delivery"
+
+	//we send different types of messages if delivery is for syncing or retrievals,
+	//even if handling and content of the message are the same,
+	//because swap accounting decides which messages need accounting based on the message type
+	if syncing {
+		msg = &ChunkDeliveryMsgSyncing{
+			Addr:  chunk.Address(),
+			SData: chunk.Data(),
+		}
+		spanName += ".syncing"
+	} else {
+		msg = &ChunkDeliveryMsgRetrieval{
+			Addr:  chunk.Address(),
+			SData: chunk.Data(),
+		}
+		spanName += ".retrieval"
+	}
 	ctx, sp = spancontext.StartSpan(
 		ctx,
-		"send.chunk.delivery")
+		spanName)
 	defer sp.Finish()
 
-	msg := &ChunkDeliveryMsg{
-		Addr:  chunk.Address(),
-		SData: chunk.Data(),
-	}
 	return p.SendPriority(ctx, msg, priority)
 }
 
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 9d0e6c68b..0ac374def 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -489,8 +489,13 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
 	case *WantedHashesMsg:
 		return p.handleWantedHashesMsg(ctx, msg)
 
-	case *ChunkDeliveryMsg:
-		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
+	case *ChunkDeliveryMsgRetrieval:
+		//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+
+	case *ChunkDeliveryMsgSyncing:
+		//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
 
 	case *RetrieveRequestMsg:
 		return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
@@ -681,7 +686,7 @@ func (c *clientParams) clientCreated() {
 // Spec is the spec of the streamer protocol
 var Spec = &protocols.Spec{
 	Name:       "stream",
-	Version:    7,
+	Version:    8,
 	MaxMsgSize: 10 * 1024 * 1024,
 	Messages: []interface{}{
 		UnsubscribeMsg{},
@@ -690,10 +695,11 @@ var Spec = &protocols.Spec{
 		TakeoverProofMsg{},
 		SubscribeMsg{},
 		RetrieveRequestMsg{},
-		ChunkDeliveryMsg{},
+		ChunkDeliveryMsgRetrieval{},
 		SubscribeErrorMsg{},
 		RequestSubscriptionMsg{},
 		QuitMsg{},
+		ChunkDeliveryMsgSyncing{},
 	},
 }
 
-- 
cgit v1.2.3