Diffstat (limited to 'swarm/network/stream/messages.go')
-rw-r--r--   swarm/network/stream/messages.go   87
1 file changed, 47 insertions(+), 40 deletions(-)
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index a19f63589..2e1a81e82 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -18,9 +18,7 @@ package stream
import (
"context"
- "errors"
"fmt"
- "sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
@@ -31,6 +29,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
+var syncBatchTimeout = 30 * time.Second
+
// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
@@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
}
@@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return fmt.Errorf("error initialising bitvector of length %v: %v", len(hashes)/HashSize, err)
}
- wg := sync.WaitGroup{}
+
+ ctr := 0
+ errC := make(chan error)
+ ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
+
+ ctx = context.WithValue(ctx, "source", p.ID().String())
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
+ ctr++
want.Set(i/HashSize, true)
- wg.Add(1)
// create request and wait until the chunk data arrives and is stored
- go func(w func()) {
- w()
- wg.Done()
+ go func(w func(context.Context) error) {
+ select {
+ case errC <- w(ctx):
+ case <-ctx.Done():
+ }
}(wait)
}
}
- // done := make(chan bool)
- // go func() {
- // wg.Wait()
- // close(done)
- // }()
- // go func() {
- // select {
- // case <-done:
- // s.next <- s.batchDone(p, req, hashes)
- // case <-time.After(1 * time.Second):
- // p.Drop(errors.New("timeout waiting for batch to be delivered"))
- // }
- // }()
+
go func() {
- wg.Wait()
+ defer cancel()
+ for i := 0; i < ctr; i++ {
+ select {
+ case err := <-errC:
+ if err != nil {
+ log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ return
+ }
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
+ return
+ case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ }
+ }
select {
case c.next <- c.batchDone(p, req, hashes):
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
}
}()
// only send wantedKeysMsg if all missing chunks of the previous batch arrived
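The hunk above replaces the sync.WaitGroup with an error channel: one goroutine per wanted chunk reports the outcome of its NeedData wait on errC, a collector goroutine drains exactly ctr results, and the whole batch is bounded by a single syncBatchTimeout context rather than an open-ended wait. Below is a minimal, self-contained sketch of that fan-out/fan-in pattern; the names batchTimeout, waitForBatch and fetch are illustrative only and are not part of the swarm/stream API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var batchTimeout = 30 * time.Second // stands in for syncBatchTimeout

func waitForBatch(parent context.Context, fetchers []func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, batchTimeout)
	defer cancel()

	errC := make(chan error)
	for _, fetch := range fetchers {
		go func(fetch func(context.Context) error) {
			select {
			case errC <- fetch(ctx): // report this chunk's outcome
			case <-ctx.Done(): // batch timed out or was cancelled; stop blocking
			}
		}(fetch)
	}

	// fan-in: collect exactly one result per goroutine started above
	for range fetchers {
		select {
		case err := <-errC:
			if err != nil {
				return err // first failure aborts the batch, as Drop does above
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	fetchers := []func(context.Context) error{
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { return errors.New("chunk not delivered") },
	}
	fmt.Println(waitForBatch(context.Background(), fetchers))
}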
@@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
- log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
+ log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID())
if from == to {
return nil
}
@@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
- case <-time.After(120 * time.Second):
- log.Warn("handleOfferedHashesMsg timeout, so dropping peer")
- p.Drop(errors.New("handle offered hashes timeout"))
- return
case err := <-c.next:
if err != nil {
- log.Warn("c.next dropping peer", "err", err)
+ log.Warn("c.next error dropping peer", "err", err)
p.Drop(err)
return
}
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
- log.Warn("SendPriority err, so dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendPriority error", "err", err)
}
}()
return nil
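Note that the hunk above drops the fixed time.After(120 * time.Second) that used to drop the peer and instead waits on c.next, bounded by the same batch context and the client's quit channel. A small sketch of that wait follows; waitNext, next and quit are illustrative names, not identifiers from the stream package.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitNext blocks until the previous batch is accounted for on next, the
// client shuts down, or the batch context expires, whichever happens first.
func waitNext(ctx context.Context, next <-chan error, quit <-chan struct{}) error {
	select {
	case err := <-next: // previous batch done (err may be nil)
		return err
	case <-quit:
		return errors.New("client closed")
	case <-ctx.Done(): // replaces the old fixed timeout-and-drop
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(waitNext(ctx, make(chan error), make(chan struct{}))) // context deadline exceeded
}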
@@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
@@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = data
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
- }
+ chunk := storage.NewChunk(hash, data)
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}