Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 337
1 file changed, 138 insertions(+), 199 deletions(-)
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 70d3829b3..49e4a423a 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -21,9 +21,7 @@ import (
     "context"
     "errors"
     "fmt"
-    "os"
     "sync"
-    "sync/atomic"
     "testing"
     "time"
 
@@ -48,11 +46,11 @@ func TestStreamerRetrieveRequest(t *testing.T) {
         Retrieval: RetrievalClientOnly,
         Syncing:   SyncingDisabled,
     }
-    tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
-    defer teardown()
+    tester, streamer, _, teardown, err := newStreamerTester(regOpts)
     if err != nil {
         t.Fatal(err)
     }
+    defer teardown()
 
     node := tester.Nodes[0]
 
@@ -97,14 +95,14 @@ func TestStreamerRetrieveRequest(t *testing.T) {
 //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
 //Should time out as the peer does not have the chunk (no syncing happened previously)
 func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
-    tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+    tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
         Retrieval: RetrievalEnabled,
         Syncing:   SyncingDisabled, //do no syncing
     })
-    defer teardown()
     if err != nil {
         t.Fatal(err)
     }
+    defer teardown()
 
     node := tester.Nodes[0]
 
@@ -169,14 +167,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
 // upstream request server receives a retrieve Request and responds with
 // offered hashes or delivery if skipHash is set to true
 func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
-    tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+    tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
         Retrieval: RetrievalEnabled,
         Syncing:   SyncingDisabled,
     })
-    defer teardown()
     if err != nil {
         t.Fatal(err)
     }
+    defer teardown()
 
     node := tester.Nodes[0]
 
@@ -359,14 +357,14 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
 }
 
 func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
-    tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+    tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
         Retrieval: RetrievalDisabled,
         Syncing:   SyncingDisabled,
     })
-    defer teardown()
     if err != nil {
         t.Fatal(err)
     }
+    defer teardown()
 
     streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
         return &testClient{
@@ -455,164 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
 }
 
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
-    sim := simulation.New(map[string]simulation.ServiceFunc{
-        "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-            node := ctx.Config.Node()
-            addr := network.NewAddr(node)
-            store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-            if err != nil {
-                return nil, nil, err
-            }
-            bucket.Store(bucketKeyStore, store)
-            cleanup = func() {
-                os.RemoveAll(datadir)
-                store.Close()
-            }
-            localStore := store.(*storage.LocalStore)
-            netStore, err := storage.NewNetStore(localStore, nil)
-            if err != nil {
-                return nil, nil, err
-            }
-
-            kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-            delivery := NewDelivery(kad, netStore)
-            netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-            r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-                SkipCheck: skipCheck,
-                Syncing:   SyncingDisabled,
-                Retrieval: RetrievalEnabled,
-            }, nil)
-            bucket.Store(bucketKeyRegistry, r)
-
-            fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-            bucket.Store(bucketKeyFileStore, fileStore)
+    t.Helper()
+    t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+        sim := simulation.New(map[string]simulation.ServiceFunc{
+            "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+                addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+                if err != nil {
+                    return nil, nil, err
+                }
 
-            return r, cleanup, nil
+                r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+                    SkipCheck: skipCheck,
+                    Syncing:   SyncingDisabled,
+                    Retrieval: RetrievalEnabled,
+                }, nil)
+                bucket.Store(bucketKeyRegistry, r)
 
-        },
-    })
-    defer sim.Close()
+                cleanup = func() {
+                    r.Close()
+                    clean()
+                }
 
-    log.Info("Adding nodes to simulation")
-    _, err := sim.AddNodesAndConnectChain(nodes)
-    if err != nil {
-        t.Fatal(err)
-    }
+                return r, cleanup, nil
+            },
+        })
+        defer sim.Close()
 
