Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r--  swarm/network/stream/delivery_test.go | 593
1 file changed, 0 insertions, 593 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
deleted file mode 100644
index fc0f9d5df..000000000
--- a/swarm/network/stream/delivery_test.go
+++ /dev/null
@@ -1,593 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package stream
-
-import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "sync"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/protocols"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
- "github.com/ethereum/go-ethereum/swarm/chunk"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
- pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
- "github.com/ethereum/go-ethereum/swarm/network/simulation"
- "github.com/ethereum/go-ethereum/swarm/state"
- "github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/ethereum/go-ethereum/swarm/testutil"
-)
-
-//Test requesting a chunk from a peer, then issuing an "empty" OfferedHashesMsg (no hashes available yet).
-//The exchange should time out, as the peer does not have the chunk (no syncing happened previously).
-func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
- tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{
- Syncing: SyncingDisabled, //disable syncing
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- node := tester.Nodes[0]
-
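- //construct a chunk reference from the hash0 test fixture; the chunk data is nil and is never stored locally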
- chunk := storage.NewChunk(storage.Address(hash0[:]), nil)
-
- //test the exchange
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "RetrieveRequestMsg",
- Triggers: []p2ptest.Trigger{
- { //trigger the actual RETRIEVE_REQUEST...
- Code: 5,
- Msg: &RetrieveRequestMsg{
- Addr: chunk.Address()[:],
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- { //to which the peer responds with offered hashes
- Code: 1,
- Msg: &OfferedHashesMsg{
- HandoverProof: nil,
- Hashes: nil,
- From: 0,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- })
-
- //should fail with a timeout as the peer we are requesting
- //the chunk from does not have the chunk
- expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
- if err == nil || err.Error() != expectedError {
- t.Fatalf("Expected error %v, got %v", expectedError, err)
- }
-}
-
-// upstream request server receives a retrieve request and responds with
-// offered hashes, or with a direct chunk delivery if skipHash is set to true
-func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
- tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{
- Syncing: SyncingDisabled,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- node := tester.Nodes[0]
-
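- //put a chunk into the localstore so the upstream server can deliver it on request;
- //the hash1 fixture doubles as both chunk address and chunk data here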
- hash := storage.Address(hash1[:])
- ch := storage.NewChunk(hash, hash1[:])
- _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
- if err != nil {
- t.Fatalf("Expected no err got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "RetrieveRequestMsg",
- Triggers: []p2ptest.Trigger{
- {
- Code: 5,
- Msg: &RetrieveRequestMsg{
- Addr: hash,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 6,
- Msg: &ChunkDeliveryMsg{
- Addr: ch.Address(),
- SData: ch.Data(),
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-// if there is one peer in the Kademlia, RequestFromPeers should return it
-func TestRequestFromPeers(t *testing.T) {
- dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
-
- addr := network.RandomAddr()
- to := network.NewKademlia(addr.OAddr, network.NewKadParams())
- delivery := NewDelivery(to, nil)
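- //register a single dummy peer in the Kademlia, so RequestFromPeers has exactly one candidate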
- protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
- peer := network.NewPeer(&network.BzzPeer{
- BzzAddr: network.RandomAddr(),
- LightNode: false,
- Peer: protocolsPeer,
- }, to)
- to.On(peer)
- r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
-
- // an empty priorityQueue has to be created to prevent a goroutine from being started after the test has finished
- sp := &Peer{
- BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr},
- pq: pq.New(int(PriorityQueue), PriorityQueueCap),
- streamer: r,
- }
- r.setPeer(sp)
- req := network.NewRequest(
- storage.Address(hash0[:]),
- true,
- &sync.Map{},
- )
- ctx := context.Background()
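- //request the chunk; the single registered peer should be selected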
- id, _, err := delivery.RequestFromPeers(ctx, req)
-
- if err != nil {
- t.Fatal(err)
- }
- if *id != dummyPeerID {
- t.Fatalf("Expected peer ID %v, got %v", dummyPeerID, id)
- }
-}
-
-// RequestFromPeers should not return light nodes
-func TestRequestFromPeersWithLightNode(t *testing.T) {
- dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
-
- addr := network.RandomAddr()
- to := network.NewKademlia(addr.OAddr, network.NewKadParams())
- delivery := NewDelivery(to, nil)
-
- protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
- // set up a light node
- peer := network.NewPeer(&network.BzzPeer{
- BzzAddr: network.RandomAddr(),
- LightNode: true,
- Peer: protocolsPeer,
- }, to)
- to.On(peer)
- r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
- // an empty priorityQueue has to be created to prevent a goroutine from being started after the test has finished
- sp := &Peer{
- BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr},
- pq: pq.New(int(PriorityQueue), PriorityQueueCap),
- streamer: r,
- }
- r.setPeer(sp)
-
- req := network.NewRequest(
- storage.Address(hash0[:]),
- true,
- &sync.Map{},
- )
-
- ctx := context.Background()
- // make a request, which should fail with "no peer found" since the only peer is a light node
- _, _, err := delivery.RequestFromPeers(ctx, req)
-
- expectedError := "no peer found"
- if err == nil || err.Error() != expectedError {
- t.Fatalf("expected error %q, got %v", expectedError, err)
- }
-}
-
-func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
- Syncing: SyncingDisabled,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
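- //register a client constructor for the custom "foo" stream, so the Subscribe below can be served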
- streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
- return &testClient{
- t: t,
- }, nil
- })
-
- node := tester.Nodes[0]
-
- //subscribe to custom stream
- stream := NewStream("foo", "", true)
- err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- chunkKey := hash0[:]
- chunkData := hash1[:]
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Expects: []p2ptest.Expect{
- { //first expect subscription to the custom stream...
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- },
- p2ptest.Exchange{
- Label: "ChunkDelivery message",
- Triggers: []p2ptest.Trigger{
- { //...then trigger a chunk delivery for the given chunk from the peer,
- //so that the local node gets the chunk delivered
- Code: 6,
- Msg: &ChunkDeliveryMsg{
- Addr: chunkKey,
- SData: chunkData,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
-
- // wait for the chunk to get stored
- storedChunk, err := localStore.Get(ctx, chunk.ModeGetRequest, chunkKey)
- for err != nil {
- select {
- case <-ctx.Done():
- t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
- default:
- }
- storedChunk, err = localStore.Get(ctx, chunk.ModeGetRequest, chunkKey)
- time.Sleep(50 * time.Millisecond)
- }
-
-
- if !bytes.Equal(storedChunk.Data(), chunkData) {
- t.Fatal("Retrieved chunk has different data than original")
- }
-}
-
-func TestDeliveryFromNodes(t *testing.T) {
- testDeliveryFromNodes(t, 2, dataChunkCount, true)
- testDeliveryFromNodes(t, 2, dataChunkCount, false)
- testDeliveryFromNodes(t, 4, dataChunkCount, true)
- testDeliveryFromNodes(t, 4, dataChunkCount, false)
-
- if testutil.RaceEnabled {
- // Travis cannot handle more nodes with -race; would time out.
- return
- }
-
- testDeliveryFromNodes(t, 8, dataChunkCount, true)
- testDeliveryFromNodes(t, 8, dataChunkCount, false)
- testDeliveryFromNodes(t, 16, dataChunkCount, true)
- testDeliveryFromNodes(t, 16, dataChunkCount, false)
-}
-
-func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
- t.Helper()
- t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
- if err != nil {
- return nil, nil, err
- }
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- Syncing: SyncingDisabled,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
-
- cleanup = func() {
- r.Close()
- clean()
- }
-
- return r, cleanup, nil
- },
- })
- defer sim.Close()
-
- log.Info("Adding nodes to simulation")
- _, err := sim.AddNodesAndConnectChain(nodes)
- if err != nil {
- t.Fatal(err)
- }
-
- log.Info("Starting simulation")
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
- nodeIDs := sim.UpNodeIDs()
- //determine the pivot node to be the first node of the simulation
- pivot := nodeIDs[0]
-
- //distribute chunks of a random file into Stores of nodes 1 to nodes
- //we will do this by creating a file store with an underlying round-robin store:
- //the file store will create a hash for the uploaded file, but every chunk will be
- //distributed to different nodes via round-robin scheduling
- log.Debug("Writing file to round-robin file store")
- //to do this, we create a slice of chunk stores (one per node, excluding the pivot node)
- stores := make([]storage.ChunkStore, len(nodeIDs)-1)
- //we then need to get all stores from the sim....
- lStores := sim.NodesItems(bucketKeyStore)
- i := 0
- //...iterate the buckets...
- for id, bucketVal := range lStores {
- //...and remove the one which is the pivot node
- if id == pivot {
- continue
- }
- //the other ones are added to the array...
- stores[i] = bucketVal.(storage.ChunkStore)
- i++
- }
- //...which then gets passed to the round-robin file store
- roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags())
- //now we can actually upload a (random) file to the round-robin store
- size := chunkCount * chunkSize
- log.Debug("Storing data to file store")
- fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
- if err != nil {
- return err
- }
- // wait until all chunks are stored
- err = wait(ctx)
- if err != nil {
- return err
- }
-
- //get the pivot node's filestore
- item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- pivotFileStore := item.(*storage.FileStore)
- log.Debug("Starting retrieval routine")
- retErrC := make(chan error)
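- //the retrieval goroutine reports its result on retErrC; it is checked after the local read below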
- go func() {
- // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
- // we must wait for the peer connections to have started before requesting
- n, err := readAll(pivotFileStore, fileHash)
- log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
- retErrC <- err
- }()
-
- disconnected := watchDisconnections(ctx, sim)
- defer func() {
- if err != nil && disconnected.bool() {
- err = errors.New("disconnect events received")
- }
- }()
-
- //finally check that the pivot node gets all chunks via the root hash
- log.Debug("Check retrieval")
- var total int64
- total, err = readAll(pivotFileStore, fileHash)
- if err != nil {
- return err
- }
- log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v", fileHash, total, size))
- if total != int64(size) {
- return fmt.Errorf("chunks not available on all nodes: read %v of %v bytes", total, size)
- }
- if err := <-retErrC; err != nil {
- return fmt.Errorf("requesting chunks: %v", err)
- }
- log.Debug("Test terminated successfully")
- return nil
- })
- if result.Error != nil {
- t.Fatal(result.Error)
- }
- })
-}
-
-func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
- for chunks := 32; chunks <= 128; chunks *= 2 {
- for i := 2; i < 32; i *= 2 {
- b.Run(
- fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
- func(b *testing.B) {
- benchmarkDeliveryFromNodes(b, i, chunks, true)
- },
- )
- }
- }
-}
-
-func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
- for chunks := 32; chunks <= 128; chunks *= 2 {
- for i := 2; i < 32; i *= 2 {
- b.Run(
- fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
- func(b *testing.B) {
- benchmarkDeliveryFromNodes(b, i, chunks, false)
- },
- )
- }
- }
-}
-
-func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
- if err != nil {
- return nil, nil, err
- }
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- Syncing: SyncingDisabled,
- SyncUpdateDelay: 0,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
-
- cleanup = func() {
- r.Close()
- clean()
- }
-
- return r, cleanup, nil
- },
- })
- defer sim.Close()
-
- log.Info("Initializing test config")
- _, err := sim.AddNodesAndConnectChain(nodes)
- if err != nil {
- b.Fatal(err)
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
- nodeIDs := sim.UpNodeIDs()
- //the last node in the chain is used as the upload node
- uploadNode := nodeIDs[len(nodeIDs)-1]
-
- item, ok := sim.NodeItem(uploadNode, bucketKeyFileStore)
- if !ok {
- return errors.New("No filestore")
- }
- remoteFileStore := item.(*storage.FileStore)
-
- pivotNode := nodeIDs[0]
- item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
- if !ok {
- return errors.New("No filestore")
- }
- netStore := item.(*storage.NetStore)
-
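- //wait for the network to become healthy (Kademlia tables populated) before measuring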
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
-
- disconnected := watchDisconnections(ctx, sim)
- defer func() {
- if err != nil && disconnected.bool() {
- err = errors.New("disconnect events received")
- }
- }()
- // benchmark loop
- b.ResetTimer()
- b.StopTimer()
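- // the timer stays stopped during chunk upload; it is restarted only around the retrieval phase being measured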
- Loop:
- for i := 0; i < b.N; i++ {
- // uploading chunkCount random chunks to the last node
- hashes := make([]storage.Address, chunkCount)
- for i := 0; i < chunkCount; i++ {
- // create real chunks of the actual chunk size
- ctx := context.TODO()
- hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
- if err != nil {
- return fmt.Errorf("store: %v", err)
- }
- // wait until all chunks stored
- err = wait(ctx)
- if err != nil {
- return fmt.Errorf("wait store: %v", err)
- }
- // collect the hashes
- hashes[i] = hash
- }
- // now benchmark the actual retrieval
- // netstore.Get is called for each hash in a go routine and errors are collected
- b.StartTimer()
- errs := make(chan error)
- for _, hash := range hashes {
- go func(h storage.Address) {
- _, err := netStore.Get(ctx, chunk.ModeGetRequest, h)
- log.Warn("test check netstore get", "hash", h, "err", err)
- errs <- err
- }(hash)
- }
- // count and report retrieval errors
- // if there are misses then chunk timeout is too low for the distance and volume (?)
- var total, misses int
- for err := range errs {
- if err != nil {
- log.Warn(err.Error())
- misses++
- }
- total++
- if total == chunkCount {
- break
- }
- }
- b.StopTimer()
-
- if misses > 0 {
- err = fmt.Errorf("%v chunk not found out of %v", misses, total)
- break Loop
- }
- }
- return err
- })
- if result.Error != nil {
- b.Fatal(result.Error)
- }
-}