diff options
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 593 |
1 files changed, 0 insertions, 593 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go deleted file mode 100644 index fc0f9d5df..000000000 --- a/swarm/network/stream/delivery_test.go +++ /dev/null @@ -1,593 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package stream - -import ( - "bytes" - "context" - "errors" - "fmt" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/swarm/chunk" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" - pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" - "github.com/ethereum/go-ethereum/swarm/network/simulation" - "github.com/ethereum/go-ethereum/swarm/state" - "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/testutil" -) - -//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) -//Should time out as the peer does not have the chunk (no syncing happened previously) -func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{ - Syncing: SyncingDisabled, //do no syncing - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - chunk := storage.NewChunk(storage.Address(hash0[:]), nil) - - //test the exchange - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Triggers: []p2ptest.Trigger{ - { //then the actual RETRIEVE_REQUEST.... - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: chunk.Address()[:], - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { //to which the peer responds with offered hashes - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: nil, - Hashes: nil, - From: 0, - To: 0, - }, - Peer: node.ID(), - }, - }, - }) - - //should fail with a timeout as the peer we are requesting - //the chunk from does not have the chunk - expectedError := `exchange #0 "RetrieveRequestMsg": timed out` - if err == nil || err.Error() != expectedError { - t.Fatalf("Expected error %v, got %v", expectedError, err) - } -} - -// upstream request server receives a retrieve Request and responds with -// offered hashes or delivery if skipHash is set to true -func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Syncing: SyncingDisabled, - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - hash := storage.Address(hash1[:]) - ch := storage.NewChunk(hash, hash1[:]) - _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) - if err != nil { - t.Fatalf("Expected no err got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 6, - Msg: &ChunkDeliveryMsg{ - Addr: ch.Address(), - SData: ch.Data(), - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -// if there is one peer in the Kademlia, RequestFromPeers should return it -func TestRequestFromPeers(t *testing.T) { - dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") - - addr := network.RandomAddr() - to := network.NewKademlia(addr.OAddr, network.NewKadParams()) - delivery := NewDelivery(to, nil) - protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) - peer := network.NewPeer(&network.BzzPeer{ - BzzAddr: network.RandomAddr(), - LightNode: false, - Peer: protocolsPeer, - }, to) - to.On(peer) - r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) - - // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished - sp := &Peer{ - BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, - pq: pq.New(int(PriorityQueue), PriorityQueueCap), - streamer: r, - } - r.setPeer(sp) - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - ctx := context.Background() - id, _, err := delivery.RequestFromPeers(ctx, req) - - if err != nil { - t.Fatal(err) - } - if *id != dummyPeerID { - t.Fatalf("Expected an id, got %v", id) - } -} - -// RequestFromPeers should not return light nodes -func TestRequestFromPeersWithLightNode(t *testing.T) { - dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") - - addr := network.RandomAddr() - to := network.NewKademlia(addr.OAddr, network.NewKadParams()) - delivery := NewDelivery(to, nil) - - protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) - // setting up a lightnode - peer := network.NewPeer(&network.BzzPeer{ - BzzAddr: network.RandomAddr(), - LightNode: true, - Peer: protocolsPeer, - }, to) - to.On(peer) - r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) - // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished - sp := &Peer{ - BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, - pq: pq.New(int(PriorityQueue), PriorityQueueCap), - streamer: r, - } - r.setPeer(sp) - - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - - ctx := context.Background() - // making a request which should return with "no peer found" - _, _, err := delivery.RequestFromPeers(ctx, req) - - expectedError := "no peer found" - if err.Error() != expectedError { - t.Fatalf("expected '%v', got %v", expectedError, err) - } -} - -func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Syncing: SyncingDisabled, - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { - return &testClient{ - t: t, - }, nil - }) - - node := tester.Nodes[0] - - //subscribe to custom stream - stream := NewStream("foo", "", true) - err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - chunkKey := hash0[:] - chunkData := hash1[:] - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Expects: []p2ptest.Expect{ - { //first expect subscription to the custom stream... - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, - p2ptest.Exchange{ - Label: "ChunkDelivery message", - Triggers: []p2ptest.Trigger{ - { //...then trigger a chunk delivery for the given chunk from peer in order for - //local node to get the chunk delivered - Code: 6, - Msg: &ChunkDeliveryMsg{ - Addr: chunkKey, - SData: chunkData, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - // wait for the chunk to get stored - storedChunk, err := localStore.Get(ctx, chunk.ModeGetRequest, chunkKey) - for err != nil { - select { - case <-ctx.Done(): - t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) - default: - } - storedChunk, err = localStore.Get(ctx, chunk.ModeGetRequest, chunkKey) - time.Sleep(50 * time.Millisecond) - } - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - if !bytes.Equal(storedChunk.Data(), chunkData) { - t.Fatal("Retrieved chunk has different data than original") - } - -} - -func TestDeliveryFromNodes(t *testing.T) { - testDeliveryFromNodes(t, 2, dataChunkCount, true) - testDeliveryFromNodes(t, 2, dataChunkCount, false) - testDeliveryFromNodes(t, 4, dataChunkCount, true) - testDeliveryFromNodes(t, 4, dataChunkCount, false) - - if testutil.RaceEnabled { - // Travis cannot handle more nodes with -race; would time out. - return - } - - testDeliveryFromNodes(t, 8, dataChunkCount, true) - testDeliveryFromNodes(t, 8, dataChunkCount, false) - testDeliveryFromNodes(t, 16, dataChunkCount, true) - testDeliveryFromNodes(t, 16, dataChunkCount, false) -} - -func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { - t.Helper() - t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) - if err != nil { - return nil, nil, err - } - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - Syncing: SyncingDisabled, - }, nil) - bucket.Store(bucketKeyRegistry, r) - - cleanup = func() { - r.Close() - clean() - } - - return r, cleanup, nil - }, - }) - defer sim.Close() - - log.Info("Adding nodes to simulation") - _, err := sim.AddNodesAndConnectChain(nodes) - if err != nil { - t.Fatal(err) - } - - log.Info("Starting simulation") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - nodeIDs := sim.UpNodeIDs() - //determine the pivot node to be the first node of the simulation - pivot := nodeIDs[0] - - //distribute chunks of a random file into Stores of nodes 1 to nodes - //we will do this by creating a file store with an underlying round-robin store: - //the file store will create a hash for the uploaded file, but every chunk will be - //distributed to different nodes via round-robin scheduling - log.Debug("Writing file to round-robin file store") - //to do this, we create an array for chunkstores (length minus one, the pivot node) - stores := make([]storage.ChunkStore, len(nodeIDs)-1) - //we then need to get all stores from the sim.... - lStores := sim.NodesItems(bucketKeyStore) - i := 0 - //...iterate the buckets... - for id, bucketVal := range lStores { - //...and remove the one which is the pivot node - if id == pivot { - continue - } - //the other ones are added to the array... - stores[i] = bucketVal.(storage.ChunkStore) - i++ - } - //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags()) - //now we can actually upload a (random) file to the round-robin store - size := chunkCount * chunkSize - log.Debug("Storing data to file store") - fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) - // wait until all chunks stored - if err != nil { - return err - } - err = wait(ctx) - if err != nil { - return err - } - - //get the pivot node's filestore - item, ok := sim.NodeItem(pivot, bucketKeyFileStore) - if !ok { - return fmt.Errorf("No filestore") - } - pivotFileStore := item.(*storage.FileStore) - log.Debug("Starting retrieval routine") - retErrC := make(chan error) - go func() { - // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks - // we must wait for the peer connections to have started before requesting - n, err := readAll(pivotFileStore, fileHash) - log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) - retErrC <- err - }() - - disconnected := watchDisconnections(ctx, sim) - defer func() { - if err != nil && disconnected.bool() { - err = errors.New("disconnect events received") - } - }() - - //finally check that the pivot node gets all chunks via the root hash - log.Debug("Check retrieval") - success := true - var total int64 - total, err = readAll(pivotFileStore, fileHash) - if err != nil { - return err - } - log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) - if err != nil || total != int64(size) { - success = false - } - - if !success { - return fmt.Errorf("Test failed, chunks not available on all nodes") - } - if err := <-retErrC; err != nil { - return fmt.Errorf("requesting chunks: %v", err) - } - log.Debug("Test terminated successfully") - return nil - }) - if result.Error != nil { - t.Fatal(result.Error) - } - }) -} - -func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { - for chunks := 32; chunks <= 128; chunks *= 2 { - for i := 2; i < 32; i *= 2 { - b.Run( - fmt.Sprintf("nodes=%v,chunks=%v", i, chunks), - func(b *testing.B) { - benchmarkDeliveryFromNodes(b, i, chunks, true) - }, - ) - } - } -} - -func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { - for chunks := 32; chunks <= 128; chunks *= 2 { - for i := 2; i < 32; i *= 2 { - b.Run( - fmt.Sprintf("nodes=%v,chunks=%v", i, chunks), - func(b *testing.B) { - benchmarkDeliveryFromNodes(b, i, chunks, false) - }, - ) - } - } -} - -func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) - if err != nil { - return nil, nil, err - } - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - Syncing: SyncingDisabled, - SyncUpdateDelay: 0, - }, nil) - bucket.Store(bucketKeyRegistry, r) - - cleanup = func() { - r.Close() - clean() - } - - return r, cleanup, nil - }, - }) - defer sim.Close() - - log.Info("Initializing test config") - _, err := sim.AddNodesAndConnectChain(nodes) - if err != nil { - b.Fatal(err) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - nodeIDs := sim.UpNodeIDs() - node := nodeIDs[len(nodeIDs)-1] - - item, ok := sim.NodeItem(node, bucketKeyFileStore) - if !ok { - return errors.New("No filestore") - } - remoteFileStore := item.(*storage.FileStore) - - pivotNode := nodeIDs[0] - item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) - if !ok { - return errors.New("No filestore") - } - netStore := item.(*storage.NetStore) - - if _, err := sim.WaitTillHealthy(ctx); err != nil { - return err - } - - disconnected := watchDisconnections(ctx, sim) - defer func() { - if err != nil && disconnected.bool() { - err = errors.New("disconnect events received") - } - }() - // benchmark loop - b.ResetTimer() - b.StopTimer() - Loop: - for i := 0; i < b.N; i++ { - // uploading chunkCount random chunks to the last node - hashes := make([]storage.Address, chunkCount) - for i := 0; i < chunkCount; i++ { - // create actual size real chunks - ctx := context.TODO() - hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false) - if err != nil { - return fmt.Errorf("store: %v", err) - } - // wait until all chunks stored - err = wait(ctx) - if err != nil { - return fmt.Errorf("wait store: %v", err) - } - // collect the hashes - hashes[i] = hash - } - // now benchmark the actual retrieval - // netstore.Get is called for each hash in a go routine and errors are collected - b.StartTimer() - errs := make(chan error) - for _, hash := range hashes { - go func(h storage.Address) { - _, err := netStore.Get(ctx, chunk.ModeGetRequest, h) - log.Warn("test check netstore get", "hash", h, "err", err) - errs <- err - }(hash) - } - // count and report retrieval errors - // if there are misses then chunk timeout is too low for the distance and volume (?) - var total, misses int - for err := range errs { - if err != nil { - log.Warn(err.Error()) - misses++ - } - total++ - if total == chunkCount { - break - } - } - b.StopTimer() - - if misses > 0 { - err = fmt.Errorf("%v chunk not found out of %v", misses, total) - break Loop - } - } - return err - }) - if result.Error != nil { - b.Fatal(result.Error) - } - -} |