-    log.Info("Starting simulation")
-    ctx := context.Background()
-    result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
-        nodeIDs := sim.UpNodeIDs()
-        //determine the pivot node to be the first node of the simulation
-        pivot := nodeIDs[0]
-
-        //distribute chunks of a random file into Stores of nodes 1 to nodes
-        //we will do this by creating a file store with an underlying round-robin store:
-        //the file store will create a hash for the uploaded file, but every chunk will be
-        //distributed to different nodes via round-robin scheduling
-        log.Debug("Writing file to round-robin file store")
-        //to do this, we create an array for chunkstores (length minus one, the pivot node)
-        stores := make([]storage.ChunkStore, len(nodeIDs)-1)
-        //we then need to get all stores from the sim....
-        lStores := sim.NodesItems(bucketKeyStore)
-        i := 0
-        //...iterate the buckets...
-        for id, bucketVal := range lStores {
-            //...and remove the one which is the pivot node
-            if id == pivot {
-                continue
-            }
-            //the other ones are added to the array...
-            stores[i] = bucketVal.(storage.ChunkStore)
-            i++
-        }
-        //...which then gets passed to the round-robin file store
-        roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
-        //now we can actually upload a (random) file to the round-robin store
-        size := chunkCount * chunkSize
-        log.Debug("Storing data to file store")
-        fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
-        // wait until all chunks stored
-        if err != nil {
-            return err
-        }
-        err = wait(ctx)
+        log.Info("Adding nodes to simulation")
+        _, err := sim.AddNodesAndConnectChain(nodes)
         if err != nil {
-            return err
+            t.Fatal(err)
         }
 
-        log.Debug("Waiting for kademlia")
-        // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
-        if _, err := sim.WaitTillHealthy(ctx); err != nil {
-            return err
-        }
-
-        //get the pivot node's filestore
-        item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
-        if !ok {
-            return fmt.Errorf("No filestore")
-        }
-        pivotFileStore := item.(*storage.FileStore)
-        log.Debug("Starting retrieval routine")
-        retErrC := make(chan error)
-        go func() {
-            // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
-            // we must wait for the peer connections to have started before requesting
-            n, err := readAll(pivotFileStore, fileHash)
-            log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
-            retErrC <- err
-        }()
-
-        log.Debug("Watching for disconnections")
-        disconnections := sim.PeerEvents(
-            context.Background(),
-            sim.NodeIDs(),
-            simulation.NewPeerEventsFilter().Drop(),
-        )
-
-        var disconnected atomic.Value
-        go func() {
-            for d := range disconnections {
-                if d.Error != nil {
-                    log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-                    disconnected.Store(true)
+        log.Info("Starting simulation")
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+        result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+            nodeIDs := sim.UpNodeIDs()
+            //determine the pivot node to be the first node of the simulation
+            pivot := nodeIDs[0]
+
+            //distribute chunks of a random file into Stores of nodes 1 to nodes
+            //we will do this by creating a file store with an underlying round-robin store:
+            //the file store will create a hash for the uploaded file, but every chunk will be
+            //distributed to different nodes via round-robin scheduling
+            log.Debug("Writing file to round-robin file store")
+            //to do this, we create an array for chunkstores (length minus one, the pivot node)
+            stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+            //we then need to get all stores from the sim....
+            lStores := sim.NodesItems(bucketKeyStore)
+            i := 0
+            //...iterate the buckets...
+            for id, bucketVal := range lStores {
+                //...and remove the one which is the pivot node
+                if id == pivot {
+                    continue
                 }
+                //the other ones are added to the array...
+                stores[i] = bucketVal.(storage.ChunkStore)
+                i++
             }
-        }()
-        defer func() {
+            //...which then gets passed to the round-robin file store
+            roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+            //now we can actually upload a (random) file to the round-robin store
+            size := chunkCount * chunkSize
+            log.Debug("Storing data to file store")
+            fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+            // wait until all chunks stored
+            if err != nil {
+                return err
+            }
+            err = wait(ctx)
             if err != nil {
-                if yes, ok := disconnected.Load().(bool); ok && yes {
+                return err
+            }
+
+            log.Debug("Waiting for kademlia")
+            // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+            if _, err := sim.WaitTillHealthy(ctx); err != nil {
+                return err
+            }
+
+            //get the pivot node's filestore
+            item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+            if !ok {
+                return fmt.Errorf("No filestore")
+            }
+            pivotFileStore := item.(*storage.FileStore)
+            log.Debug("Starting retrieval routine")
+            retErrC := make(chan error)
+            go func() {
+                // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+                // we must wait for the peer connections to have started before requesting
+                n, err := readAll(pivotFileStore, fileHash)
+                log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+                retErrC <- err
+            }()
+
+            disconnected := watchDisconnections(ctx, sim)
+            defer func() {
+                if err != nil && disconnected.bool() {
                     err = errors.New("disconnect events received")
                 }
-            }
-        }()
+            }()
 
-        //finally check that the pivot node gets all chunks via the root hash
-        log.Debug("Check retrieval")
-        success := true
-        var total int64
-        total, err = readAll(pivotFileStore, fileHash)
-        if err != nil {
-            return err
-        }
-        log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
-        if err != nil || total != int64(size) {
-            success = false
-        }
+            //finally check that the pivot node gets all chunks via the root hash
+            log.Debug("Check retrieval")
+            success := true
+            var total int64
+            total, err = readAll(pivotFileStore, fileHash)
+            if err != nil {
+                return err
+            }
+            log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+            if err != nil || total != int64(size) {
+                success = false
+            }
 
-        if !success {
-            return fmt.Errorf("Test failed, chunks not available on all nodes")
-        }
-        if err := <-retErrC; err != nil {
-            t.Fatalf("requesting chunks: %v", err)
+            if !success {
+                return fmt.Errorf("Test failed, chunks not available on all nodes")
+            }
+            if err := <-retErrC; err != nil {
+                return fmt.Errorf("requesting chunks: %v", err)
+            }
+            log.Debug("Test terminated successfully")
+            return nil
+        })
+        if result.Error != nil {
+            t.Fatal(result.Error)
         }
