From 11d0ff6578c34b724436dbeeede726b31b41c8b8 Mon Sep 17 00:00:00 2001
From: holisticode <holistic.computing@gmail.com>
Date: Mon, 8 Oct 2018 13:28:44 -0500
Subject: Fix retrieval tests and simulation backends (#17723)

* swarm/network/stream: introduced visualized snapshot sync test

* swarm/network/stream: non-existing hash visualization sim

* swarm/network/stream: fixed retrieval tests; new backend for visualization

* swarm/network/stream: cleanup of visualized_snapshot_sync_sim_test.go

* swarm/network/stream: rebased PR on master

* swarm/network/stream: fixed loop logic in retrieval tests

* swarm/network/stream: fixed iterations for snapshot tests

* swarm/network/stream: address PR comments

* swarm/network/stream: addressed PR comments
---
 swarm/network/stream/snapshot_retrieval_test.go    | 232 +++++++--------------
 swarm/network/stream/snapshot_sync_test.go         | 150 +++++++------
 swarm/network/stream/streamer_test.go              |   2 +-
 .../stream/visualized_snapshot_sync_sim_test.go    | 225 ++++++++++++++++++++
 4 files changed, 388 insertions(+), 221 deletions(-)
 create mode 100644 swarm/network/stream/visualized_snapshot_sync_sim_test.go

(limited to 'swarm')

diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 6498f599d..09d915d48 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -104,8 +104,47 @@ func TestRetrieval(t *testing.T) {
 	}
 }
 
-/*
+var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
+	"streamer": retrievalStreamerFunc,
+}
+
+func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+	n := ctx.Config.Node()
+	addr := network.NewAddr(n)
+	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+	if err != nil {
+		return nil, nil, err
+	}
+	bucket.Store(bucketKeyStore, store)
+
+	localStore := store.(*storage.LocalStore)
+	netStore, err := storage.NewNetStore(localStore, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+	delivery := NewDelivery(kad, netStore)
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+		DoSync:          true,
+		SyncUpdateDelay: 3 * time.Second,
+		DoRetrieve:      true,
+	})
+
+	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+	bucket.Store(bucketKeyFileStore, fileStore)
+
+	cleanup = func() {
+		os.RemoveAll(datadir)
+		netStore.Close()
+		r.Close()
+	}
+
+	return r, cleanup, nil
+}
 
+/*
 The test loads a snapshot file to construct the swarm network,
 assuming that the snapshot file identifies a healthy
 kademlia network. Nevertheless a health check runs in the
@@ -114,43 +153,7 @@ simulation's `action` function.
 The snapshot should have 'streamer' in its service list.
 */
 func runFileRetrievalTest(nodeCount int) error {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				DoSync:          true,
-				SyncUpdateDelay: 3 * time.Second,
-			})
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
+	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
 	log.Info("Initializing test config")
@@ -200,49 +203,29 @@ func runFileRetrievalTest(nodeCount int) error {
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
-		for !allSuccess {
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
-				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					//check on the node's FileStore (netstore)
-					item, ok := sim.NodeItem(id, bucketKeyFileStore)
-					if !ok {
-						return fmt.Errorf("No registry")
-					}
-					fileStore := item.(*storage.FileStore)
-					//check all chunks
-					for i, hash := range conf.hashes {
-						reader, _ := fileStore.Retrieve(context.TODO(), hash)
-						//check that we can read the file size and that it corresponds to the generated file size
-						if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
-							allSuccess = false
-							log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
-						} else {
-							log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
-						}
-					}
-					if err != nil {
-						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+				//for each expected file, check if it is in the local store
+				item, ok := sim.NodeItem(id, bucketKeyFileStore)
+				if !ok {
+					return fmt.Errorf("No filestore")
+				}
+				fileStore := item.(*storage.FileStore)
+				//check all chunks
+				for i, hash := range conf.hashes {
+					reader, _ := fileStore.Retrieve(context.TODO(), hash)
+					//check that we can read the file size and that it corresponds to the generated file size
+					if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
+						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
+						time.Sleep(500 * time.Millisecond)
+						continue REPEAT
 					}
+					log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
 				}
-				allSuccess = localSuccess
 			}
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
 
 	if result.Error != nil {
@@ -263,44 +246,7 @@ simulation's `action` function.
 The snapshot should have 'streamer' in its service list.
 */
 func runRetrievalTest(chunkCount int, nodeCount int) error {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				DoSync:          true,
-				SyncUpdateDelay: 0,
-			})
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucketKeyFileStore = simulation.BucketKey("filestore")
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
+	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
 	conf := &synctestConfig{}
