aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery_test.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/stream/delivery_test.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r--swarm/network/stream/delivery_test.go699
1 files changed, 699 insertions, 0 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
new file mode 100644
index 000000000..b03028c88
--- /dev/null
+++ b/swarm/network/stream/delivery_test.go
@@ -0,0 +1,699 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package stream
+
+import (
+ "bytes"
+ "context"
+ crand "crypto/rand"
+ "fmt"
+ "io"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+func TestStreamerRetrieveRequest(t *testing.T) {
+ tester, streamer, _, teardown, err := newStreamerTester(t)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ peerID := tester.IDs[0]
+
+ streamer.delivery.RequestFromPeers(hash0[:], true)
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "RetrieveRequestMsg",
+ Expects: []p2ptest.Expect{
+ {
+ Code: 5,
+ Msg: &RetrieveRequestMsg{
+ Addr: hash0[:],
+ SkipCheck: true,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+}
+
+func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
+ tester, streamer, _, teardown, err := newStreamerTester(t)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ peerID := tester.IDs[0]
+
+ chunk := storage.NewChunk(storage.Address(hash0[:]), nil)
+
+ peer := streamer.getPeer(peerID)
+
+ peer.handleSubscribeMsg(&SubscribeMsg{
+ Stream: NewStream(swarmChunkServerStreamName, "", false),
+ History: nil,
+ Priority: Top,
+ })
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "RetrieveRequestMsg",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 5,
+ Msg: &RetrieveRequestMsg{
+ Addr: chunk.Addr[:],
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ HandoverProof: nil,
+ Hashes: nil,
+ From: 0,
+ To: 0,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
+ if err == nil || err.Error() != expectedError {
+ t.Fatalf("Expected error %v, got %v", expectedError, err)
+ }
+}
+
+// upstream request server receives a retrieve Request and responds with
+// offered hashes or delivery if skipHash is set to true
+func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
+ tester, streamer, localStore, teardown, err := newStreamerTester(t)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ peerID := tester.IDs[0]
+ peer := streamer.getPeer(peerID)
+
+ stream := NewStream(swarmChunkServerStreamName, "", false)
+
+ peer.handleSubscribeMsg(&SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ })
+
+ hash := storage.Address(hash0[:])
+ chunk := storage.NewChunk(hash, nil)
+ chunk.SData = hash
+ localStore.Put(chunk)
+ chunk.WaitToStore()
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "RetrieveRequestMsg",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 5,
+ Msg: &RetrieveRequestMsg{
+ Addr: hash,
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: hash,
+ From: 0,
+ // TODO: why is this 32???
+ To: 32,
+ Stream: stream,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ hash = storage.Address(hash1[:])
+ chunk = storage.NewChunk(hash, nil)
+ chunk.SData = hash1[:]
+ localStore.Put(chunk)
+ chunk.WaitToStore()
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "RetrieveRequestMsg",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 5,
+ Msg: &RetrieveRequestMsg{
+ Addr: hash,
+ SkipCheck: true,
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 6,
+ Msg: &ChunkDeliveryMsg{
+ Addr: hash,
+ SData: hash,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
+ tester, streamer, localStore, teardown, err := newStreamerTester(t)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
+ return &testClient{
+ t: t,
+ }, nil
+ })
+
+ peerID := tester.IDs[0]
+
+ stream := NewStream("foo", "", true)
+ err = streamer.Subscribe(peerID, stream, NewRange(5, 8), Top)
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ chunkKey := hash0[:]
+ chunkData := hash1[:]
+ chunk, created := localStore.GetOrCreateRequest(chunkKey)
+
+ if !created {
+ t.Fatal("chunk already exists")
+ }
+ select {
+ case <-chunk.ReqC:
+ t.Fatal("chunk is already received")
+ default:
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: NewRange(5, 8),
+ Priority: Top,
+ },
+ Peer: peerID,
+ },
+ },
+ },
+ p2ptest.Exchange{
+ Label: "ChunkDeliveryRequest message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 6,
+ Msg: &ChunkDeliveryMsg{
+ Addr: chunkKey,
+ SData: chunkData,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ timeout := time.NewTimer(1 * time.Second)
+
+ select {
+ case <-timeout.C:
+ t.Fatal("timeout receiving chunk")
+ case <-chunk.ReqC:
+ }
+
+ storedChunk, err := localStore.Get(chunkKey)
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if !bytes.Equal(storedChunk.SData, chunkData) {
+ t.Fatal("Retrieved chunk has different data than original")
+ }
+
+}
+
+func TestDeliveryFromNodes(t *testing.T) {
+ testDeliveryFromNodes(t, 2, 1, dataChunkCount, true)
+ testDeliveryFromNodes(t, 2, 1, dataChunkCount, false)
+ testDeliveryFromNodes(t, 4, 1, dataChunkCount, true)
+ testDeliveryFromNodes(t, 4, 1, dataChunkCount, false)
+ testDeliveryFromNodes(t, 8, 1, dataChunkCount, true)
+ testDeliveryFromNodes(t, 8, 1, dataChunkCount, false)
+ testDeliveryFromNodes(t, 16, 1, dataChunkCount, true)
+ testDeliveryFromNodes(t, 16, 1, dataChunkCount, false)
+}
+
+func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
+ defaultSkipCheck = skipCheck
+ toAddr = network.NewAddrFromNodeID
+ createStoreFunc = createTestLocalStorageFromSim
+ conf := &streamTesting.RunConfig{
+ Adapter: *adapter,
+ NodeCount: nodes,
+ ConnLevel: conns,
+ ToAddr: toAddr,
+ Services: services,
+ EnableMsgEvents: false,
+ }
+
+ sim, teardown, err := streamTesting.NewSimulation(conf)
+ var rpcSubscriptionsWg sync.WaitGroup
+ defer func() {
+ rpcSubscriptionsWg.Wait()
+ teardown()
+ }()
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ stores = make(map[discover.NodeID]storage.ChunkStore)
+ for i, id := range sim.IDs {
+ stores[id] = sim.Stores[i]
+ }
+ registries = make(map[discover.NodeID]*TestRegistry)
+ deliveries = make(map[discover.NodeID]*Delivery)
+ peerCount = func(id discover.NodeID) int {
+ if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
+ return 1
+ }
+ return 2
+ }
+
+ // here we distribute chunks of a random file into Stores of nodes 1 to nodes
+ rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
+ size := chunkCount * chunkSize
+ fileHash, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
+ // wait until all chunks stored
+ wait()
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ errc := make(chan error, 1)
+ waitPeerErrC = make(chan error)
+ quitC := make(chan struct{})
+ defer close(quitC)
+
+ action := func(ctx context.Context) error {
+ // each node Subscribes to each other's swarmChunkServerStreamName
+ // need to wait till an aynchronous process registers the peers in streamer.peers
+ // that is used by Subscribe
+ // using a global err channel to share betweem action and node service
+ i := 0
+ for err := range waitPeerErrC {
+ if err != nil {
+ return fmt.Errorf("error waiting for peers: %s", err)
+ }
+ i++
+ if i == nodes {
+ break
+ }
+ }
+
+ // each node subscribes to the upstream swarm chunk server stream
+ // which responds to chunk retrieve requests all but the last node in the chain does not
+ for j := 0; j < nodes-1; j++ {
+ id := sim.IDs[j]
+ err := sim.CallClient(id, func(client *rpc.Client) error {
+ doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
+ if err != nil {
+ return err
+ }
+ rpcSubscriptionsWg.Add(1)
+ go func() {
+ <-doneC
+ rpcSubscriptionsWg.Done()
+ }()
+ ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+ defer cancel()
+ sid := sim.IDs[j+1]
+ return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
+ })
+ if err != nil {
+ return err
+ }
+ }
+ // create a retriever FileStore for the pivot node
+ delivery := deliveries[sim.IDs[0]]
+ retrieveFunc := func(chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
+ }
+ netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+ go func() {
+ // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+ // we must wait for the peer connections to have started before requesting
+ n, err := readAll(fileStore, fileHash)
+ log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+ if err != nil {
+ errc <- fmt.Errorf("requesting chunks action error: %v", err)
+ }
+ }()
+ return nil
+ }
+ check := func(ctx context.Context, id discover.NodeID) (bool, error) {
+ select {
+ case err := <-errc:
+ return false, err
+ case <-ctx.Done():
+ return false, ctx.Err()
+ default:
+ }
+ var total int64
+ err := sim.CallClient(id, func(client *rpc.Client) error {
+ ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+ return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash))
+ })
+ log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+ if err != nil || total != int64(size) {
+ return false, nil
+ }
+ return true, nil
+ }
+
+ conf.Step = &simulations.Step{
+ Action: action,
+ Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
+ // we are only testing the pivot node (net.Nodes[0])
+ Expect: &simulations.Expectation{
+ Nodes: sim.IDs[0:1],
+ Check: check,
+ },
+ }
+ startedAt := time.Now()
+ timeout := 300 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ result, err := sim.Run(ctx, conf)
+ finishedAt := time.Now()
+ if err != nil {
+ t.Fatalf("Setting up simulation failed: %v", err)
+ }
+ if result.Error != nil {
+ t.Fatalf("Simulation failed: %s", result.Error)
+ }
+ streamTesting.CheckResult(t, result, startedAt, finishedAt)
+}
+
+func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
+ for chunks := 32; chunks <= 128; chunks *= 2 {
+ for i := 2; i < 32; i *= 2 {
+ b.Run(
+ fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
+ func(b *testing.B) {
+ benchmarkDeliveryFromNodes(b, i, 1, chunks, true)
+ },
+ )
+ }
+ }
+}
+
+func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
+ for chunks := 32; chunks <= 128; chunks *= 2 {
+ for i := 2; i < 32; i *= 2 {
+ b.Run(
+ fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
+ func(b *testing.B) {
+ benchmarkDeliveryFromNodes(b, i, 1, chunks, false)
+ },
+ )
+ }
+ }
+}
+
+func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
+ defaultSkipCheck = skipCheck
+ toAddr = network.NewAddrFromNodeID
+ createStoreFunc = createTestLocalStorageFromSim
+ registries = make(map[discover.NodeID]*TestRegistry)
+
+ timeout := 300 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ conf := &streamTesting.RunConfig{
+ Adapter: *adapter,
+ NodeCount: nodes,
+ ConnLevel: conns,
+ ToAddr: toAddr,
+ Services: services,
+ EnableMsgEvents: false,
+ }
+ sim, teardown, err := streamTesting.NewSimulation(conf)
+ var rpcSubscriptionsWg sync.WaitGroup
+ defer func() {
+ rpcSubscriptionsWg.Wait()
+ teardown()
+ }()
+ if err != nil {
+ b.Fatal(err.Error())
+ }
+
+ stores = make(map[discover.NodeID]storage.ChunkStore)
+ deliveries = make(map[discover.NodeID]*Delivery)
+ for i, id := range sim.IDs {
+ stores[id] = sim.Stores[i]
+ }
+ peerCount = func(id discover.NodeID) int {
+ if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
+ return 1
+ }
+ return 2
+ }
+ // wait channel for all nodes all peer connections to set up
+ waitPeerErrC = make(chan error)
+
+ // create a FileStore for the last node in the chain which we are gonna write to
+ remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams())
+
+ // channel to signal simulation initialisation with action call complete
+ // or node disconnections
+ disconnectC := make(chan error)
+ quitC := make(chan struct{})
+
+ initC := make(chan error)
+
+ action := func(ctx context.Context) error {
+ // each node Subscribes to each other's swarmChunkServerStreamName
+ // need to wait till an aynchronous process registers the peers in streamer.peers
+ // that is used by Subscribe
+ // waitPeerErrC using a global err channel to share betweem action and node service
+ i := 0
+ for err := range waitPeerErrC {
+ if err != nil {
+ return fmt.Errorf("error waiting for peers: %s", err)
+ }
+ i++
+ if i == nodes {
+ break
+ }
+ }
+ var err error
+ // each node except the last one subscribes to the upstream swarm chunk server stream
+ // which responds to chunk retrieve requests
+ for j := 0; j < nodes-1; j++ {
+ id := sim.IDs[j]
+ err = sim.CallClient(id, func(client *rpc.Client) error {
+ doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
+ if err != nil {
+ return err
+ }
+ rpcSubscriptionsWg.Add(1)
+ go func() {
+ <-doneC
+ rpcSubscriptionsWg.Done()
+ }()
+ ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+ defer cancel()
+ sid := sim.IDs[j+1] // the upstream peer's id
+ return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
+ })
+ if err != nil {
+ break
+ }
+ }
+ initC <- err
+ return nil
+ }
+
+ // the check function is only triggered when the benchmark finishes
+ trigger := make(chan discover.NodeID)
+ check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) {
+ return true, nil
+ }
+
+ conf.Step = &simulations.Step{
+ Action: action,
+ Trigger: trigger,
+ // we are only testing the pivot node (net.Nodes[0])
+ Expect: &simulations.Expectation{
+ Nodes: sim.IDs[0:1],
+ Check: check,
+ },
+ }
+
+ // run the simulation in the background
+ errc := make(chan error)
+ go func() {
+ _, err := sim.Run(ctx, conf)
+ close(quitC)
+ errc <- err
+ }()
+
+ // wait for simulation action to complete stream subscriptions
+ err = <-initC
+ if err != nil {
+ b.Fatalf("simulation failed to initialise. expected no error. got %v", err)
+ }
+
+ // create a retriever FileStore for the pivot node
+ // by now deliveries are set for each node by the streamer service
+ delivery := deliveries[sim.IDs[0]]
+ retrieveFunc := func(chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
+ }
+ netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
+
+ // benchmark loop
+ b.ResetTimer()
+ b.StopTimer()
+Loop:
+ for i := 0; i < b.N; i++ {
+ // uploading chunkCount random chunks to the last node
+ hashes := make([]storage.Address, chunkCount)
+ for i := 0; i < chunkCount; i++ {
+ // create actual size real chunks
+ hash, wait, err := remoteFileStore.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
+ // wait until all chunks stored
+ wait()
+ if err != nil {
+ b.Fatalf("expected no error. got %v", err)
+ }
+ // collect the hashes
+ hashes[i] = hash
+ }
+ // now benchmark the actual retrieval
+ // netstore.Get is called for each hash in a go routine and errors are collected
+ b.StartTimer()
+ errs := make(chan error)
+ for _, hash := range hashes {
+ go func(h storage.Address) {
+ _, err := netStore.Get(h)
+ log.Warn("test check netstore get", "hash", h, "err", err)
+ errs <- err
+ }(hash)
+ }
+ // count and report retrieval errors
+ // if there are misses then chunk timeout is too low for the distance and volume (?)
+ var total, misses int
+ for err := range errs {
+ if err != nil {
+ log.Warn(err.Error())
+ misses++
+ }
+ total++
+ if total == chunkCount {
+ break
+ }
+ }
+ b.StopTimer()
+
+ select {
+ case err = <-disconnectC:
+ if err != nil {
+ break Loop
+ }
+ default:
+ }
+
+ if misses > 0 {
+ err = fmt.Errorf("%v chunk not found out of %v", misses, total)
+ break Loop
+ }
+ }
+
+ select {
+ case <-quitC:
+ case trigger <- sim.IDs[0]:
+ }
+ if err == nil {
+ err = <-errc
+ } else {
+ if e := <-errc; e != nil {
+ b.Errorf("sim.Run function error: %v", e)
+ }
+ }
+
+ // benchmark over, trigger the check function to conclude the simulation
+ if err != nil {
+ b.Fatalf("expected no error. got %v", err)
+ }
+}
+
+func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
+ return stores[id], nil
+}