diff options
Diffstat (limited to 'swarm/network/stream')
-rw-r--r-- | swarm/network/stream/common_test.go | 149 | ||||
-rw-r--r-- | swarm/network/stream/delivery.go | 9 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 337 | ||||
-rw-r--r-- | swarm/network/stream/intervals_test.go | 58 | ||||
-rw-r--r-- | swarm/network/stream/lightnode_test.go | 16 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/norace_test.go | 24 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 26 | ||||
-rw-r--r-- | swarm/network/stream/race_test.go | 23 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 69 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 107 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 40 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 295 | ||||
-rw-r--r-- | swarm/network/stream/syncer_test.go | 161 | ||||
-rw-r--r-- | swarm/network/stream/testing/snapshot_4.json | 1 | ||||
-rw-r--r-- | swarm/network/stream/visualized_snapshot_sync_sim_test.go | 62 |
16 files changed, 823 insertions, 556 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 29b917d39..afd08d275 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -26,17 +26,19 @@ import ( "math/rand" "os" "strings" + "sync" "sync/atomic" - "testing" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" ) @@ -67,7 +69,81 @@ func init() { log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } -func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { +// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations +func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { + addr := network.NewAddr(ctx.Config.Node()) + + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + return addr, netStore, delivery, cleanup, nil +} + +// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr +func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) { + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + return netStore, delivery, cleanup, nil +} + +// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc +func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { + addr := network.NewAddr(ctx.Config.Node()) + + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New + + return addr, netStore, delivery, cleanup, nil +} + +func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) { + n := ctx.Config.Node() + + store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + if *useMockStore { + store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr) + } + if err != nil { + return nil, nil, nil, err + } + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, err + } + + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + + bucket.Store(bucketKeyStore, store) + bucket.Store(bucketKeyDB, netStore) + bucket.Store(bucketKeyDelivery, delivery) + bucket.Store(bucketKeyFileStore, fileStore) + + cleanup := func() { + netStore.Close() + os.RemoveAll(datadir) + } + + return netStore, delivery, cleanup, nil +} + +func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address to := network.NewKademlia(addr.OAddr, network.NewKadParams()) @@ -75,7 +151,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest // temp datadir datadir, err := ioutil.TempDir("", "streamer") if err != nil { - return nil, nil, nil, func() {}, err + return nil, nil, nil, nil, err } removeDataDir := func() { os.RemoveAll(datadir) @@ -87,12 +163,14 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest localStore, err := storage.NewTestLocalStoreForAddr(params) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } netStore, err := storage.NewNetStore(localStore, nil) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } delivery := NewDelivery(to, netStore) @@ -102,10 +180,11 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest streamer.Close() removeDataDir() } - protocolTester := p2ptest.NewProtocolTester(t, addr.ID(), 1, streamer.runProtocol) + protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol) - err = waitForPeers(streamer, 1*time.Second, 1) + err = waitForPeers(streamer, 10*time.Second, 1) if err != nil { + teardown() return nil, nil, nil, nil, errors.New("timeout: peer is not created") } @@ -138,6 +217,11 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } +// not used in this context, only to fulfill ChunkStore interface +func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool { + panic("RoundRobinStor doesn't support HasChunk") +} + func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } @@ -236,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch } return store, datadir, nil } + +// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value +// disconnected to true in case of a disconnect event. +func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) { + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + ctx, + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Drop(), + ) + disconnected = new(boolean) + go func() { + for { + select { + case <-ctx.Done(): + return + case d := <-disconnections: + if d.Error != nil { + log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error) + } else { + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) + } + disconnected.set(true) + } + } + }() + return disconnected +} + +// boolean is used to concurrently set +// and read a boolean value. +type boolean struct { + v bool + mu sync.RWMutex +} + +// set sets the value. +func (b *boolean) set(v bool) { + b.mu.Lock() + defer b.mu.Unlock() + + b.v = v +} + +// bool reads the value. +func (b *boolean) bool() bool { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.v +} diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index e1a13fe8d..fae6994f0 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * ctx, osp = spancontext.StartSpan( ctx, "retrieve.request") - defer osp.Finish() s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) if err != nil { @@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * }() go func() { + defer osp.Finish() chunk, err := d.chunkStore.Get(ctx, req.Addr) if err != nil { retrieveChunkFail.Inc(1) @@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch ctx, osp = spancontext.StartSpan( ctx, "chunk.delivery") - defer osp.Finish() processReceivedChunksCount.Inc(1) go func() { + defer osp.Finish() + req.peer = sp err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) if err != nil { @@ -255,8 +256,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( return true } sp = d.getPeer(id) + // sp is nil, when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol if sp == nil { - //log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) return true } spID = &id @@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( Addr: req.Addr, SkipCheck: req.SkipCheck, HopCount: req.HopCount, - }, Top) + }, Top, "request.from.peers") if err != nil { return nil, nil, err } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 70d3829b3..49e4a423a 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -21,9 +21,7 @@ import ( "context" "errors" "fmt" - "os" "sync" - "sync/atomic" "testing" "time" @@ -48,11 +46,11 @@ func TestStreamerRetrieveRequest(t *testing.T) { Retrieval: RetrievalClientOnly, Syncing: SyncingDisabled, } - tester, streamer, _, teardown, err := newStreamerTester(t, regOpts) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(regOpts) if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -97,14 +95,14 @@ func TestStreamerRetrieveRequest(t *testing.T) { //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) //Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ Retrieval: RetrievalEnabled, Syncing: SyncingDisabled, //do no syncing }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -169,14 +167,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ + tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ Retrieval: RetrievalEnabled, Syncing: SyncingDisabled, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -359,14 +357,14 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { } func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ + tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { return &testClient{ @@ -455,164 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) { } func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - Syncing: SyncingDisabled, - Retrieval: RetrievalEnabled, - }, nil) - bucket.Store(bucketKeyRegistry, r) - - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + t.Helper() + t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } - return r, cleanup, nil + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, + }, nil) + bucket.Store(bucketKeyRegistry, r) - }, - }) - defer sim.Close() + cleanup = func() { + r.Close() + clean() + } - log.Info("Adding nodes to simulation") - _, err := sim.AddNodesAndConnectChain(nodes) - if err != nil { - t.Fatal(err) - } + return r, cleanup, nil + }, + }) + defer sim.Close() - log.Info("Starting simulation") - ctx := context.Background() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - nodeIDs := sim.UpNodeIDs() - //determine the pivot node to be the first node of the simulation - pivot := nodeIDs[0] - - //distribute chunks of a random file into Stores of nodes 1 to nodes - //we will do this by creating a file store with an underlying round-robin store: - //the file store will create a hash for the uploaded file, but every chunk will be - //distributed to different nodes via round-robin scheduling - log.Debug("Writing file to round-robin file store") - //to do this, we create an array for chunkstores (length minus one, the pivot node) - stores := make([]storage.ChunkStore, len(nodeIDs)-1) - //we then need to get all stores from the sim.... - lStores := sim.NodesItems(bucketKeyStore) - i := 0 - //...iterate the buckets... - for id, bucketVal := range lStores { - //...and remove the one which is the pivot node - if id == pivot { - continue - } - //the other ones are added to the array... - stores[i] = bucketVal.(storage.ChunkStore) - i++ - } - //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) - //now we can actually upload a (random) file to the round-robin store - size := chunkCount * chunkSize - log.Debug("Storing data to file store") - fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) - // wait until all chunks stored - if err != nil { - return err - } - err = wait(ctx) + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - return err + t.Fatal(err) } - log.Debug("Waiting for kademlia") - // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias - if _, err := sim.WaitTillHealthy(ctx); err != nil { - return err - } - - //get the pivot node's filestore - item, ok := sim.NodeItem(pivot, bucketKeyFileStore) - if !ok { - return fmt.Errorf("No filestore") - } - pivotFileStore := item.(*storage.FileStore) - log.Debug("Starting retrieval routine") - retErrC := make(chan error) - go func() { - // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks - // we must wait for the peer connections to have started before requesting - n, err := readAll(pivotFileStore, fileHash) - log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) - retErrC <- err - }() - - log.Debug("Watching for disconnections") - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) + log.Info("Starting simulation") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + nodeIDs := sim.UpNodeIDs() + //determine the pivot node to be the first node of the simulation + pivot := nodeIDs[0] + + //distribute chunks of a random file into Stores of nodes 1 to nodes + //we will do this by creating a file store with an underlying round-robin store: + //the file store will create a hash for the uploaded file, but every chunk will be + //distributed to different nodes via round-robin scheduling + log.Debug("Writing file to round-robin file store") + //to do this, we create an array for chunkstores (length minus one, the pivot node) + stores := make([]storage.ChunkStore, len(nodeIDs)-1) + //we then need to get all stores from the sim.... + lStores := sim.NodesItems(bucketKeyStore) + i := 0 + //...iterate the buckets... + for id, bucketVal := range lStores { + //...and remove the one which is the pivot node + if id == pivot { + continue } + //the other ones are added to the array... + stores[i] = bucketVal.(storage.ChunkStore) + i++ } - }() - defer func() { + //...which then gets passed to the round-robin file store + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + //now we can actually upload a (random) file to the round-robin store + size := chunkCount * chunkSize + log.Debug("Storing data to file store") + fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) + // wait until all chunks stored + if err != nil { + return err + } + err = wait(ctx) if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { + return err + } + + log.Debug("Waiting for kademlia") + // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias + if _, err := sim.WaitTillHealthy(ctx); err != nil { + return err + } + + //get the pivot node's filestore + item, ok := sim.NodeItem(pivot, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + pivotFileStore := item.(*storage.FileStore) + log.Debug("Starting retrieval routine") + retErrC := make(chan error) + go func() { + // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks + // we must wait for the peer connections to have started before requesting + n, err := readAll(pivotFileStore, fileHash) + log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) + retErrC <- err + }() + + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil && disconnected.bool() { err = errors.New("disconnect events received") } - } - }() + }() - //finally check that the pivot node gets all chunks via the root hash - log.Debug("Check retrieval") - success := true - var total int64 - total, err = readAll(pivotFileStore, fileHash) - if err != nil { - return err - } - log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) - if err != nil || total != int64(size) { - success = false - } + //finally check that the pivot node gets all chunks via the root hash + log.Debug("Check retrieval") + success := true + var total int64 + total, err = readAll(pivotFileStore, fileHash) + if err != nil { + return err + } + log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) + if err != nil || total != int64(size) { + success = false + } - if !success { - return fmt.Errorf("Test failed, chunks not available on all nodes") - } - if err := <-retErrC; err != nil { - t.Fatalf("requesting chunks: %v", err) + if !success { + return fmt.Errorf("Test failed, chunks not available on all nodes") + } + if err := <-retErrC; err != nil { + return fmt.Errorf("requesting chunks: %v", err) + } + log.Debug("Test terminated successfully") + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) } - log.Debug("Test terminated successfully") - return nil }) - if result.Error != nil { - t.Fatal(result.Error) - } } func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { @@ -644,25 +614,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, @@ -670,12 +625,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }, nil) + bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -686,21 +643,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b b.Fatal(err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { nodeIDs := sim.UpNodeIDs() node := nodeIDs[len(nodeIDs)-1] item, ok := sim.NodeItem(node, bucketKeyFileStore) if !ok { - b.Fatal("No filestore") + return errors.New("No filestore") } remoteFileStore := item.(*storage.FileStore) pivotNode := nodeIDs[0] item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) if !ok { - b.Fatal("No filestore") + return errors.New("No filestore") } netStore := item.(*storage.NetStore) @@ -708,26 +666,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b return err } - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() + disconnected := watchDisconnections(ctx, sim) defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") } }() // benchmark loop @@ -742,12 +684,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b ctx := context.TODO() hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false) if err != nil { - b.Fatalf("expected no error. got %v", err) + return fmt.Errorf("store: %v", err) } // wait until all chunks stored err = wait(ctx) if err != nil { - b.Fatalf("expected no error. got %v", err) + return fmt.Errorf("wait store: %v", err) } // collect the hashes hashes[i] = hash @@ -783,10 +725,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b break Loop } } - if err != nil { - b.Fatal(err) - } - return nil + return err }) if result.Error != nil { b.Fatal(result.Error) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 8f2bed9d6..009a941ef 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -21,9 +21,7 @@ import ( "encoding/binary" "errors" "fmt" - "os" "sync" - "sync/atomic" "testing" "time" @@ -31,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -62,26 +59,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { externalStreamMaxKeys := uint64(100) sim := simulation.New(map[string]simulation.ServiceFunc{ - "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, @@ -97,11 +79,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil }) - fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup := func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -134,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { _, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) if err != nil { - log.Error("Store error: %v", "err", err) - t.Fatal(err) + return fmt.Errorf("store: %v", err) } err = wait(ctx) if err != nil { - log.Error("Wait error: %v", "err", err) - t.Fatal(err) + return fmt.Errorf("wait store: %v", err) } item, ok = sim.NodeItem(checker, bucketKeyRegistry) @@ -152,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { liveErrC := make(chan error) historyErrC := make(chan error) - log.Debug("Watching for disconnections") - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) if err != nil { return err } - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() + disconnected := watchDisconnections(ctx, sim) defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") } }() diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 65cde2411..501660fab 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -28,11 +28,11 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { Retrieval: RetrievalClientOnly, Syncing: SyncingDisabled, } - tester, _, _, teardown, err := newStreamerTester(t, registryOptions) - defer teardown() + tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -67,11 +67,11 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, } - tester, _, _, teardown, err := newStreamerTester(t, registryOptions) - defer teardown() + tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -111,11 +111,11 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, } - tester, _, _, teardown, err := newStreamerTester(t, registryOptions) - defer teardown() + tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -156,11 +156,11 @@ func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, } - tester, _, _, teardown, err := newStreamerTester(t, registryOptions) - defer teardown() + tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..de4e8a3bb 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(ctx, msg, c.priority) + err := p.SendPriority(ctx, msg, c.priority, "") if err != nil { log.Warn("SendPriority error", "err", err) } diff --git a/swarm/network/stream/norace_test.go b/swarm/network/stream/norace_test.go new file mode 100644 index 000000000..b324f6939 --- /dev/null +++ b/swarm/network/stream/norace_test.go @@ -0,0 +1,24 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build !race + +package stream + +// Provide a flag to reduce the scope of tests when running them +// with race detector. Some of the tests are doing a lot of allocations +// on the heap, and race detector uses much more memory to track them. +const raceTest = false diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 4bccf56f5..68da8f44a 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -65,6 +65,7 @@ type Peer struct { // on creating a new client in offered hashes handler. clientParams map[Stream]*clientParams quit chan struct{} + spans sync.Map } type WrappedPriorityMsg struct { @@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { clients: make(map[Stream]*client), clientParams: make(map[Stream]*clientParams), quit: make(chan struct{}), + spans: sync.Map{}, } ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) + defer p.spans.Delete(wmsg.Context) + sp, ok := p.spans.Load(wmsg.Context) + if ok { + defer sp.(opentracing.Span).Finish() + } err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) @@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { // Deliver sends a storeRequestMsg protocol message to the peer // Depending on the `syncing` parameter we send different message types func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { - var sp opentracing.Span var msg interface{} spanName := "send.chunk.delivery" @@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, } spanName += ".retrieval" } - ctx, sp = spancontext.StartSpan( - ctx, - spanName) - defer sp.Finish() - return p.SendPriority(ctx, msg, priority) + return p.SendPriority(ctx, msg, priority, spanName) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) + if traceId != "" { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + traceId, + ) + p.spans.Store(ctx, sp) + } wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, @@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(ctx, msg, s.priority) + return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes") } func (p *Peer) getServer(s Stream) (*server, error) { diff --git a/swarm/network/stream/race_test.go b/swarm/network/stream/race_test.go new file mode 100644 index 000000000..8aed3542b --- /dev/null +++ b/swarm/network/stream/race_test.go @@ -0,0 +1,23 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build race + +package stream + +// Reduce the scope of some tests when running with race detector, +// as it raises the memory consumption significantly. +const raceTest = true diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index d345ac8d0..afb023ae2 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -18,7 +18,6 @@ package stream import ( "context" "fmt" - "os" "sync" "testing" "time" @@ -27,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -76,7 +74,7 @@ func TestRetrieval(t *testing.T) { //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { - err := runRetrievalTest(*chunks, *nodes) + err := runRetrievalTest(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -95,53 +93,37 @@ func TestRetrieval(t *testing.T) { } for _, n := range nodeCnt { for _, c := range chnkCnt { - err := runRetrievalTest(c, n) - if err != nil { - t.Fatal(err) - } + t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) { + err := runRetrievalTest(t, c, n) + if err != nil { + t.Fatal(err) + } + }) } } } } var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ - "streamer": retrievalStreamerFunc, -} - -func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingAutoSubscribe, - SyncUpdateDelay: 3 * time.Second, - }, nil) + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, + SyncUpdateDelay: 3 * time.Second, + }, nil) - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + cleanup = func() { + r.Close() + clean() + } - return r, cleanup, nil + return r, cleanup, nil + }, } /* @@ -171,7 +153,7 @@ func runFileRetrievalTest(nodeCount int) error { return err } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) defer cancelSimRun() result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { @@ -245,7 +227,8 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ -func runRetrievalTest(chunkCount int, nodeCount int) error { +func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { + t.Helper() sim := simulation.New(retrievalSimServiceMap) defer sim.Close() diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 6af19c12a..b45d0aed5 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -17,11 +17,12 @@ package stream import ( "context" + "errors" "fmt" + "io/ioutil" "os" "runtime" "sync" - "sync/atomic" "testing" "time" @@ -92,6 +93,15 @@ func TestSyncingViaGlobalSync(t *testing.T) { if *longrunning { chnkCnt = []int{1, 8, 32, 256, 1024} nodeCnt = []int{16, 32, 64, 128, 256} + } else if raceTest { + // TestSyncingViaGlobalSync allocates a lot of memory + // with race detector. By reducing the number of chunks + // and nodes, memory consumption is lower and data races + // are still checked, while correctness of syncing is + // tested with more chunks and nodes in regular (!race) + // tests. + chnkCnt = []int{4} + nodeCnt = []int{16} } else { //default test chnkCnt = []int{4, 32} @@ -107,42 +117,43 @@ func TestSyncingViaGlobalSync(t *testing.T) { } var simServiceMap = map[string]simulation.ServiceFunc{ - "streamer": streamerFunc, -} + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) + if err != nil { + return nil, nil, err + } -func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, - SyncUpdateDelay: 3 * time.Second, - }, nil) - - bucket.Store(bucketKeyRegistry, r) - - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + var dir string + var store *state.DBStore + if raceTest { + // Use on-disk DBStore to reduce memory consumption in race tests. + dir, err = ioutil.TempDir("", "swarm-stream-") + if err != nil { + return nil, nil, err + } + store, err = state.NewDBStore(dir) + if err != nil { + return nil, nil, err + } + } else { + store = state.NewInmemoryStore() + } + + r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, + SyncUpdateDelay: 3 * time.Second, + }, nil) - return r, cleanup, nil + bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + r.Close() + clean() + } + + return r, cleanup, nil + }, } func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { @@ -171,36 +182,24 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { t.Fatal(err) } - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() - result := runSim(conf, ctx, sim, chunkCount) if result.Error != nil { t.Fatal(result.Error) } - if yes, ok := disconnected.Load().(bool); ok && yes { - t.Fatal("disconnect events received") - } log.Info("Simulation ended") } func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result { - return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") + } + }() + nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { //get the kademlia overlay address from this ID diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index fb571c856..cb5912185 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.SendPriority(context.TODO(), msg, priority, "") } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod // nil as base takes the node's base; we need to pass 255 as `EachConn` runs // from deepest bins backwards kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { + // nodes that do not provide stream protocol + // should not be subscribed, e.g. bootnodes + if !p.HasCap("stream") { + return true + } //if the peer's bin is shallower than the kademlia depth, //only the peer's bin should be subscribed if po < kadDepth { @@ -725,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + + if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -929,3 +935,33 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { return api.streamer.Unsubscribe(peerId, s) } + +/* +GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +It can be called via RPC. +It returns a map of node IDs with an array of string representations of Stream objects. +*/ +func (api *API) GetPeerSubscriptions() map[string][]string { + //create the empty map + pstreams := make(map[string][]string) + + //iterate all streamer peers + api.streamer.peersMu.RLock() + defer api.streamer.peersMu.RUnlock() + + for id, p := range api.streamer.peers { + var streams []string + //every peer has a map of stream servers + //every stream server represents a subscription + p.serverMu.RLock() + for s := range p.servers { + //append the string representation of the stream + //to the list for this peer + streams = append(streams, s.String()) + } + p.serverMu.RUnlock() + //set the array of stream servers to the map + pstreams[id.String()] = streams + } + return pstreams +} diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index cdaeb92d0..e92ee3783 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -22,23 +22,29 @@ import ( "errors" "fmt" "strconv" + "strings" + "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "golang.org/x/crypto/sha3" ) func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", true) err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) @@ -48,11 +54,11 @@ func TestStreamerSubscribe(t *testing.T) { } func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", false) err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) @@ -139,11 +145,11 @@ func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { return newTestClient(t), nil @@ -232,11 +238,11 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", false) @@ -299,11 +305,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", true) @@ -365,11 +371,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t, 0), nil @@ -409,11 +415,11 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", true) @@ -472,11 +478,11 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", true) @@ -537,11 +543,11 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() stream := NewStream("foo", "", true) @@ -636,11 +642,11 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) - defer teardown() + tester, streamer, _, teardown, err := newStreamerTester(nil) if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t, 10), nil @@ -769,15 +775,15 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { // leaving place for new streams. func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 - tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t, 0), nil @@ -845,13 +851,13 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { // error message exchange. func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { var maxPeerServers = 6 - tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ MaxPeerServers: maxPeerServers, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t, 0), nil @@ -930,14 +936,14 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { //TestHasPriceImplementation is to check that the Registry has a //`Price` interface implementation func TestHasPriceImplementation(t *testing.T) { - _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + _, r, _, teardown, err := newStreamerTester(&RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() if r.prices == nil { t.Fatal("No prices implementation available for the stream protocol") @@ -1105,7 +1111,6 @@ func TestRequestPeerSubscriptions(t *testing.T) { } } } - // print some output for p, subs := range fakeSubscriptions { log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p)) @@ -1114,3 +1119,239 @@ func TestRequestPeerSubscriptions(t *testing.T) { } } } + +// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function +func TestGetSubscriptions(t *testing.T) { + // create an amount of dummy peers + testPeerCount := 8 + // every peer will have this amount of dummy servers + testServerCount := 4 + // the peerMap which will store this data for the registry + peerMap := make(map[enode.ID]*Peer) + // create the registry + r := &Registry{} + api := NewAPI(r) + // call once, at this point should be empty + regs := api.GetPeerSubscriptions() + if len(regs) != 0 { + t.Fatal("Expected subscription count to be 0, but it is not") + } + + // now create a number of dummy servers for each node + for i := 0; i < testPeerCount; i++ { + addr := network.RandomAddr() + id := addr.ID() + p := &Peer{} + p.servers = make(map[Stream]*server) + for k := 0; k < testServerCount; k++ { + s := Stream{ + Name: strconv.Itoa(k), + Key: "", + Live: false, + } + p.servers[s] = &server{} + } + peerMap[id] = p + } + r.peers = peerMap + + // call the subscriptions again + regs = api.GetPeerSubscriptions() + // count how many (fake) subscriptions there are + cnt := 0 + for _, reg := range regs { + for range reg { + cnt++ + } + } + // check expected value + expectedCount := testPeerCount * testServerCount + if cnt != expectedCount { + t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt) + } +} + +/* +TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, +starts the simulation, waits for SyncUpdateDelay in order to kick off +stream registration, then tests that there are subscriptions. +*/ +func TestGetSubscriptionsRPC(t *testing.T) { + + // arbitrarily set to 4 + nodeCount := 4 + // run with more nodes if `longrunning` flag is set + if *longrunning { + nodeCount = 64 + } + // set the syncUpdateDelay for sync registrations to start + syncUpdateDelay := 200 * time.Millisecond + // holds the msg code for SubscribeMsg + var subscribeMsgCode uint64 + var ok bool + var expectedMsgCount counter + + // this channel signalizes that the expected amount of subscriptiosn is done + allSubscriptionsDone := make(chan struct{}) + // after the test, we need to reset the subscriptionFunc to the default + defer func() { subscriptionFunc = doRequestSubscription }() + + // we use this subscriptionFunc for this test: just increases count and calls the actual subscription + subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { + expectedMsgCount.inc() + doRequestSubscription(r, p, bin, subs) + return true + } + // create a standard sim + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) + if err != nil { + return nil, nil, err + } + + // configure so that sync registrations actually happen + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, //enable sync registrations + SyncUpdateDelay: syncUpdateDelay, + }, nil) + + // get the SubscribeMsg code + subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{}) + if !ok { + t.Fatal("Message code for SubscribeMsg not found") + } + + cleanup = func() { + r.Close() + clean() + } + + return r, cleanup, nil + }, + }) + defer sim.Close() + + ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelSimRun() + + // upload a snapshot + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + if err != nil { + t.Fatal(err) + } + + // setup the filter for SubscribeMsg + msgs := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode), + ) + + // strategy: listen to all SubscribeMsg events; after every event we wait + // if after `waitDuration` no more messages are being received, we assume the + // subscription phase has terminated! + + // the loop in this go routine will either wait for new message events + // or times out after 1 second, which signals that we are not receiving + // any new subscriptions any more + go func() { + //for long running sims, waiting 1 sec will not be enough + waitDuration := time.Duration(nodeCount/16) * time.Second + for { + select { + case <-ctx.Done(): + return + case m := <-msgs: // just reset the loop + if m.Error != nil { + log.Error("stream message", "err", m.Error) + continue + } + log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID) + case <-time.After(waitDuration): + // one second passed, don't assume more subscriptions + allSubscriptionsDone <- struct{}{} + log.Info("All subscriptions received") + return + + } + } + }() + + //run the simulation + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + log.Info("Simulation running") + nodes := sim.Net.Nodes + + //wait until all subscriptions are done + select { + case <-allSubscriptionsDone: + case <-ctx.Done(): + return errors.New("Context timed out") + } + + log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count()) + //now iterate again, this time we call each node via RPC to get its subscriptions + realCount := 0 + for _, node := range nodes { + //create rpc client + client, err := node.Client() + if err != nil { + return fmt.Errorf("create node 1 rpc client fail: %v", err) + } + + //ask it for subscriptions + pstreams := make(map[string][]string) + err = client.Call(&pstreams, "stream_getPeerSubscriptions") + if err != nil { + return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) + } + //length of the subscriptions can not be smaller than number of peers + log.Debug("node subscriptions", "node", node.String()) + for p, ps := range pstreams { + log.Debug("... with", "peer", p) + for _, s := range ps { + log.Debug(".......", "stream", s) + // each node also has subscriptions to RETRIEVE_REQUEST streams, + // we need to ignore those, we are only counting SYNC streams + if !strings.HasPrefix(s, "RETRIEVE_REQUEST") { + realCount++ + } + } + } + } + // every node is mutually subscribed to each other, so the actual count is half of it + emc := expectedMsgCount.count() + if realCount/2 != emc { + return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc) + } + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) + } +} + +// counter is used to concurrently increment +// and read an integer value. +type counter struct { + v int + mu sync.RWMutex +} + +// Increment the counter. +func (c *counter) inc() { + c.mu.Lock() + defer c.mu.Unlock() + + c.v++ +} + +// Read the counter value. +func (c *counter) count() int { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.v +} diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 014ec9a98..be0752a9d 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -24,7 +24,6 @@ import ( "math" "os" "sync" - "sync/atomic" "testing" "time" @@ -38,7 +37,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mock" - mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -46,9 +44,15 @@ const dataChunkCount = 200 func TestSyncerSimulation(t *testing.T) { testSyncBetweenNodes(t, 2, dataChunkCount, true, 1) - testSyncBetweenNodes(t, 4, dataChunkCount, true, 1) - testSyncBetweenNodes(t, 8, dataChunkCount, true, 1) - testSyncBetweenNodes(t, 16, dataChunkCount, true, 1) + // This test uses much more memory when running with + // race detector. Allow it to finish successfully by + // reducing its scope, and still check for data races + // with the smallest number of nodes. + if !raceTest { + testSyncBetweenNodes(t, 4, dataChunkCount, true, 1) + testSyncBetweenNodes(t, 8, dataChunkCount, true, 1) + testSyncBetweenNodes(t, 16, dataChunkCount, true, 1) + } } func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { @@ -73,50 +77,46 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) + addr := network.NewAddr(ctx.Config.Node()) //hack to put addresses in same space addr.OAddr[0] = byte(0) - if *useMockStore { - store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr) - } else { - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) - } + netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - bucket.Store(bucketKeyDelivery, delivery) + var dir string + var store *state.DBStore + if raceTest { + // Use on-disk DBStore to reduce memory consumption in race tests. + dir, err = ioutil.TempDir("", "swarm-stream-") + if err != nil { + return nil, nil, err + } + store, err = state.NewDBStore(dir) + if err != nil { + return nil, nil, err + } + } else { + store = state.NewInmemoryStore() + } - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }, nil) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + r.Close() + clean() + if dir != "" { + os.RemoveAll(dir) + } + } return r, cleanup, nil - }, }) defer sim.Close() @@ -139,26 +139,10 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p nodeIndex[id] = i } - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() + disconnected := watchDisconnections(ctx, sim) defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") } }() @@ -167,7 +151,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p id := nodeIDs[j] client, err := sim.Net.GetNode(id).Client() if err != nil { - t.Fatal(err) + return fmt.Errorf("node %s client: %v", id, err) } sid := nodeIDs[j+1] client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top) @@ -183,7 +167,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p size := chunkCount * chunkSize _, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false) if err != nil { - t.Fatal(err.Error()) + return fmt.Errorf("fileStore.Store: %v", err) } wait(ctx) } @@ -251,44 +235,26 @@ func TestSameVersionID(t *testing.T) { v := uint(1) sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) - - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, }, nil) + bucket.Store(bucketKeyRegistry, r) + //assign to each node the same version ID r.spec.Version = v - bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -316,7 +282,7 @@ func TestSameVersionID(t *testing.T) { //the peers should connect, thus getting the peer should not return nil if registry.getPeer(nodes[1]) == nil { - t.Fatal("Expected the peer to not be nil, but it is") + return errors.New("Expected the peer to not be nil, but it is") } return nil }) @@ -333,46 +299,27 @@ func TestDifferentVersionID(t *testing.T) { v := uint(0) sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) - - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, }, nil) + bucket.Store(bucketKeyRegistry, r) //increase the version ID for each node v++ r.spec.Version = v - bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -400,7 +347,7 @@ func TestDifferentVersionID(t *testing.T) { //getting the other peer should fail due to the different version numbers if registry.getPeer(nodes[1]) != nil { - t.Fatal("Expected the peer to be nil, but it is not") + return errors.New("Expected the peer to be nil, but it is not") } return nil }) diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json new file mode 100644 index 000000000..a8b617407 --- /dev/null +++ b/swarm/network/stream/testing/snapshot_4.json @@ -0,0 +1 @@ +{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]} diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go index 18b4c8fb0..cf4405ec1 100644 --- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go +++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "io" - "os" "sync" "testing" "time" @@ -37,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -68,31 +66,6 @@ func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulati return nodeCount, chunkCount, sim } -//watch for disconnections and wait for healthy -func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) { - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) - - if _, err := sim.WaitTillHealthy(ctx); err != nil { - panic(err) - } - - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - go func() { - for d := range disconnections { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - panic("unexpected disconnect") - cancelSimRun() - } - }() - - return ctx, cancelSimRun -} - //This test requests bogus hashes into the network func TestNonExistingHashesWithServer(t *testing.T) { @@ -104,19 +77,25 @@ func TestNonExistingHashesWithServer(t *testing.T) { panic(err) } - ctx, cancelSimRun := watchSim(sim) - defer cancelSimRun() - //in order to get some meaningful visualization, it is beneficial //to define a minimum duration of this test testDuration := 20 * time.Second - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil { + if yes, ok := disconnected.Load().(bool); ok && yes { + err = errors.New("disconnect events received") + } + } + }() + //check on the node's FileStore (netstore) id := sim.Net.GetRandomUpNode().ID() item, ok := sim.NodeItem(id, bucketKeyFileStore) if !ok { - t.Fatalf("No filestore") + return errors.New("No filestore") } fileStore := item.(*storage.FileStore) //create a bogus hash @@ -171,21 +150,10 @@ func TestSnapshotSyncWithServer(t *testing.T) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, @@ -201,9 +169,8 @@ func TestSnapshotSyncWithServer(t *testing.T) { bucket.Store(bucketKeyRegistry, tr) cleanup = func() { - netStore.Close() tr.Close() - os.RemoveAll(datadir) + clean() } return tr, cleanup, nil @@ -229,9 +196,6 @@ func TestSnapshotSyncWithServer(t *testing.T) { panic(err) } - ctx, cancelSimRun := watchSim(sim) - defer cancelSimRun() - //run the sim result := runSim(conf, ctx, sim, chunkCount) |