@@ -330,8 +276,6 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 			conf.addrToIDMap[string(a)] = n
 		}
 
-		//an array for the random files
-		var randomFiles []string
 		//this is the node selected for upload
 		node := sim.RandomUpNode()
 		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
@@ -349,49 +293,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
-		for !allSuccess {
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
 				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					//check on the node's FileStore (netstore)
-					item, ok := sim.NodeItem(id, bucketKeyFileStore)
-					if !ok {
-						return fmt.Errorf("No registry")
-					}
-					fileStore := item.(*storage.FileStore)
-					//check all chunks
-					for i, hash := range conf.hashes {
-						reader, _ := fileStore.Retrieve(context.TODO(), hash)
-						//check that we can read the file size and that it corresponds to the generated file size
-						if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
-							allSuccess = false
-							log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
-						} else {
-							log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
-						}
-					}
-					if err != nil {
-						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+				//check on the node's FileStore (netstore)
+				item, ok := sim.NodeItem(id, bucketKeyFileStore)
+				if !ok {
+					return fmt.Errorf("No filestore")
+				}
+				fileStore := item.(*storage.FileStore)
+				//check all chunks
+				for _, hash := range conf.hashes {
+					reader, _ := fileStore.Retrieve(context.TODO(), hash)
+					//check that we can read the chunk size and that it corresponds to the generated chunk size
+					if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
+						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
+						time.Sleep(500 * time.Millisecond)
+						continue REPEAT
 					}
+					log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
 				}
-				allSuccess = localSuccess
 			}
+			// all nodes and files found, exit loop and return without error
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
 
 	if result.Error != nil {
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index d93afce1b..0d5849487 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/simulations"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
@@ -50,6 +51,17 @@ type synctestConfig struct {
 	addrToIDMap map[string]enode.ID
 }
 
+const (
+	// EventTypeNode is the type of event emitted when a node is either
+	// created, started or stopped
+	EventTypeChunkCreated   simulations.EventType = "chunkCreated"
+	EventTypeChunkOffered   simulations.EventType = "chunkOffered"
+	EventTypeChunkWanted    simulations.EventType = "chunkWanted"
+	EventTypeChunkDelivered simulations.EventType = "chunkDelivered"
+	EventTypeChunkArrived   simulations.EventType = "chunkArrived"
+	EventTypeSimTerminated  simulations.EventType = "simTerminated"
+)
+
 // Tests in this file should not request chunks from peers.
 // This function will panic indicating that there is a problem if request has been made.
 func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
@@ -131,41 +143,46 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
 	}
 }
 
-func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
+var simServiceMap = map[string]simulation.ServiceFunc{
+	"streamer": streamerFunc,
+}
 
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				DoSync:          true,
-				SyncUpdateDelay: 3 * time.Second,
-			})
-			bucket.Store(bucketKeyRegistry, r)
+func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+	n := ctx.Config.Node()
+	addr := network.NewAddr(n)
+	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+	if err != nil {
+		return nil, nil, err
+	}
+	bucket.Store(bucketKeyStore, store)
+	localStore := store.(*storage.LocalStore)
+	netStore, err := storage.NewNetStore(localStore, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+	delivery := NewDelivery(kad, netStore)
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
 
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
+	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+		DoSync:          true,
+		SyncUpdateDelay: 3 * time.Second,
+	})
 
-			return r, cleanup, nil
+	bucket.Store(bucketKeyRegistry, r)
 
-		},
-	})
+	cleanup = func() {
+		os.RemoveAll(datadir)
+		netStore.Close()
+		r.Close()
+	}
+
+	return r, cleanup, nil
+
+}
+
+func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
+	sim := simulation.New(simServiceMap)
 	defer sim.Close()
 
 	log.Info("Initializing test config")
