aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/snapshot_retrieval_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/snapshot_retrieval_test.go')
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go484
1 files changed, 0 insertions, 484 deletions
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
deleted file mode 100644
index 50617b5cf..000000000
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ /dev/null
@@ -1,484 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-package stream
-
-import (
- "bytes"
- "context"
- "fmt"
- "io"
- "sync"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/chunk"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network/simulation"
- "github.com/ethereum/go-ethereum/swarm/state"
- "github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/ethereum/go-ethereum/swarm/testutil"
-)
-
-// constants for random file generation
-const (
- minFileSize = 2
- maxFileSize = 40
-)
-
-// TestFileRetrieval is a retrieval test for nodes.
-// A configurable number of nodes can be
-// provided to the test.
-// Files are uploaded to nodes, other nodes try to retrieve the file
-// Number of nodes can be provided via commandline too.
-func TestFileRetrieval(t *testing.T) {
- var nodeCount []int
-
- if *nodes != 0 {
- nodeCount = []int{*nodes}
- } else {
- nodeCount = []int{16}
-
- if *longrunning {
- nodeCount = append(nodeCount, 32, 64)
- } else if testutil.RaceEnabled {
- nodeCount = []int{4}
- }
-
- }
-
- for _, nc := range nodeCount {
- runFileRetrievalTest(t, nc)
- }
-}
-
-// TestPureRetrieval tests pure retrieval without syncing
-// A configurable number of nodes and chunks
-// can be provided to the test.
-// A number of random chunks is generated, then stored directly in
-// each node's localstore according to their address.
-// Each chunk is supposed to end up at certain nodes
-// With retrieval we then make sure that every node can actually retrieve
-// the chunks.
-func TestPureRetrieval(t *testing.T) {
- var nodeCount []int
- var chunkCount []int
-
- if *nodes != 0 && *chunks != 0 {
- nodeCount = []int{*nodes}
- chunkCount = []int{*chunks}
- } else {
- nodeCount = []int{16}
- chunkCount = []int{150}
-
- if *longrunning {
- nodeCount = append(nodeCount, 32, 64)
- chunkCount = append(chunkCount, 32, 256)
- } else if testutil.RaceEnabled {
- nodeCount = []int{4}
- chunkCount = []int{4}
- }
-
- }
-
- for _, nc := range nodeCount {
- for _, c := range chunkCount {
- runPureRetrievalTest(t, nc, c)
- }
- }
-}
-
-// TestRetrieval tests retrieval of chunks by random nodes.
-// One node is randomly selected to be the pivot node.
-// A configurable number of chunks and nodes can be
-// provided to the test, the number of chunks is uploaded
-// to the pivot node and other nodes try to retrieve the chunk(s).
-// Number of chunks and nodes can be provided via commandline too.
-func TestRetrieval(t *testing.T) {
- // if nodes/chunks have been provided via commandline,
- // run the tests with these values
- if *nodes != 0 && *chunks != 0 {
- runRetrievalTest(t, *chunks, *nodes)
- } else {
- nodeCnt := []int{16}
- chnkCnt := []int{32}
-
- if *longrunning {
- nodeCnt = []int{16, 32, 64}
- chnkCnt = []int{4, 32, 256}
- } else if testutil.RaceEnabled {
- nodeCnt = []int{4}
- chnkCnt = []int{4}
- }
-
- for _, n := range nodeCnt {
- for _, c := range chnkCnt {
- t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
- runRetrievalTest(t, c, n)
- })
- }
- }
- }
-}
-
-var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
- if err != nil {
- return nil, nil, err
- }
-
- syncUpdateDelay := 1 * time.Second
- if *longrunning {
- syncUpdateDelay = 3 * time.Second
- }
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: syncUpdateDelay,
- }, nil)
-
- cleanup = func() {
- r.Close()
- clean()
- }
-
- return r, cleanup, nil
- },
-}
-
-// runPureRetrievalTest by uploading a snapshot,
-// then starting a simulation, distribute chunks to nodes
-// and start retrieval.
-// The snapshot should have 'streamer' in its service list.
-func runPureRetrievalTest(t *testing.T, nodeCount int, chunkCount int) {
-
- t.Helper()
- // the pure retrieval test needs a different service map, as we want
- // syncing disabled and we don't need to set the syncUpdateDelay
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
- if err != nil {
- return nil, nil, err
- }
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Syncing: SyncingDisabled,
- }, nil)
-
- cleanup = func() {
- r.Close()
- clean()
- }
-
- return r, cleanup, nil
- },
- },
- )
- defer sim.Close()
-
- log.Info("Initializing test config", "node count", nodeCount)
-
- conf := &synctestConfig{}
- //map of discover ID to indexes of chunks expected at that ID
- conf.idToChunksMap = make(map[enode.ID][]int)
- //map of overlay address to discover ID
- conf.addrToIDMap = make(map[string]enode.ID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
-
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
- defer cancelSimRun()
-
- filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
- err := sim.UploadSnapshot(ctx, filename)
- if err != nil {
- t.Fatal(err)
- }
-
- log.Info("Starting simulation")
-
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
- nodeIDs := sim.UpNodeIDs()
- // first iteration: create addresses
- for _, n := range nodeIDs {
- //get the kademlia overlay address from this ID
- a := n.Bytes()
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- //the proximity calculation is on overlay addr,
- //the p2p/simulations check func triggers on enode.ID,
- //so we need to know which overlay addr maps to which nodeID
- conf.addrToIDMap[string(a)] = n
- }
-
- // now create random chunks
- chunks := storage.GenerateRandomChunks(int64(chunkSize), chunkCount)
- for _, chunk := range chunks {
- conf.hashes = append(conf.hashes, chunk.Address())
- }
-
- log.Debug("random chunks generated, mapping keys to nodes")
-
- // map addresses to nodes
- mapKeysToNodes(conf)
-
- // second iteration: storing chunks at the peer whose
- // overlay address is closest to a particular chunk's hash
- log.Debug("storing every chunk at correspondent node store")
- for _, id := range nodeIDs {
- // for every chunk for this node (which are only indexes)...
- for _, ch := range conf.idToChunksMap[id] {
- item, ok := sim.NodeItem(id, bucketKeyStore)
- if !ok {
- return fmt.Errorf("Error accessing localstore")
- }
- lstore := item.(chunk.Store)
- // ...get the actual chunk
- for _, chnk := range chunks {
- if bytes.Equal(chnk.Address(), conf.hashes[ch]) {
- // ...and store it in the localstore
- if _, err = lstore.Put(ctx, chunk.ModePutUpload, chnk); err != nil {
- return err
- }
- }
- }
- }
- }
-
- // now try to retrieve every chunk from every node
- log.Debug("starting retrieval")
- cnt := 0
-
- for _, id := range nodeIDs {
- item, ok := sim.NodeItem(id, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- fileStore := item.(*storage.FileStore)
- for _, chunk := range chunks {
- reader, _ := fileStore.Retrieve(context.TODO(), chunk.Address())
- content := make([]byte, chunkSize)
- size, err := reader.Read(content)
- //check chunk size and content
- ok := true
- if err != io.EOF {
- log.Debug("Retrieve error", "err", err, "hash", chunk.Address(), "nodeId", id)
- ok = false
- }
- if size != chunkSize {
- log.Debug("size not equal chunkSize", "size", size, "hash", chunk.Address(), "nodeId", id)
- ok = false
- }
- // skip chunk "metadata" for chunk.Data()
- if !bytes.Equal(content, chunk.Data()[8:]) {
- log.Debug("content not equal chunk data", "hash", chunk.Address(), "nodeId", id)
- ok = false
- }
- if !ok {
- return fmt.Errorf("Expected test to succeed at first run, but failed with chunk not found")
- }
- log.Debug(fmt.Sprintf("chunk with root hash %x successfully retrieved", chunk.Address()))
- cnt++
- }
- }
- log.Info("retrieval terminated, chunks retrieved: ", "count", cnt)
- return nil
-
- })
-
- log.Info("Simulation terminated")
-
- if result.Error != nil {
- t.Fatal(result.Error)
- }
-}
-
-// runFileRetrievalTest loads a snapshot file to construct the swarm network.
-// The snapshot should have 'streamer' in its service list.
-func runFileRetrievalTest(t *testing.T, nodeCount int) {
-
- t.Helper()
-
- sim := simulation.New(retrievalSimServiceMap)
- defer sim.Close()
-
- log.Info("Initializing test config", "node count", nodeCount)
-
- conf := &synctestConfig{}
- //map of discover ID to indexes of chunks expected at that ID
- conf.idToChunksMap = make(map[enode.ID][]int)
- //map of overlay address to discover ID
- conf.addrToIDMap = make(map[string]enode.ID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
-
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
- defer cancelSimRun()
-
- filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
- err := sim.UploadSnapshot(ctx, filename)
- if err != nil {
- t.Fatal(err)
- }
-
- log.Info("Starting simulation")
-
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
- nodeIDs := sim.UpNodeIDs()
- for _, n := range nodeIDs {
- //get the kademlia overlay address from this ID
- a := n.Bytes()
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- //the proximity calculation is on overlay addr,
- //the p2p/simulations check func triggers on enode.ID,
- //so we need to know which overlay addr maps to which nodeID
- conf.addrToIDMap[string(a)] = n
- }
-
- //an array for the random files
- var randomFiles []string
-
- conf.hashes, randomFiles, err = uploadFilesToNodes(sim)
- if err != nil {
- return err
- }
-
- log.Info("network healthy, start file checks")
-
- // File retrieval check is repeated until all uploaded files are retrieved from all nodes
- // or until the timeout is reached.
- REPEAT:
- for {
- for _, id := range nodeIDs {
- //for each expected file, check if it is in the local store
- item, ok := sim.NodeItem(id, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- fileStore := item.(*storage.FileStore)
- //check all chunks
- for i, hash := range conf.hashes {
- reader, _ := fileStore.Retrieve(context.TODO(), hash)
- //check that we can read the file size and that it corresponds to the generated file size
- if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
- log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id)
- time.Sleep(500 * time.Millisecond)
- continue REPEAT
- }
- log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
- }
- }
- return nil
- }
- })
-
- log.Info("Simulation terminated")
-
- if result.Error != nil {
- t.Fatal(result.Error)
- }
-}
-
-// runRetrievalTest generates the given number of chunks.
-// The test loads a snapshot file to construct the swarm network.
-// The snapshot should have 'streamer' in its service list.
-func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) {
-
- t.Helper()
-
- sim := simulation.New(retrievalSimServiceMap)
- defer sim.Close()
-
- conf := &synctestConfig{}
- //map of discover ID to indexes of chunks expected at that ID
- conf.idToChunksMap = make(map[enode.ID][]int)
- //map of overlay address to discover ID
- conf.addrToIDMap = make(map[string]enode.ID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
-
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
- defer cancel()
-
- filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
- err := sim.UploadSnapshot(ctx, filename)
- if err != nil {
- t.Fatal(err)
- }
-
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
- nodeIDs := sim.UpNodeIDs()
- for _, n := range nodeIDs {
- //get the kademlia overlay address from this ID
- a := n.Bytes()
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- //the proximity calculation is on overlay addr,
- //the p2p/simulations check func triggers on enode.ID,
- //so we need to know which overlay addr maps to which nodeID
- conf.addrToIDMap[string(a)] = n
- }
-
- //this is the node selected for upload
- node := sim.Net.GetRandomUpNode()
- item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
- if !ok {
- return fmt.Errorf("No localstore")
- }
- lstore := item.(chunk.Store)
- conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
- if err != nil {
- return err
- }
-
- // File retrieval check is repeated until all uploaded files are retrieved from all nodes
- // or until the timeout is reached.
- REPEAT:
- for {
- for _, id := range nodeIDs {
- //for each expected chunk, check if it is in the local store
- //check on the node's FileStore (netstore)
- item, ok := sim.NodeItem(id, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- fileStore := item.(*storage.FileStore)
- //check all chunks
- for _, hash := range conf.hashes {
- reader, _ := fileStore.Retrieve(context.TODO(), hash)
- //check that we can read the chunk size and that it corresponds to the generated chunk size
- if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
- log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
- time.Sleep(500 * time.Millisecond)
- continue REPEAT
- }
- log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
- }
- }
- // all nodes and files found, exit loop and return without error
- return nil
- }
- })
-
- if result.Error != nil {
- t.Fatal(result.Error)
- }
-}