diff options
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 572 |
1 files changed, 253 insertions, 319 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index f3da893a2..ae007e5b0 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -22,18 +22,19 @@ import ( crand "crypto/rand" "fmt" "io" + "os" "sync" "testing" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -308,159 +309,164 @@ func TestDeliveryFromNodes(t *testing.T) { } func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) { - defaultSkipCheck = skipCheck - toAddr = network.NewAddrFromNodeID - createStoreFunc = createTestLocalStorageFromSim - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - t.Fatal(err.Error()) - } - stores = make(map[discover.NodeID]storage.ChunkStore) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } - registries = make(map[discover.NodeID]*TestRegistry) - deliveries = make(map[discover.NodeID]*Delivery) - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) - // here we distribute chunks of a random file into Stores of nodes 1 to nodes - rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) - size := chunkCount * chunkSize - ctx := context.TODO() - fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - // wait until all chunks stored - if err != nil { - t.Fatal(err.Error()) - } - err = wait(ctx) + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + bucket.Store(bucketKeyRegistry, r) + + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) + } + netStore := storage.NewNetStore(localStore, retrieveFunc) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } - errc := make(chan error, 1) - waitPeerErrC = make(chan error) - quitC := make(chan struct{}) - defer close(quitC) - - action := func(ctx context.Context) error { - // each node Subscribes to each other's swarmChunkServerStreamName - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // using a global err channel to share betweem action and node service + + log.Info("Starting simulation") + ctx := context.Background() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + //determine the pivot node to be the first node of the simulation + sim.SetPivotNode(nodeIDs[0]) + //distribute chunks of a random file into Stores of nodes 1 to nodes + //we will do this by creating a file store with an underlying round-robin store: + //the file store will create a hash for the uploaded file, but every chunk will be + //distributed to different nodes via round-robin scheduling + log.Debug("Writing file to round-robin file store") + //to do this, we create an array for chunkstores (length minus one, the pivot node) + stores := make([]storage.ChunkStore, len(nodeIDs)-1) + //we then need to get all stores from the sim.... + lStores := sim.NodesItems(bucketKeyStore) i := 0 - for err := range waitPeerErrC { - if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + //...iterate the buckets... + for id, bucketVal := range lStores { + //...and remove the one which is the pivot node + if id == *sim.PivotNodeID() { + continue } + //the other ones are added to the array... + stores[i] = bucketVal.(storage.ChunkStore) i++ - if i == nodes { - break - } + } + //...which then gets passed to the round-robin file store + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + //now we can actually upload a (random) file to the round-robin store + size := chunkCount * chunkSize + log.Debug("Storing data to file store") + fileHash, wait, err := roundRobinFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + // wait until all chunks stored + if err != nil { + return err + } + err = wait(ctx) + if err != nil { + return err } - // each node subscribes to the upstream swarm chunk server stream - // which responds to chunk retrieve requests all but the last node in the chain does not - for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - err := sim.CallClient(id, func(client *rpc.Client) error { - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - sid := sim.IDs[j+1] - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) - }) + //each of the nodes (except pivot node) subscribes to the stream of the next node + for j, node := range nodeIDs[0 : nodes-1] { + sid := nodeIDs[j+1] + item, ok := sim.NodeItem(node, bucketKeyRegistry) + if !ok { + return fmt.Errorf("No registry") + } + registry := item.(*Registry) + err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) if err != nil { return err } } - // create a retriever FileStore for the pivot node - delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + //get the pivot node's filestore + item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + pivotFileStore := item.(*storage.FileStore) + log.Debug("Starting retrieval routine") go func() { // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks // we must wait for the peer connections to have started before requesting - n, err := readAll(fileStore, fileHash) + n, err := readAll(pivotFileStore, fileHash) log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) if err != nil { - errc <- fmt.Errorf("requesting chunks action error: %v", err) + t.Fatalf("requesting chunks action error: %v", err) } }() - return nil - } - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: + + log.Debug("Waiting for kademlia") + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err } + + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } + } + }() + + //finally check that the pivot node gets all chunks via the root hash + log.Debug("Check retrieval") + success := true var total int64 - err := sim.CallClient(id, func(client *rpc.Client) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash)) - }) + total, err = readAll(pivotFileStore, fileHash) + if err != nil { + return err + } log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) if err != nil || total != int64(size) { - return false, nil + success = false } - return true, nil - } - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]), - // we are only testing the pivot node (net.Nodes[0]) - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, - }, - } - startedAt := time.Now() - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) - } + if !success { + return fmt.Errorf("Test failed, chunks not available on all nodes") + } + log.Debug("Test terminated successfully") + return nil + }) if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) + t.Fatal(result.Error) } - streamTesting.CheckResult(t, result, startedAt, finishedAt) } func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { @@ -490,218 +496,146 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { } func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { - defaultSkipCheck = skipCheck - toAddr = network.NewAddrFromNodeID - createStoreFunc = createTestLocalStorageFromSim - registries = make(map[discover.NodeID]*TestRegistry) - - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - b.Fatal(err.Error()) - } - - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } - // wait channel for all nodes all peer connections to set up - waitPeerErrC = make(chan error) + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - // create a FileStore for the last node in the chain which we are gonna write to - remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams()) - - // channel to signal simulation initialisation with action call complete - // or node disconnections - disconnectC := make(chan error) - quitC := make(chan struct{}) - - initC := make(chan error) - - action := func(ctx context.Context) error { - // each node Subscribes to each other's swarmChunkServerStreamName - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // waitPeerErrC using a global err channel to share betweem action and node service - i := 0 - for err := range waitPeerErrC { + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + return nil, nil, err } - i++ - if i == nodes { - break + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() } - } - var err error - // each node except the last one subscribes to the upstream swarm chunk server stream - // which responds to chunk retrieve requests - for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - err = sim.CallClient(id, func(client *rpc.Client) error { - doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - sid := sim.IDs[j+1] // the upstream peer's id - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + DoSync: true, + SyncUpdateDelay: 0, }) - if err != nil { - break + + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } - } - initC <- err - return nil - } + netStore := storage.NewNetStore(localStore, retrieveFunc) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) - // the check function is only triggered when the benchmark finishes - trigger := make(chan discover.NodeID) - check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) { - return true, nil - } + return r, cleanup, nil - conf.Step = &simulations.Step{ - Action: action, - Trigger: trigger, - // we are only testing the pivot node (net.Nodes[0]) - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, }, - } - - // run the simulation in the background - errc := make(chan error) - go func() { - _, err := sim.Run(ctx, conf) - close(quitC) - errc <- err - }() + }) + defer sim.Close() - // wait for simulation action to complete stream subscriptions - err = <-initC + log.Info("Initializing test config") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - b.Fatalf("simulation failed to initialise. expected no error. got %v", err) + b.Fatal(err) } - // create a retriever FileStore for the pivot node - // by now deliveries are set for each node by the streamer service - delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) - - // benchmark loop - b.ResetTimer() - b.StopTimer() -Loop: - for i := 0; i < b.N; i++ { - // uploading chunkCount random chunks to the last node - hashes := make([]storage.Address, chunkCount) - for i := 0; i < chunkCount; i++ { - // create actual size real chunks - ctx := context.TODO() - hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) - if err != nil { - b.Fatalf("expected no error. got %v", err) - } - // wait until all chunks stored - err = wait(ctx) - if err != nil { - b.Fatalf("expected no error. got %v", err) - } - // collect the hashes - hashes[i] = hash + ctx := context.Background() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + node := nodeIDs[len(nodeIDs)-1] + + item, ok := sim.NodeItem(node, bucketKeyFileStore) + if !ok { + b.Fatal("No filestore") } - // now benchmark the actual retrieval - // netstore.Get is called for each hash in a go routine and errors are collected - b.StartTimer() - errs := make(chan error) - for _, hash := range hashes { - go func(h storage.Address) { - _, err := netStore.Get(ctx, h) - log.Warn("test check netstore get", "hash", h, "err", err) - errs <- err - }(hash) + remoteFileStore := item.(*storage.FileStore) + + pivotNode := nodeIDs[0] + item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) + if !ok { + b.Fatal("No filestore") } - // count and report retrieval errors - // if there are misses then chunk timeout is too low for the distance and volume (?) - var total, misses int - for err := range errs { - if err != nil { - log.Warn(err.Error()) - misses++ - } - total++ - if total == chunkCount { - break - } + netStore := item.(*storage.NetStore) + + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + b.Fatal(d.Error) + } + } + }() + // benchmark loop + b.ResetTimer() b.StopTimer() + Loop: + for i := 0; i < b.N; i++ { + // uploading chunkCount random chunks to the last node + hashes := make([]storage.Address, chunkCount) + for i := 0; i < chunkCount; i++ { + // create actual size real chunks + ctx := context.TODO() + hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) + if err != nil { + b.Fatalf("expected no error. got %v", err) + } + // wait until all chunks stored + err = wait(ctx) + if err != nil { + b.Fatalf("expected no error. got %v", err) + } + // collect the hashes + hashes[i] = hash + } + // now benchmark the actual retrieval + // netstore.Get is called for each hash in a go routine and errors are collected + b.StartTimer() + errs := make(chan error) + for _, hash := range hashes { + go func(h storage.Address) { + _, err := netStore.Get(ctx, h) + log.Warn("test check netstore get", "hash", h, "err", err) + errs <- err + }(hash) + } + // count and report retrieval errors + // if there are misses then chunk timeout is too low for the distance and volume (?) + var total, misses int + for err := range errs { + if err != nil { + log.Warn(err.Error()) + misses++ + } + total++ + if total == chunkCount { + break + } + } + b.StopTimer() - select { - case err = <-disconnectC: - if err != nil { + if misses > 0 { + err = fmt.Errorf("%v chunk not found out of %v", misses, total) break Loop } - default: - } - - if misses > 0 { - err = fmt.Errorf("%v chunk not found out of %v", misses, total) - break Loop } - } - - select { - case <-quitC: - case trigger <- sim.IDs[0]: - } - if err == nil { - err = <-errc - } else { - if e := <-errc; e != nil { - b.Errorf("sim.Run function error: %v", e) + if err != nil { + b.Fatal(err) } + return nil + }) + if result.Error != nil { + b.Fatal(result.Error) } - // benchmark over, trigger the check function to conclude the simulation - if err != nil { - b.Fatalf("expected no error. got %v", err) - } -} - -func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) { - return stores[id], nil } |