-        log.Debug("Test terminated successfully")
-        return nil
     })
-    if result.Error != nil {
-        t.Fatal(result.Error)
-    }
 }
 
 func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
@@ -644,25 +614,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
 func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
     sim := simulation.New(map[string]simulation.ServiceFunc{
         "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-            node := ctx.Config.Node()
-            addr := network.NewAddr(node)
-            store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-            if err != nil {
-                return nil, nil, err
-            }
-            bucket.Store(bucketKeyStore, store)
-            cleanup = func() {
-                os.RemoveAll(datadir)
-                store.Close()
-            }
-            localStore := store.(*storage.LocalStore)
-            netStore, err := storage.NewNetStore(localStore, nil)
+            addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
             if err != nil {
                 return nil, nil, err
             }
-            kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-            delivery := NewDelivery(kad, netStore)
-            netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
 
             r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
                 SkipCheck: skipCheck,
@@ -670,12 +625,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
                 Retrieval:       RetrievalDisabled,
                 SyncUpdateDelay: 0,
             }, nil)
+            bucket.Store(bucketKeyRegistry, r)
 
-            fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-            bucket.Store(bucketKeyFileStore, fileStore)
+            cleanup = func() {
+                r.Close()
+                clean()
+            }
 
             return r, cleanup, nil
-
         },
     })
     defer sim.Close()
@@ -686,21 +643,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
         b.Fatal(err)
     }
 
-    ctx := context.Background()
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
     result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
         nodeIDs := sim.UpNodeIDs()
        node := nodeIDs[len(nodeIDs)-1]
 
        item, ok := sim.NodeItem(node, bucketKeyFileStore)
        if !ok {
-            b.Fatal("No filestore")
+            return errors.New("No filestore")
        }
        remoteFileStore := item.(*storage.FileStore)
 
        pivotNode := nodeIDs[0]
        item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
        if !ok {
-            b.Fatal("No filestore")
+            return errors.New("No filestore")
        }
        netStore := item.(*storage.NetStore)
 
@@ -708,26 +666,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
            return err
        }
 
-        disconnections := sim.PeerEvents(
-            context.Background(),
-            sim.NodeIDs(),
-            simulation.NewPeerEventsFilter().Drop(),
-        )
-
-        var disconnected atomic.Value
-        go func() {
-            for d := range disconnections {
-                if d.Error != nil {
-                    log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-                    disconnected.Store(true)
-                }
-            }
-        }()
+        disconnected := watchDisconnections(ctx, sim)
        defer func() {
-            if err != nil {
-                if yes, ok := disconnected.Load().(bool); ok && yes {
-                    err = errors.New("disconnect events received")
-                }
+            if err != nil && disconnected.bool() {
+                err = errors.New("disconnect events received")
            }
        }()
        // benchmark loop
@@ -742,12 +684,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
            ctx := context.TODO()
            hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
            if err != nil {
-                b.Fatalf("expected no error. got %v", err)
+                return fmt.Errorf("store: %v", err)
            }
            // wait until all chunks stored
            err = wait(ctx)
            if err != nil {
-                b.Fatalf("expected no error. got %v", err)
+                return fmt.Errorf("wait store: %v", err)
            }
            // collect the hashes
            hashes[i] = hash
@@ -783,10 +725,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
                break Loop
            }
        }
-        if err != nil {
-            b.Fatal(err)
-        }
-        return nil
+        return err
    })
    if result.Error != nil {
        b.Fatal(result.Error)