From 11d0ff6578c34b724436dbeeede726b31b41c8b8 Mon Sep 17 00:00:00 2001 From: holisticode <holistic.computing@gmail.com> Date: Mon, 8 Oct 2018 13:28:44 -0500 Subject: Fix retrieval tests and simulation backends (#17723) * swarm/network/stream: introduced visualized snapshot sync test * swarm/network/stream: non-existing hash visualization sim * swarm/network/stream: fixed retrieval tests; new backend for visualization * swarm/network/stream: cleanup of visualized_snapshot_sync_sim_test.go * swarm/network/stream: rebased PR on master * swarm/network/stream: fixed loop logic in retrieval tests * swarm/network/stream: fixed iterations for snapshot tests * swarm/network/stream: address PR comments * swarm/network/stream: addressed PR comments --- swarm/network/stream/snapshot_retrieval_test.go | 232 +++++++-------------- swarm/network/stream/snapshot_sync_test.go | 150 +++++++------ swarm/network/stream/streamer_test.go | 2 +- .../stream/visualized_snapshot_sync_sim_test.go | 225 ++++++++++++++++++++ 4 files changed, 388 insertions(+), 221 deletions(-) create mode 100644 swarm/network/stream/visualized_snapshot_sync_sim_test.go (limited to 'swarm') diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 6498f599d..09d915d48 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -104,8 +104,47 @@ func TestRetrieval(t *testing.T) { } } -/* +var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ + "streamer": retrievalStreamerFunc, +} + +func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + n := ctx.Config.Node() + addr := network.NewAddr(n) + store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 3 * time.Second, + DoRetrieve: true, + }) + + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + + return r, cleanup, nil +} +/* The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. Nevertheless a health check runs in the @@ -114,43 +153,7 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ func runFileRetrievalTest(nodeCount int) error { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - SyncUpdateDelay: 3 * time.Second, - }) - - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) - - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } - - return r, cleanup, nil - - }, - }) + sim := simulation.New(retrievalSimServiceMap) defer sim.Close() log.Info("Initializing test config") @@ -200,49 +203,29 @@ func runFileRetrievalTest(nodeCount int) error { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false - for !allSuccess { + REPEAT: + for { for _, id := range nodeIDs { - //for each expected chunk, check if it is in the local store - localChunks := conf.idToChunksMap[id] - localSuccess := true - for _, ch := range localChunks { - //get the real chunk by the index in the index array - chunk := conf.hashes[ch] - log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) - //check if the expected chunk is indeed in the localstore - var err error - //check on the node's FileStore (netstore) - item, ok := sim.NodeItem(id, bucketKeyFileStore) - if !ok { - return fmt.Errorf("No registry") - } - fileStore := item.(*storage.FileStore) - //check all chunks - for i, hash := range conf.hashes { - reader, _ := fileStore.Retrieve(context.TODO(), hash) - //check that we can read the file size and that it corresponds to the generated file size - if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) { - allSuccess = false - log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) - } else { - log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) - } - } - if err != nil { - log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + //for each expected file, check if it is in the local store + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + fileStore := item.(*storage.FileStore) + //check all chunks + for i, hash := range conf.hashes { + reader, _ := fileStore.Retrieve(context.TODO(), hash) + //check that we can read the file size and that it corresponds to the generated file size + if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) { + log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) + time.Sleep(500 * time.Millisecond) + continue REPEAT } + log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) } - allSuccess = localSuccess } + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) if result.Error != nil { @@ -263,44 +246,7 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ func runRetrievalTest(chunkCount int, nodeCount int) error { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - SyncUpdateDelay: 0, - }) - - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucketKeyFileStore = simulation.BucketKey("filestore") - bucket.Store(bucketKeyFileStore, fileStore) - - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } - - return r, cleanup, nil - - }, - }) + sim := simulation.New(retrievalSimServiceMap) defer sim.Close() conf := &synctestConfig{} @@ -330,8 +276,6 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { conf.addrToIDMap[string(a)] = n } - //an array for the random files - var randomFiles []string //this is the node selected for upload node := sim.RandomUpNode() item, ok := sim.NodeItem(node.ID, bucketKeyStore) @@ -349,49 +293,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false - for !allSuccess { + REPEAT: + for { for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store - localChunks := conf.idToChunksMap[id] - localSuccess := true - for _, ch := range localChunks { - //get the real chunk by the index in the index array - chunk := conf.hashes[ch] - log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) - //check if the expected chunk is indeed in the localstore - var err error - //check on the node's FileStore (netstore) - item, ok := sim.NodeItem(id, bucketKeyFileStore) - if !ok { - return fmt.Errorf("No registry") - } - fileStore := item.(*storage.FileStore) - //check all chunks - for i, hash := range conf.hashes { - reader, _ := fileStore.Retrieve(context.TODO(), hash) - //check that we can read the file size and that it corresponds to the generated file size - if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) { - allSuccess = false - log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) - } else { - log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) - } - } - if err != nil { - log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + //check on the node's FileStore (netstore) + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + fileStore := item.(*storage.FileStore) + //check all chunks + for _, hash := range conf.hashes { + reader, _ := fileStore.Retrieve(context.TODO(), hash) + //check that we can read the chunk size and that it corresponds to the generated chunk size + if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) { + log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s) + time.Sleep(500 * time.Millisecond) + continue REPEAT } + log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash)) } - allSuccess = localSuccess } + // all nodes and files found, exit loop and return without error + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) if result.Error != nil { diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index d93afce1b..0d5849487 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" @@ -50,6 +51,17 @@ type synctestConfig struct { addrToIDMap map[string]enode.ID } +const ( + // EventTypeNode is the type of event emitted when a node is either + // created, started or stopped + EventTypeChunkCreated simulations.EventType = "chunkCreated" + EventTypeChunkOffered simulations.EventType = "chunkOffered" + EventTypeChunkWanted simulations.EventType = "chunkWanted" + EventTypeChunkDelivered simulations.EventType = "chunkDelivered" + EventTypeChunkArrived simulations.EventType = "chunkArrived" + EventTypeSimTerminated simulations.EventType = "simTerminated" +) + // Tests in this file should not request chunks from peers. // This function will panic indicating that there is a problem if request has been made. func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { @@ -131,41 +143,46 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { } } -func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New +var simServiceMap = map[string]simulation.ServiceFunc{ + "streamer": streamerFunc, +} - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - SyncUpdateDelay: 3 * time.Second, - }) - bucket.Store(bucketKeyRegistry, r) +func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + n := ctx.Config.Node() + addr := network.NewAddr(n) + store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 3 * time.Second, + }) - return r, cleanup, nil + bucket.Store(bucketKeyRegistry, r) - }, - }) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + + return r, cleanup, nil + +} + +func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { + sim := simulation.New(simServiceMap) defer sim.Close() log.Info("Initializing test config") @@ -204,7 +221,17 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } }() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + result := runSim(conf, ctx, sim, chunkCount) + + if result.Error != nil { + t.Fatal(result.Error) + } + log.Info("Simulation ended") +} + +func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result { + + return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { //get the kademlia overlay address from this ID @@ -229,12 +256,19 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if err != nil { return err } + for _, h := range hashes { + evt := &simulations.Event{ + Type: EventTypeChunkCreated, + Node: sim.Net.GetNode(node.ID), + Data: h.String(), + } + sim.Net.Events().Send(evt) + } conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false var gDir string var globalStore *mockdb.GlobalStore if *useMockStore { @@ -250,12 +284,11 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } }() } - for !allSuccess { - allSuccess = true + REPEAT: + for { for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] - localSuccess := true for _, ch := range localChunks { //get the real chunk by the index in the index array chunk := conf.hashes[ch] @@ -277,29 +310,22 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false // Do not get crazy with logging the warn message time.Sleep(500 * time.Millisecond) - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + continue REPEAT } - } - if !localSuccess { - allSuccess = false - break + evt := &simulations.Event{ + Type: EventTypeChunkArrived, + Node: sim.Net.GetNode(id), + Data: chunk.String(), + } + sim.Net.Events().Send(evt) + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) - - if result.Error != nil { - t.Fatal(result.Error) - } - log.Info("Simulation ended") } /* @@ -459,13 +485,11 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) } // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false - for !allSuccess { - allSuccess = true + REPEAT: + for { for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] - localSuccess := true for _, ch := range localChunks { //get the real chunk by the index in the index array chunk := conf.hashes[ch] @@ -487,23 +511,15 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false // Do not get crazy with logging the warn message time.Sleep(500 * time.Millisecond) - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + continue REPEAT } - } - if !localSuccess { - allSuccess = false - break + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) if result.Error != nil { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 0bdebefa7..5d91eecfd 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -522,7 +522,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { } expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") - if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil { + if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { t.Fatal(err) } } diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go new file mode 100644 index 000000000..437c17e5e --- /dev/null +++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go @@ -0,0 +1,225 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build withserver + +package stream + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +/* +The tests in this file need to be executed with + + -tags=withserver + +Also, they will stall if executed stand-alone, because they wait +for the visualization frontend to send a POST /runsim message. +*/ + +//setup the sim, evaluate nodeCount and chunkCount and create the sim +func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulation.Simulation) { + nodeCount := *nodes + chunkCount := *chunks + + if nodeCount == 0 || chunkCount == 0 { + nodeCount = 32 + chunkCount = 1 + } + + //setup the simulation with server, which means the sim won't run + //until it receives a POST /runsim from the frontend + sim := simulation.New(serviceMap).WithServer(":8888") + return nodeCount, chunkCount, sim +} + +//watch for disconnections and wait for healthy +func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) { + ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + panic(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + panic("unexpected disconnect") + cancelSimRun() + } + }() + + return ctx, cancelSimRun +} + +//This test requests bogus hashes into the network +func TestNonExistingHashesWithServer(t *testing.T) { + nodeCount, _, sim := setupSim(retrievalSimServiceMap) + defer sim.Close() + + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + if err != nil { + panic(err) + } + + ctx, cancelSimRun := watchSim(sim) + defer cancelSimRun() + + //in order to get some meaningful visualization, it is beneficial + //to define a minimum duration of this test + testDuration := 20 * time.Second + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + //check on the node's FileStore (netstore) + id := sim.RandomUpNode().ID + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + t.Fatalf("No filestore") + } + fileStore := item.(*storage.FileStore) + //create a bogus hash + fakeHash := storage.GenerateRandomChunk(1000).Address() + //try to retrieve it - will propagate RetrieveRequestMsg into the network + reader, _ := fileStore.Retrieve(context.TODO(), fakeHash) + if _, err := reader.Size(ctx, nil); err != nil { + log.Debug("expected error for non-existing chunk") + } + //sleep so that the frontend can have something to display + time.Sleep(testDuration) + + return nil + }) + if result.Error != nil { + sendSimTerminatedEvent(sim) + t.Fatal(result.Error) + } + + sendSimTerminatedEvent(sim) + +} + +//send a termination event to the frontend +func sendSimTerminatedEvent(sim *simulation.Simulation) { + evt := &simulations.Event{ + Type: EventTypeSimTerminated, + Control: false, + } + sim.Net.Events().Send(evt) +} + +//This test is the same as the snapshot sync test, +//but with a HTTP server +//It also sends some custom events so that the frontend +//can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg +func TestSnapshotSyncWithServer(t *testing.T) { + + nodeCount, chunkCount, sim := setupSim(simServiceMap) + defer sim.Close() + + log.Info("Initializing test config") + + conf := &synctestConfig{} + //map of discover ID to indexes of chunks expected at that ID + conf.idToChunksMap = make(map[discover.NodeID][]int) + //map of overlay address to discover ID + conf.addrToIDMap = make(map[string]discover.NodeID) + //array where the generated chunk hashes will be stored + conf.hashes = make([]storage.Address, 0) + + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + if err != nil { + panic(err) + } + + ctx, cancelSimRun := watchSim(sim) + defer cancelSimRun() + + //setup filters in the event feed + offeredHashesFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(1) + wantedFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(2) + deliveryFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(6) + eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), offeredHashesFilter, wantedFilter, deliveryFilter) + + quit := make(chan struct{}) + + go func() { + for e := range eventC { + select { + case <-quit: + fmt.Println("quitting event loop") + return + default: + } + if e.Error != nil { + t.Fatal(e.Error) + } + if *e.Event.MsgCode == uint64(1) { + evt := &simulations.Event{ + Type: EventTypeChunkOffered, + Node: sim.Net.GetNode(e.NodeID), + Control: false, + } + sim.Net.Events().Send(evt) + } else if *e.Event.MsgCode == uint64(2) { + evt := &simulations.Event{ + Type: EventTypeChunkWanted, + Node: sim.Net.GetNode(e.NodeID), + Control: false, + } + sim.Net.Events().Send(evt) + } else if *e.Event.MsgCode == uint64(6) { + evt := &simulations.Event{ + Type: EventTypeChunkDelivered, + Node: sim.Net.GetNode(e.NodeID), + Control: false, + } + sim.Net.Events().Send(evt) + } + } + }() + //run the sim + result := runSim(conf, ctx, sim, chunkCount) + + //send terminated event + evt := &simulations.Event{ + Type: EventTypeSimTerminated, + Control: false, + } + sim.Net.Events().Send(evt) + + if result.Error != nil { + panic(result.Error) + } + close(quit) + log.Info("Simulation ended") +} -- cgit v1.2.3