@@ -204,7 +221,17 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 		}
 	}()
 
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+	result := runSim(conf, ctx, sim, chunkCount)
+
+	if result.Error != nil {
+		t.Fatal(result.Error)
+	}
+	log.Info("Simulation ended")
+}
+
+func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
+
+	return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
 		nodeIDs := sim.UpNodeIDs()
 		for _, n := range nodeIDs {
 			//get the kademlia overlay address from this ID
@@ -229,12 +256,19 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 		if err != nil {
 			return err
 		}
+		for _, h := range hashes {
+			evt := &simulations.Event{
+				Type: EventTypeChunkCreated,
+				Node: sim.Net.GetNode(node.ID),
+				Data: h.String(),
+			}
+			sim.Net.Events().Send(evt)
+		}
 		conf.hashes = append(conf.hashes, hashes...)
 		mapKeysToNodes(conf)
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
 		var gDir string
 		var globalStore *mockdb.GlobalStore
 		if *useMockStore {
@@ -250,12 +284,11 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 				}
 			}()
 		}
-		for !allSuccess {
-			allSuccess = true
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
 				//for each expected chunk, check if it is in the local store
 				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
 				for _, ch := range localChunks {
 					//get the real chunk by the index in the index array
 					chunk := conf.hashes[ch]
@@ -277,29 +310,22 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 					}
 					if err != nil {
 						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
 						// Do not get crazy with logging the warn message
 						time.Sleep(500 * time.Millisecond)
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+						continue REPEAT
 					}
-				}
-				if !localSuccess {
-					allSuccess = false
-					break
+					evt := &simulations.Event{
+						Type: EventTypeChunkArrived,
+						Node: sim.Net.GetNode(id),
+						Data: chunk.String(),
+					}
+					sim.Net.Events().Send(evt)
+					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 				}
 			}
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
-
-	if result.Error != nil {
-		t.Fatal(result.Error)
-	}
-	log.Info("Simulation ended")
 }
 
 /*
@@ -459,13 +485,11 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 		}
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
-		for !allSuccess {
-			allSuccess = true
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
 				//for each expected chunk, check if it is in the local store
 				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
 				for _, ch := range localChunks {
 					//get the real chunk by the index in the index array
 					chunk := conf.hashes[ch]
@@ -487,23 +511,15 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 					}
 					if err != nil {
 						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
 						// Do not get crazy with logging the warn message
 						time.Sleep(500 * time.Millisecond)
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+						continue REPEAT
 					}
-				}
-				if !localSuccess {
-					allSuccess = false
-					break
+					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 				}
 			}
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
 
 	if result.Error != nil {
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 0bdebefa7..5d91eecfd 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -522,7 +522,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
 	}
 
 	expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
-	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil {
+	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
 		t.Fatal(err)
 	}
 }
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
new file mode 100644
index 000000000..437c17e5e
--- /dev/null
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -0,0 +1,225 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build withserver
+
+package stream
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/simulations"
+	"github.com/ethereum/go-ethereum/swarm/log"
+	"github.com/ethereum/go-ethereum/swarm/network/simulation"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+/*
+The tests in this file need to be executed with
+
+			-tags=withserver
+
+Also, they will stall if executed stand-alone, because they wait
+for the visualization frontend to send a POST /runsim message.
+*/
+
+//setup the sim, evaluate nodeCount and chunkCount and create the sim
+func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulation.Simulation) {
+	nodeCount := *nodes
+	chunkCount := *chunks
+
+	if nodeCount == 0 || chunkCount == 0 {
+		nodeCount = 32
+		chunkCount = 1
+	}
+
+	//setup the simulation with server, which means the sim won't run
+	//until it receives a POST /runsim from the frontend
+	sim := simulation.New(serviceMap).WithServer(":8888")
+	return nodeCount, chunkCount, sim
+}
+
+//watch for disconnections and wait for healthy
+func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) {
+	ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+
+	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+		panic(err)
+	}
+
+	disconnections := sim.PeerEvents(
+		context.Background(),
+		sim.NodeIDs(),
+		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+	)
+
+	go func() {
+		for d := range disconnections {
+			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+			panic("unexpected disconnect")
+			cancelSimRun()
+		}
+	}()
+
+	return ctx, cancelSimRun
+}
+
+//This test requests bogus hashes into the network
+func TestNonExistingHashesWithServer(t *testing.T) {
+	nodeCount, _, sim := setupSim(retrievalSimServiceMap)
+	defer sim.Close()
+
+	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+	if err != nil {
+		panic(err)
+	}
+
+	ctx, cancelSimRun := watchSim(sim)
+	defer cancelSimRun()
+
+	//in order to get some meaningful visualization, it is beneficial
+	//to define a minimum duration of this test
+	testDuration := 20 * time.Second
+
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+		//check on the node's FileStore (netstore)
+		id := sim.RandomUpNode().ID
+		item, ok := sim.NodeItem(id, bucketKeyFileStore)
+		if !ok {
+			t.Fatalf("No filestore")
+		}
+		fileStore := item.(*storage.FileStore)
+		//create a bogus hash
+		fakeHash := storage.GenerateRandomChunk(1000).Address()
+		//try to retrieve it - will propagate RetrieveRequestMsg into the network
+		reader, _ := fileStore.Retrieve(context.TODO(), fakeHash)
+		if _, err := reader.Size(ctx, nil); err != nil {
+			log.Debug("expected error for non-existing chunk")
+		}
+		//sleep so that the frontend can have something to display
+		time.Sleep(testDuration)
+
+		return nil
+	})
+	if result.Error != nil {
+		sendSimTerminatedEvent(sim)
+		t.Fatal(result.Error)
+	}
+
+	sendSimTerminatedEvent(sim)
+
+}
+
+//send a termination event to the frontend
+func sendSimTerminatedEvent(sim *simulation.Simulation) {
+	evt := &simulations.Event{
+		Type:    EventTypeSimTerminated,
+		Control: false,
+	}
+	sim.Net.Events().Send(evt)
+}
+
+//This test is the same as the snapshot sync test,
+//but with a HTTP server
+//It also sends some custom events so that the frontend
+//can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg
+func TestSnapshotSyncWithServer(t *testing.T) {
+
+	nodeCount, chunkCount, sim := setupSim(simServiceMap)
+	defer sim.Close()
+
+	log.Info("Initializing test config")
+
+	conf := &synctestConfig{}
+	//map of discover ID to indexes of chunks expected at that ID
+	conf.idToChunksMap = make(map[discover.NodeID][]int)
+	//map of overlay address to discover ID
+	conf.addrToIDMap = make(map[string]discover.NodeID)
+	//array where the generated chunk hashes will be stored
+	conf.hashes = make([]storage.Address, 0)
+
+	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+	if err != nil {
+		panic(err)
+	}
+
+	ctx, cancelSimRun := watchSim(sim)
+	defer cancelSimRun()
+
+	//setup filters in the event feed
+	offeredHashesFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(1)
+	wantedFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(2)
+	deliveryFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(6)
+	eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), offeredHashesFilter, wantedFilter, deliveryFilter)
+
+	quit := make(chan struct{})
+
+	go func() {
+		for e := range eventC {
+			select {
+			case <-quit:
+				fmt.Println("quitting event loop")
+				return
+			default:
+			}
+			if e.Error != nil {
+				t.Fatal(e.Error)
+			}
+			if *e.Event.MsgCode == uint64(1) {
+				evt := &simulations.Event{
+					Type:    EventTypeChunkOffered,
+					Node:    sim.Net.GetNode(e.NodeID),
+					Control: false,
+				}
+				sim.Net.Events().Send(evt)
+			} else if *e.Event.MsgCode == uint64(2) {
+				evt := &simulations.Event{
+					Type:    EventTypeChunkWanted,
+					Node:    sim.Net.GetNode(e.NodeID),
+					Control: false,
+				}
+				sim.Net.Events().Send(evt)
+			} else if *e.Event.MsgCode == uint64(6) {
+				evt := &simulations.Event{
+					Type:    EventTypeChunkDelivered,
+					Node:    sim.Net.GetNode(e.NodeID),
+					Control: false,
+				}
+				sim.Net.Events().Send(evt)
+			}
+		}
+	}()
+	//run the sim
+	result := runSim(conf, ctx, sim, chunkCount)
+
+	//send terminated event
+	evt := &simulations.Event{
+		Type:    EventTypeSimTerminated,
+		Control: false,
+	}
+	sim.Net.Events().Send(evt)
+
+	if result.Error != nil {
+		panic(result.Error)
+	}
+	close(quit)
+	log.Info("Simulation ended")
+}
-- 
cgit v1.2.3