diff options
author | Janos Guljas <janos@resenje.org> | 2018-09-27 15:43:00 +0800 |
---|---|---|
committer | Janos Guljas <janos@resenje.org> | 2018-09-27 15:43:00 +0800 |
commit | a5e6bf7eefbe6f56cf688b3542fe373c4670cb65 (patch) | |
tree | 8b712ef8fb72b354346c7b1092261c469ebd7d8a /swarm/network/stream | |
parent | 0d5e1e7bc9ad4044a679ab5429d118b2a0e8afe7 (diff) | |
parent | e39a9b3480af0ac8044294f46e0e9e4c3948d23c (diff) | |
download | go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.gz go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.bz2 go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.lz go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.xz go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.tar.zst go-tangerine-a5e6bf7eefbe6f56cf688b3542fe373c4670cb65.zip |
Merge branch 'master' into max-stream-peer-servers
Diffstat (limited to 'swarm/network/stream')
-rw-r--r-- | swarm/network/stream/delivery.go | 6 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 14 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 77 |
4 files changed, 88 insertions, 11 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 431136ab1..c2adb1009 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -128,6 +128,7 @@ func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, err type RetrieveRequestMsg struct { Addr storage.Address SkipCheck bool + HopCount uint8 } func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { @@ -148,7 +149,9 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * var cancel func() // TODO: do something with this hardcoded timeout, maybe use TTL in the future - ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + ctx = context.WithValue(ctx, "peer", sp.ID().String()) + ctx = context.WithValue(ctx, "hopcount", req.HopCount) + ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout) go func() { select { @@ -247,6 +250,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( err := sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: req.Addr, SkipCheck: req.SkipCheck, + HopCount: req.HopCount, }, Top) if err != nil { return nil, nil, err diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 1e47b7cf9..74c785d58 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -26,7 +26,7 @@ import ( bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" ) var syncBatchTimeout = 30 * time.Second @@ -197,10 +197,16 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if err != nil { return err } + hashes := req.Hashes - want, err := bv.New(len(hashes) / HashSize) + lenHashes := len(hashes) + if lenHashes%HashSize != 0 { + return fmt.Errorf("error invalid hashes length (len: %v)", lenHashes) + } + + want, err := bv.New(lenHashes / HashSize) if err != nil { - return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err) + return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err) } ctr := 0 @@ -208,7 +214,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) ctx = context.WithValue(ctx, "source", p.ID().String()) - for i := 0; i < len(hashes); i += HashSize { + for i := 0; i < lenHashes; i += HashSize { hash := hashes[i : i+HashSize] if wait := c.NeedData(ctx, hash); wait != nil { diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 3b1b11d36..1eda06c6a 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -642,7 +642,7 @@ func (c *clientParams) clientCreated() { // Spec is the spec of the streamer protocol var Spec = &protocols.Spec{ Name: "stream", - Version: 6, + Version: 7, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 04366cd39..0bdebefa7 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -19,6 +19,7 @@ package stream import ( "bytes" "context" + "errors" "strconv" "testing" "time" @@ -56,11 +57,12 @@ func TestStreamerRequestSubscription(t *testing.T) { } var ( - hash0 = sha3.Sum256([]byte{0}) - hash1 = sha3.Sum256([]byte{1}) - hash2 = sha3.Sum256([]byte{2}) - hashesTmp = append(hash0[:], hash1[:]...) - hashes = append(hashesTmp, hash2[:]...) + hash0 = sha3.Sum256([]byte{0}) + hash1 = sha3.Sum256([]byte{1}) + hash2 = sha3.Sum256([]byte{2}) + hashesTmp = append(hash0[:], hash1[:]...) + hashes = append(hashesTmp, hash2[:]...) + corruptHashes = append(hashes[:40]) ) type testClient struct { @@ -460,6 +462,71 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } } +func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { + tester, streamer, _, teardown, err := newStreamerTester(t, nil) + defer teardown() + if err != nil { + t.Fatal(err) + } + + stream := NewStream("foo", "", true) + + var tc *testClient + + streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { + tc = newTestClient(t) + return tc, nil + }) + + node := tester.Nodes[0] + + err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: NewRange(5, 8), + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, + p2ptest.Exchange{ + Label: "Corrupt offered hash message", + Triggers: []p2ptest.Trigger{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: corruptHashes, + From: 5, + To: 8, + Stream: stream, + }, + Peer: node.ID(), + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") + if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil { + t.Fatal(err) + } +} + func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() |