aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
authorJanos Guljas <janos@resenje.org>2018-09-27 15:43:00 +0800
committerJanos Guljas <janos@resenje.org>2018-09-27 15:43:00 +0800
commita5e6bf7eefbe6f56cf688b3542fe373c4670cb65 (patch)
tree8b712ef8fb72b354346c7b1092261c469ebd7d8a /swarm/network/stream
parent0d5e1e7bc9ad4044a679ab5429d118b2a0e8afe7 (diff)
parente39a9b3480af0ac8044294f46e0e9e4c3948d23c (diff)
downloadgo-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.gz
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.bz2
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.lz
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.xz
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.zst
go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.zip
Merge branch 'master' into max-stream-peer-servers
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/delivery.go6
-rw-r--r--swarm/network/stream/messages.go14
-rw-r--r--swarm/network/stream/stream.go2
-rw-r--r--swarm/network/stream/streamer_test.go77
4 files changed, 88 insertions, 11 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 431136ab1..c2adb1009 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -128,6 +128,7 @@ func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, err
type RetrieveRequestMsg struct {
Addr storage.Address
SkipCheck bool
+ HopCount uint8
}
func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
@@ -148,7 +149,9 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
- ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+ ctx = context.WithValue(ctx, "peer", sp.ID().String())
+ ctx = context.WithValue(ctx, "hopcount", req.HopCount)
+ ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)
go func() {
select {
@@ -247,6 +250,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
+ HopCount: req.HopCount,
}, Top)
if err != nil {
return nil, nil, err
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 1e47b7cf9..74c785d58 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -26,7 +26,7 @@ import (
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go"
)
var syncBatchTimeout = 30 * time.Second
@@ -197,10 +197,16 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return err
}
+
hashes := req.Hashes
- want, err := bv.New(len(hashes) / HashSize)
+ lenHashes := len(hashes)
+ if lenHashes%HashSize != 0 {
+ return fmt.Errorf("error invalid hashes length (len: %v)", lenHashes)
+ }
+
+ want, err := bv.New(lenHashes / HashSize)
if err != nil {
- return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err)
+ return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err)
}
ctr := 0
@@ -208,7 +214,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
ctx = context.WithValue(ctx, "source", p.ID().String())
- for i := 0; i < len(hashes); i += HashSize {
+ for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 3b1b11d36..1eda06c6a 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -642,7 +642,7 @@ func (c *clientParams) clientCreated() {
// Spec is the spec of the streamer protocol
var Spec = &protocols.Spec{
Name: "stream",
- Version: 6,
+ Version: 7,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
UnsubscribeMsg{},
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 04366cd39..0bdebefa7 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -19,6 +19,7 @@ package stream
import (
"bytes"
"context"
+ "errors"
"strconv"
"testing"
"time"
@@ -56,11 +57,12 @@ func TestStreamerRequestSubscription(t *testing.T) {
}
var (
- hash0 = sha3.Sum256([]byte{0})
- hash1 = sha3.Sum256([]byte{1})
- hash2 = sha3.Sum256([]byte{2})
- hashesTmp = append(hash0[:], hash1[:]...)
- hashes = append(hashesTmp, hash2[:]...)
+ hash0 = sha3.Sum256([]byte{0})
+ hash1 = sha3.Sum256([]byte{1})
+ hash2 = sha3.Sum256([]byte{2})
+ hashesTmp = append(hash0[:], hash1[:]...)
+ hashes = append(hashesTmp, hash2[:]...)
+ corruptHashes = append(hashes[:40])
)
type testClient struct {
@@ -460,6 +462,71 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
}
+func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ stream := NewStream("foo", "", true)
+
+ var tc *testClient
+
+ streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
+ tc = newTestClient(t)
+ return tc, nil
+ })
+
+ node := tester.Nodes[0]
+
+ err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: NewRange(5, 8),
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ },
+ p2ptest.Exchange{
+ Label: "Corrupt offered hash message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: corruptHashes,
+ From: 5,
+ To: 8,
+ Stream: stream,
+ },
+ Peer: node.ID(),
+ },
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
+ if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil {
+ t.Fatal(err)
+ }
+}
+
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()