From 88b41a9e680a764aa079051aa7c71b3c6879d60a Mon Sep 17 00:00:00 2001 From: holisticode Date: Sun, 21 Oct 2018 02:30:41 -0500 Subject: =?UTF-8?q?swarm/network/stream:=20disambiguate=20chunk=20delivery?= =?UTF-8?q?=20messages=20(retrieval=E2=80=A6=20(#17920)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * swarm/network/stream: disambiguate chunk delivery messages (retrieval vs syncing) * swarm/network/stream: addressed PR comments * swarm/network/stream: stream protocol version change due to new message types in this PR --- swarm/network/stream/delivery.go | 13 ++++++++++++- swarm/network/stream/messages.go | 3 ++- swarm/network/stream/peer.go | 29 +++++++++++++++++++++++------ swarm/network/stream/stream.go | 14 ++++++++++---- 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 0429c4dff..9092ffe3e 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -173,7 +173,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return } if req.SkipCheck { - err = sp.Deliver(ctx, chunk, s.priority) + syncing := false + err = sp.Deliver(ctx, chunk, s.priority, syncing) if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } @@ -189,12 +190,22 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return nil } +//Chunk delivery always uses the same message type.... type ChunkDeliveryMsg struct { Addr storage.Address SData []byte // the stored chunk Data (incl size) peer *Peer // set in handleChunkDeliveryMsg } +//...but swap accounting needs to disambiguate if it is a delivery for syncing or for retrieval +//as it decides based on message type if it needs to account for this message or not + +//defines a chunk delivery for retrieval (with accounting) +type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg + +//defines a chunk delivery for syncing (without accounting) +type ChunkDeliveryMsgSyncing ChunkDeliveryMsg + // TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 0fe3e5eb4..eb1b2983e 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -357,7 +357,8 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } chunk := storage.NewChunk(hash, data) - if err := p.Deliver(ctx, chunk, s.priority); err != nil { + syncing := true + if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil { return err } } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 89d135ad5..4bccf56f5 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -128,17 +128,34 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { +// Depending on the `syncing` parameter we send different message types +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { var sp opentracing.Span + var msg interface{} + + spanName := "send.chunk.delivery" + + //we send different types of messages if delivery is for syncing or retrievals, + //even if handling and content of the message are the same, + //because swap accounting decides which messages need accounting based on the message type + if syncing { + msg = &ChunkDeliveryMsgSyncing{ + Addr: chunk.Address(), + SData: chunk.Data(), + } + spanName += ".syncing" + } else { + msg = &ChunkDeliveryMsgRetrieval{ + Addr: chunk.Address(), + SData: chunk.Data(), + } + spanName += ".retrieval" + } ctx, sp = spancontext.StartSpan( ctx, - "send.chunk.delivery") + spanName) defer sp.Finish() - msg := &ChunkDeliveryMsg{ - Addr: chunk.Address(), - SData: chunk.Data(), - } return p.SendPriority(ctx, msg, priority) } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 9d0e6c68b..0ac374def 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -489,8 +489,13 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { case *WantedHashesMsg: return p.handleWantedHashesMsg(ctx, msg) - case *ChunkDeliveryMsg: - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) + case *ChunkDeliveryMsgRetrieval: + //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + + case *ChunkDeliveryMsgSyncing: + //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *RetrieveRequestMsg: return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) @@ -681,7 +686,7 @@ func (c *clientParams) clientCreated() { // Spec is the spec of the streamer protocol var Spec = &protocols.Spec{ Name: "stream", - Version: 7, + Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, @@ -690,10 +695,11 @@ var Spec = &protocols.Spec{ TakeoverProofMsg{}, SubscribeMsg{}, RetrieveRequestMsg{}, - ChunkDeliveryMsg{}, + ChunkDeliveryMsgRetrieval{}, SubscribeErrorMsg{}, RequestSubscriptionMsg{}, QuitMsg{}, + ChunkDeliveryMsgSyncing{}, }, } -- cgit v1.2.3