path: root/swarm/network/stream
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--  swarm/network/stream/common_test.go                         149
-rw-r--r--  swarm/network/stream/delivery.go                              9
-rw-r--r--  swarm/network/stream/delivery_test.go                       337
-rw-r--r--  swarm/network/stream/intervals_test.go                       58
-rw-r--r--  swarm/network/stream/lightnode_test.go                       16
-rw-r--r--  swarm/network/stream/messages.go                              2
-rw-r--r--  swarm/network/stream/norace_test.go                          24
-rw-r--r--  swarm/network/stream/peer.go                                 26
-rw-r--r--  swarm/network/stream/race_test.go                            23
-rw-r--r--  swarm/network/stream/snapshot_retrieval_test.go              69
-rw-r--r--  swarm/network/stream/snapshot_sync_test.go                  107
-rw-r--r--  swarm/network/stream/stream.go                               40
-rw-r--r--  swarm/network/stream/streamer_test.go                       295
-rw-r--r--  swarm/network/stream/syncer_test.go                         161
-rw-r--r--  swarm/network/stream/testing/snapshot_4.json                  1
-rw-r--r--  swarm/network/stream/visualized_snapshot_sync_sim_test.go    62
16 files changed, 823 insertions, 556 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 29b917d39..afd08d275 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -26,17 +26,19 @@ import (
"math/rand"
"os"
"strings"
+ "sync"
"sync/atomic"
- "testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
@@ -67,7 +69,81 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
-func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
+// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
+func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
+func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ n := ctx.Config.Node()
+
+ store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ if *useMockStore {
+ store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
+ }
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ localStore := store.(*storage.LocalStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, netStore)
+
+ bucket.Store(bucketKeyStore, store)
+ bucket.Store(bucketKeyDB, netStore)
+ bucket.Store(bucketKeyDelivery, delivery)
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ cleanup := func() {
+ netStore.Close()
+ os.RemoveAll(datadir)
+ }
+
+ return netStore, delivery, cleanup, nil
+}
+
+func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // the tested peer's address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@@ -75,7 +151,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
// temp datadir
datadir, err := ioutil.TempDir("", "streamer")
if err != nil {
- return nil, nil, nil, func() {}, err
+ return nil, nil, nil, nil, err
}
removeDataDir := func() {
os.RemoveAll(datadir)
@@ -87,12 +163,14 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
localStore, err := storage.NewTestLocalStoreForAddr(params)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
delivery := NewDelivery(to, netStore)
@@ -102,10 +180,11 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
streamer.Close()
removeDataDir()
}
- protocolTester := p2ptest.NewProtocolTester(t, addr.ID(), 1, streamer.runProtocol)
+ protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
- err = waitForPeers(streamer, 1*time.Second, 1)
+ err = waitForPeers(streamer, 10*time.Second, 1)
if err != nil {
+ teardown()
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
}
@@ -138,6 +217,11 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
+// not used in this context, only to fulfill the ChunkStore interface
+func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool {
+	panic("RoundRobinStore doesn't support Has")
+}
+
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
@@ -236,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
}
return store, datadir, nil
}
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets the shared
+// boolean value disconnected to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ ctx,
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Drop(),
+ )
+ disconnected = new(boolean)
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-disconnections:
+ if d.Error != nil {
+ log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+ } else {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+ }
+ disconnected.set(true)
+ }
+ }
+ }()
+ return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+ v bool
+ mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+
+ return b.v
+}
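
The two helpers above are meant to compose inside a simulation run. A minimal sketch of the intended pattern, assuming the usual `sim`, `ctx` and `t` from a test body (the actual assertions are elided):

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
		disconnected := watchDisconnections(ctx, sim)
		defer func() {
			// reinterpret a failure as a disconnect only if a drop event was seen
			if err != nil && disconnected.bool() {
				err = errors.New("disconnect events received")
			}
		}()
		// ... test logic ...
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}

Note that the named return value `err` is required so the deferred function can rewrite it.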
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index e1a13fe8d..fae6994f0 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
ctx, osp = spancontext.StartSpan(
ctx,
"retrieve.request")
- defer osp.Finish()
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
}()
go func() {
+ defer osp.Finish()
chunk, err := d.chunkStore.Get(ctx, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
ctx, osp = spancontext.StartSpan(
ctx,
"chunk.delivery")
- defer osp.Finish()
processReceivedChunksCount.Inc(1)
go func() {
+ defer osp.Finish()
+
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
@@ -255,8 +256,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
return true
}
sp = d.getPeer(id)
+	// sp is nil when we encounter a peer that is not registered for delivery, i.e. one that doesn't support the `stream` protocol
if sp == nil {
- //log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
return true
}
spID = &id
@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
- }, Top)
+ }, Top, "request.from.peers")
if err != nil {
return nil, nil, err
}
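
Moving `osp.Finish()` into the goroutines matters because both handlers return immediately while the chunk work proceeds asynchronously; finishing inside the goroutine makes the span cover the actual storage operation. A condensed sketch of the resulting lifetime in `handleChunkDeliveryMsg` (error handling simplified relative to the real code):

	ctx, osp = spancontext.StartSpan(ctx, "chunk.delivery")
	go func() {
		// the span now ends when the asynchronous Put completes,
		// not when the handler returns
		defer osp.Finish()
		req.peer = sp
		if err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)); err != nil {
			log.Error("chunk delivery put", "err", err)
		}
	}()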
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 70d3829b3..49e4a423a 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -21,9 +21,7 @@ import (
"context"
"errors"
"fmt"
- "os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -48,11 +46,11 @@ func TestStreamerRetrieveRequest(t *testing.T) {
Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
}
- tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(regOpts)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -97,14 +95,14 @@ func TestStreamerRetrieveRequest(t *testing.T) {
//Test requesting a chunk from a peer, then issuing an "empty" OfferedHashesMsg (no hashes available yet)
//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled, //do no syncing
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -169,14 +167,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -359,14 +357,14 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
}
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return &testClient{
@@ -455,164 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
}
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
-
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- Syncing: SyncingDisabled,
- Retrieval: RetrievalEnabled,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
-
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ t.Helper()
+ t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- return r, cleanup, nil
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
+ }, nil)
+ bucket.Store(bucketKeyRegistry, r)
- },
- })
- defer sim.Close()
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- log.Info("Adding nodes to simulation")
- _, err := sim.AddNodesAndConnectChain(nodes)
- if err != nil {
- t.Fatal(err)
- }
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
- log.Info("Starting simulation")
- ctx := context.Background()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
- nodeIDs := sim.UpNodeIDs()
- //determine the pivot node to be the first node of the simulation
- pivot := nodeIDs[0]
-
- //distribute chunks of a random file into Stores of nodes 1 to nodes
- //we will do this by creating a file store with an underlying round-robin store:
- //the file store will create a hash for the uploaded file, but every chunk will be
- //distributed to different nodes via round-robin scheduling
- log.Debug("Writing file to round-robin file store")
- //to do this, we create an array for chunkstores (length minus one, the pivot node)
- stores := make([]storage.ChunkStore, len(nodeIDs)-1)
- //we then need to get all stores from the sim....
- lStores := sim.NodesItems(bucketKeyStore)
- i := 0
- //...iterate the buckets...
- for id, bucketVal := range lStores {
- //...and remove the one which is the pivot node
- if id == pivot {
- continue
- }
- //the other ones are added to the array...
- stores[i] = bucketVal.(storage.ChunkStore)
- i++
- }
- //...which then gets passed to the round-robin file store
- roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
- //now we can actually upload a (random) file to the round-robin store
- size := chunkCount * chunkSize
- log.Debug("Storing data to file store")
- fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
- // wait until all chunks stored
- if err != nil {
- return err
- }
- err = wait(ctx)
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- return err
+ t.Fatal(err)
}
- log.Debug("Waiting for kademlia")
- // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
-
- //get the pivot node's filestore
- item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- pivotFileStore := item.(*storage.FileStore)
- log.Debug("Starting retrieval routine")
- retErrC := make(chan error)
- go func() {
- // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
- // we must wait for the peer connections to have started before requesting
- n, err := readAll(pivotFileStore, fileHash)
- log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
- retErrC <- err
- }()
-
- log.Debug("Watching for disconnections")
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
+ log.Info("Starting simulation")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ nodeIDs := sim.UpNodeIDs()
+ //determine the pivot node to be the first node of the simulation
+ pivot := nodeIDs[0]
+
+ //distribute chunks of a random file into Stores of nodes 1 to nodes
+ //we will do this by creating a file store with an underlying round-robin store:
+ //the file store will create a hash for the uploaded file, but every chunk will be
+ //distributed to different nodes via round-robin scheduling
+ log.Debug("Writing file to round-robin file store")
+ //to do this, we create an array for chunkstores (length minus one, the pivot node)
+ stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+ //we then need to get all stores from the sim....
+ lStores := sim.NodesItems(bucketKeyStore)
+ i := 0
+ //...iterate the buckets...
+ for id, bucketVal := range lStores {
+ //...and remove the one which is the pivot node
+ if id == pivot {
+ continue
}
+ //the other ones are added to the array...
+ stores[i] = bucketVal.(storage.ChunkStore)
+ i++
}
- }()
- defer func() {
+ //...which then gets passed to the round-robin file store
+ roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+ //now we can actually upload a (random) file to the round-robin store
+ size := chunkCount * chunkSize
+ log.Debug("Storing data to file store")
+ fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+ // wait until all chunks stored
+ if err != nil {
+ return err
+ }
+ err = wait(ctx)
if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
+ return err
+ }
+
+ log.Debug("Waiting for kademlia")
+ // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+ if _, err := sim.WaitTillHealthy(ctx); err != nil {
+ return err
+ }
+
+ //get the pivot node's filestore
+ item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ pivotFileStore := item.(*storage.FileStore)
+ log.Debug("Starting retrieval routine")
+ retErrC := make(chan error)
+ go func() {
+ // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+ // we must wait for the peer connections to have started before requesting
+ n, err := readAll(pivotFileStore, fileHash)
+ log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+ retErrC <- err
+ }()
+
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
- }
- }()
+ }()
- //finally check that the pivot node gets all chunks via the root hash
- log.Debug("Check retrieval")
- success := true
- var total int64
- total, err = readAll(pivotFileStore, fileHash)
- if err != nil {
- return err
- }
- log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
- if err != nil || total != int64(size) {
- success = false
- }
+ //finally check that the pivot node gets all chunks via the root hash
+ log.Debug("Check retrieval")
+ success := true
+ var total int64
+ total, err = readAll(pivotFileStore, fileHash)
+ if err != nil {
+ return err
+ }
+ log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+ if err != nil || total != int64(size) {
+ success = false
+ }
- if !success {
- return fmt.Errorf("Test failed, chunks not available on all nodes")
- }
- if err := <-retErrC; err != nil {
- t.Fatalf("requesting chunks: %v", err)
+ if !success {
+ return fmt.Errorf("Test failed, chunks not available on all nodes")
+ }
+ if err := <-retErrC; err != nil {
+ return fmt.Errorf("requesting chunks: %v", err)
+ }
+ log.Debug("Test terminated successfully")
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
}
- log.Debug("Test terminated successfully")
- return nil
})
- if result.Error != nil {
- t.Fatal(result.Error)
- }
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
@@ -644,25 +614,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
@@ -670,12 +625,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -686,21 +643,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
b.Fatal(err)
}
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]
item, ok := sim.NodeItem(node, bucketKeyFileStore)
if !ok {
- b.Fatal("No filestore")
+ return errors.New("No filestore")
}
remoteFileStore := item.(*storage.FileStore)
pivotNode := nodeIDs[0]
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
if !ok {
- b.Fatal("No filestore")
+ return errors.New("No filestore")
}
netStore := item.(*storage.NetStore)
@@ -708,26 +666,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
return err
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
// benchmark loop
@@ -742,12 +684,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
ctx := context.TODO()
hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
if err != nil {
- b.Fatalf("expected no error. got %v", err)
+ return fmt.Errorf("store: %v", err)
}
// wait until all chunks stored
err = wait(ctx)
if err != nil {
- b.Fatalf("expected no error. got %v", err)
+ return fmt.Errorf("wait store: %v", err)
}
// collect the hashes
hashes[i] = hash
@@ -783,10 +725,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
break Loop
}
}
- if err != nil {
- b.Fatal(err)
- }
- return nil
+ return err
})
if result.Error != nil {
b.Fatal(result.Error)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 8f2bed9d6..009a941ef 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -21,9 +21,7 @@ import (
"encoding/binary"
"errors"
"fmt"
- "os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -31,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -62,26 +59,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)
sim := simulation.New(map[string]simulation.ServiceFunc{
- "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -97,11 +79,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
- fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup := func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -134,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
if err != nil {
- log.Error("Store error: %v", "err", err)
- t.Fatal(err)
+ return fmt.Errorf("store: %v", err)
}
err = wait(ctx)
if err != nil {
- log.Error("Wait error: %v", "err", err)
- t.Fatal(err)
+ return fmt.Errorf("wait store: %v", err)
}
item, ok = sim.NodeItem(checker, bucketKeyRegistry)
@@ -152,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- log.Debug("Watching for disconnections")
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 65cde2411..501660fab 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -28,11 +28,11 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -67,11 +67,11 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -111,11 +111,11 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -156,11 +156,11 @@ func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..de4e8a3bb 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(ctx, msg, c.priority)
+ err := p.SendPriority(ctx, msg, c.priority, "")
if err != nil {
log.Warn("SendPriority error", "err", err)
}
diff --git a/swarm/network/stream/norace_test.go b/swarm/network/stream/norace_test.go
new file mode 100644
index 000000000..b324f6939
--- /dev/null
+++ b/swarm/network/stream/norace_test.go
@@ -0,0 +1,24 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !race
+
+package stream
+
+// Provide a flag to reduce the scope of tests when running them
+// with the race detector. Some of the tests do a lot of allocations
+// on the heap, and the race detector uses much more memory to track them.
+const raceTest = false
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 4bccf56f5..68da8f44a 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -65,6 +65,7 @@ type Peer struct {
// on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams
quit chan struct{}
+ spans sync.Map
}
type WrappedPriorityMsg struct {
@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
+ spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
+ defer p.spans.Delete(wmsg.Context)
+ sp, ok := p.spans.Load(wmsg.Context)
+ if ok {
+ defer sp.(opentracing.Span).Finish()
+ }
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
- var sp opentracing.Span
var msg interface{}
spanName := "send.chunk.delivery"
@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
}
spanName += ".retrieval"
}
- ctx, sp = spancontext.StartSpan(
- ctx,
- spanName)
- defer sp.Finish()
- return p.SendPriority(ctx, msg, priority)
+ return p.SendPriority(ctx, msg, priority, spanName)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
+ if traceId != "" {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ traceId,
+ )
+ p.spans.Store(ctx, sp)
+ }
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(ctx, msg, s.priority)
+ return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
}
func (p *Peer) getServer(s Stream) (*server, error) {
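
For callers, the net effect of the `SendPriority` signature change is a before/after like the following sketch (condensed from the `Deliver` rewrite above; the label is optional, and an empty string skips tracing entirely):

	// before: each caller opened and closed its own span around the enqueue,
	// so the span ended before the message left the priority queue
	//     ctx, sp = spancontext.StartSpan(ctx, spanName)
	//     defer sp.Finish()
	//     return p.SendPriority(ctx, msg, priority)

	// after: pass a label; SendPriority opens the span and parks it in p.spans,
	// and the pq.Run consumer finishes it only after p.Send has actually run
	return p.SendPriority(ctx, msg, priority, spanName)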
diff --git a/swarm/network/stream/race_test.go b/swarm/network/stream/race_test.go
new file mode 100644
index 000000000..8aed3542b
--- /dev/null
+++ b/swarm/network/stream/race_test.go
@@ -0,0 +1,23 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build race
+
+package stream
+
+// Reduce the scope of some tests when running with the race detector,
+// as it raises memory consumption significantly.
+const raceTest = true
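
Together with norace_test.go above, this gives every test a plain `raceTest` constant to branch on, without needing build tags of its own. A sketch of the consumption pattern (mirroring TestSyncingViaGlobalSync below):

	chnkCnt := []int{4, 32}
	nodeCnt := []int{32, 16}
	if raceTest {
		// keep allocations low under -race; the larger matrices
		// still run in regular test builds
		chnkCnt = []int{4}
		nodeCnt = []int{16}
	}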
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index d345ac8d0..afb023ae2 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
- "os"
"sync"
"testing"
"time"
@@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -76,7 +74,7 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
- err := runRetrievalTest(*chunks, *nodes)
+ err := runRetrievalTest(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -95,53 +93,37 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
- err := runRetrievalTest(c, n)
- if err != nil {
- t.Fatal(err)
- }
+ t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+ err := runRetrievalTest(t, c, n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
}
}
}
}
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
- "streamer": retrievalStreamerFunc,
-}
-
-func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
-
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalEnabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- return r, cleanup, nil
+ return r, cleanup, nil
+ },
}
/*
@@ -171,7 +153,7 @@ func runFileRetrievalTest(nodeCount int) error {
return err
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
@@ -245,7 +227,8 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+ t.Helper()
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 6af19c12a..b45d0aed5 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -17,11 +17,12 @@ package stream
import (
"context"
+ "errors"
"fmt"
+ "io/ioutil"
"os"
"runtime"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -92,6 +93,15 @@ func TestSyncingViaGlobalSync(t *testing.T) {
if *longrunning {
chnkCnt = []int{1, 8, 32, 256, 1024}
nodeCnt = []int{16, 32, 64, 128, 256}
+ } else if raceTest {
+ // TestSyncingViaGlobalSync allocates a lot of memory
+	// with the race detector. By reducing the number of chunks
+ // and nodes, memory consumption is lower and data races
+ // are still checked, while correctness of syncing is
+ // tested with more chunks and nodes in regular (!race)
+ // tests.
+ chnkCnt = []int{4}
+ nodeCnt = []int{16}
} else {
//default test
chnkCnt = []int{4, 32}
@@ -107,42 +117,43 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
var simServiceMap = map[string]simulation.ServiceFunc{
- "streamer": streamerFunc,
-}
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
-func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
-
- bucket.Store(bucketKeyRegistry, r)
-
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ var dir string
+ var store *state.DBStore
+ if raceTest {
+ // Use on-disk DBStore to reduce memory consumption in race tests.
+ dir, err = ioutil.TempDir("", "swarm-stream-")
+ if err != nil {
+ return nil, nil, err
+ }
+ store, err = state.NewDBStore(dir)
+ if err != nil {
+ return nil, nil, err
+ }
+ } else {
+ store = state.NewInmemoryStore()
+ }
+
+ r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- return r, cleanup, nil
+ bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
}
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
@@ -171,36 +182,24 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
t.Fatal(err)
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
-
result := runSim(conf, ctx, sim, chunkCount)
if result.Error != nil {
t.Fatal(result.Error)
}
- if yes, ok := disconnected.Load().(bool); ok && yes {
- t.Fatal("disconnect events received")
- }
log.Info("Simulation ended")
}
func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
- return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index fb571c856..cb5912185 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.SendPriority(context.TODO(), msg, priority, "")
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod
// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
// from deepest bins backwards
kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
+ // nodes that do not provide stream protocol
+ // should not be subscribed, e.g. bootnodes
+ if !p.HasCap("stream") {
+ return true
+ }
//if the peer's bin is shallower than the kademlia depth,
//only the peer's bin should be subscribed
if po < kadDepth {
@@ -725,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil {
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+
+ if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
@@ -929,3 +935,33 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior
func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
return api.streamer.Unsubscribe(peerId, s)
}
+
+/*
+GetPeerSubscriptions is an API function that allows querying a peer for the stream subscriptions it has.
+It can be called via RPC.
+It returns a map of node IDs with an array of string representations of Stream objects.
+*/
+func (api *API) GetPeerSubscriptions() map[string][]string {
+ //create the empty map
+ pstreams := make(map[string][]string)
+
+ //iterate all streamer peers
+ api.streamer.peersMu.RLock()
+ defer api.streamer.peersMu.RUnlock()
+
+ for id, p := range api.streamer.peers {
+ var streams []string
+ //every peer has a map of stream servers
+ //every stream server represents a subscription
+ p.serverMu.RLock()
+ for s := range p.servers {
+ //append the string representation of the stream
+ //to the list for this peer
+ streams = append(streams, s.String())
+ }
+ p.serverMu.RUnlock()
+ //set the array of stream servers to the map
+ pstreams[id.String()] = streams
+ }
+ return pstreams
+}
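
Since the method hangs off the stream API object, it is reachable under the `stream` RPC namespace. A hypothetical client-side call, assuming an rpc client obtained from a node (this mirrors the usage in streamer_test.go below):

	pstreams := make(map[string][]string)
	if err := client.Call(&pstreams, "stream_getPeerSubscriptions"); err != nil {
		return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
	}
	for peer, streams := range pstreams {
		log.Debug("peer subscriptions", "peer", peer, "streams", streams)
	}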
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index cdaeb92d0..e92ee3783 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -22,23 +22,29 @@ import (
"errors"
"fmt"
"strconv"
+ "strings"
+ "sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"golang.org/x/crypto/sha3"
)
func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
@@ -48,11 +54,11 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -139,11 +145,11 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
@@ -232,11 +238,11 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
@@ -299,11 +305,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -365,11 +371,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -409,11 +415,11 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -472,11 +478,11 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -537,11 +543,11 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -636,11 +642,11 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
@@ -769,15 +775,15 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
// leaving place for new streams.
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -845,13 +851,13 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
// error message exchange.
func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -930,14 +936,14 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
//TestHasPriceImplementation is to check that the Registry has a
//`Price` interface implementation
func TestHasPriceImplementation(t *testing.T) {
- _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ _, r, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
@@ -1105,7 +1111,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
-
// print some output
for p, subs := range fakeSubscriptions {
log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
@@ -1114,3 +1119,239 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
+
+// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
+func TestGetSubscriptions(t *testing.T) {
+	// create a number of dummy peers
+ testPeerCount := 8
+ // every peer will have this amount of dummy servers
+ testServerCount := 4
+ // the peerMap which will store this data for the registry
+ peerMap := make(map[enode.ID]*Peer)
+ // create the registry
+ r := &Registry{}
+ api := NewAPI(r)
+ // call once, at this point should be empty
+ regs := api.GetPeerSubscriptions()
+ if len(regs) != 0 {
+ t.Fatal("Expected subscription count to be 0, but it is not")
+ }
+
+ // now create a number of dummy servers for each node
+ for i := 0; i < testPeerCount; i++ {
+ addr := network.RandomAddr()
+ id := addr.ID()
+ p := &Peer{}
+ p.servers = make(map[Stream]*server)
+ for k := 0; k < testServerCount; k++ {
+ s := Stream{
+ Name: strconv.Itoa(k),
+ Key: "",
+ Live: false,
+ }
+ p.servers[s] = &server{}
+ }
+ peerMap[id] = p
+ }
+ r.peers = peerMap
+
+ // call the subscriptions again
+ regs = api.GetPeerSubscriptions()
+ // count how many (fake) subscriptions there are
+ cnt := 0
+ for _, reg := range regs {
+ for range reg {
+ cnt++
+ }
+ }
+ // check expected value
+ expectedCount := testPeerCount * testServerCount
+ if cnt != expectedCount {
+ t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt)
+ }
+}
+
+/*
+TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
+starts the simulation, waits for SyncUpdateDelay in order to kick off
+stream registration, then tests that there are subscriptions.
+*/
+func TestGetSubscriptionsRPC(t *testing.T) {
+
+ // arbitrarily set to 4
+ nodeCount := 4
+ // run with more nodes if `longrunning` flag is set
+ if *longrunning {
+ nodeCount = 64
+ }
+ // set the syncUpdateDelay for sync registrations to start
+ syncUpdateDelay := 200 * time.Millisecond
+ // holds the msg code for SubscribeMsg
+ var subscribeMsgCode uint64
+ var ok bool
+ var expectedMsgCount counter
+
+	// this channel signals that the expected number of subscriptions has been received
+ allSubscriptionsDone := make(chan struct{})
+ // after the test, we need to reset the subscriptionFunc to the default
+ defer func() { subscriptionFunc = doRequestSubscription }()
+
+ // the subscriptionFunc used for this test just increments the counter and then performs the actual subscription
+ subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
+ expectedMsgCount.inc()
+ doRequestSubscription(r, p, bin, subs)
+ return true
+ }
+ // create a standard sim
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // configure so that sync registrations actually happen
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe, //enable sync registrations
+ SyncUpdateDelay: syncUpdateDelay,
+ }, nil)
+
+ // get the SubscribeMsg code
+ subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
+ if !ok {
+ t.Fatal("Message code for SubscribeMsg not found")
+ }
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
+
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ // upload a snapshot
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // set up the filter for SubscribeMsg
+ msgs := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
+ )
+
+ // strategy: listen to all SubscribeMsg events; every event resets the wait.
+ // If no more messages arrive within `waitDuration`, we assume the
+ // subscription phase has terminated.
+
+ // the loop in this goroutine either waits for new message events
+ // or times out after `waitDuration`, which signals that we are not
+ // receiving any new subscriptions any more
+ go func() {
+ // for long running sims a fixed 1 sec wait is not enough; scale with the
+ // node count, adding 1 so integer division cannot yield a zero duration
+ waitDuration := time.Duration(nodeCount/16+1) * time.Second
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case m := <-msgs: // just reset the loop
+ if m.Error != nil {
+ log.Error("stream message", "err", m.Error)
+ continue
+ }
+ log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
+ case <-time.After(waitDuration):
+ // waitDuration passed without new messages; assume no more subscriptions
+ allSubscriptionsDone <- struct{}{}
+ log.Info("All subscriptions received")
+ return
+
+ }
+ }
+ }()
+
+ //run the simulation
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ log.Info("Simulation running")
+ nodes := sim.Net.Nodes
+
+ //wait until all subscriptions are done
+ select {
+ case <-allSubscriptionsDone:
+ case <-ctx.Done():
+ return errors.New("Context timed out")
+ }
+
+ log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
+ //now iterate again, this time we call each node via RPC to get its subscriptions
+ realCount := 0
+ for _, node := range nodes {
+ //create rpc client
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("create node 1 rpc client fail: %v", err)
+ }
+
+ //ask it for subscriptions
+ pstreams := make(map[string][]string)
+ err = client.Call(&pstreams, "stream_getPeerSubscriptions")
+ if err != nil {
+ return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
+ }
+ // walk this node's subscriptions, counting only the SYNC streams
+ log.Debug("node subscriptions", "node", node.String())
+ for p, ps := range pstreams {
+ log.Debug("... with", "peer", p)
+ for _, s := range ps {
+ log.Debug(".......", "stream", s)
+ // each node also has subscriptions to RETRIEVE_REQUEST streams,
+ // we need to ignore those, we are only counting SYNC streams
+ if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
+ realCount++
+ }
+ }
+ }
+ }
+ // subscriptions are mutual, so each one is reported by both peers; halve the real count before comparing
+ emc := expectedMsgCount.count()
+ if realCount/2 != emc {
+ return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
+ }
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
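
dummyRequestFromPeers is referenced above but not defined in this hunk. A no-op stand-in that satisfies network.RequestFunc would plausibly look like the sketch below; the exact network.Request signature is assumed from its use elsewhere in the package:

	// Sketch: dummyRequestFromPeers never forwards a retrieval request;
	// tests that only exercise syncing can plug it in to disable
	// retrieval traffic.
	func dummyRequestFromPeers(_ context.Context, _ *network.Request) (*enode.ID, chan struct{}, error) {
		return nil, nil, nil
	}
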
+
+// counter is used to concurrently increment
+// and read an integer value.
+type counter struct {
+ v int
+ mu sync.RWMutex
+}
+
+// Increment the counter.
+func (c *counter) inc() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v++
+}
+
+// Read the counter value.
+func (c *counter) count() int {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ return c.v
+}
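
Since counter is only ever incremented and read, an equivalent type could be built on sync/atomic instead of an RWMutex; a sketch (the atomicCounter name is illustrative):

	// atomicCounter provides the same inc/count semantics without a mutex.
	type atomicCounter struct {
		v int64
	}

	func (c *atomicCounter) inc()       { atomic.AddInt64(&c.v, 1) }
	func (c *atomicCounter) count() int { return int(atomic.LoadInt64(&c.v)) }

The RWMutex version above is arguably easier to extend if the counter ever needs compound updates.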
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 014ec9a98..be0752a9d 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -24,7 +24,6 @@ import (
"math"
"os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -38,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
- mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
@@ -46,9 +44,15 @@ const dataChunkCount = 200
func TestSyncerSimulation(t *testing.T) {
testSyncBetweenNodes(t, 2, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+ // This test uses much more memory when running with
+ // race detector. Allow it to finish successfully by
+ // reducing its scope, and still check for data races
+ // with the smallest number of nodes.
+ if !raceTest {
+ testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
+ testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
+ testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+ }
}
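
The raceTest constant consulted here is typically supplied by a pair of build-tag-guarded test files, so its value is fixed at compile time depending on whether -race is in effect. A sketch of that pattern (file names and comments illustrative):

	// race_test.go — compiled only when the race detector is enabled
	// +build race

	package stream

	const raceTest = true

	// norace_test.go — compiled otherwise
	// +build !race

	package stream

	const raceTest = false
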
func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
@@ -73,50 +77,46 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
+ addr := network.NewAddr(ctx.Config.Node())
//hack to put addresses in same space
addr.OAddr[0] = byte(0)
- if *useMockStore {
- store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr)
- } else {
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
- }
+ netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- bucket.Store(bucketKeyDelivery, delivery)
+ var dir string
+ var store *state.DBStore
+ if raceTest {
+ // Use on-disk DBStore to reduce memory consumption in race tests.
+ dir, err = ioutil.TempDir("", "swarm-stream-")
+ if err != nil {
+ return nil, nil, err
+ }
+ store, err = state.NewDBStore(dir)
+ if err != nil {
+ return nil, nil, err
+ }
+ } else {
+ store = state.NewInmemoryStore()
+ }
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
}, nil)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ if dir != "" {
+ os.RemoveAll(dir)
+ }
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -139,26 +139,10 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
nodeIndex[id] = i
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
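
watchDisconnections and the thread-safe boolean it returns are introduced elsewhere in this change; the sketch below is consistent with the call sites here, but the helper body and the boolean type are reconstructions, not quotes from the diff:

	// Sketch: watchDisconnections subscribes to drop events on all nodes
	// and flips a shared flag when one is seen.
	func watchDisconnections(ctx context.Context, sim *simulation.Simulation) *boolean {
		disconnected := new(boolean)
		events := sim.PeerEvents(ctx, sim.NodeIDs(),
			simulation.NewPeerEventsFilter().Drop())
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case d := <-events:
					if d.Error != nil {
						log.Error("peer drop event error", "node", d.NodeID, "err", d.Error)
						continue
					}
					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
					disconnected.set(true)
				}
			}
		}()
		return disconnected
	}

	// boolean is a bool protected for concurrent use.
	type boolean struct {
		v  bool
		mu sync.RWMutex
	}

	func (b *boolean) set(v bool) { b.mu.Lock(); defer b.mu.Unlock(); b.v = v }
	func (b *boolean) bool() bool { b.mu.RLock(); defer b.mu.RUnlock(); return b.v }
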
@@ -167,7 +151,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
id := nodeIDs[j]
client, err := sim.Net.GetNode(id).Client()
if err != nil {
- t.Fatal(err)
+ return fmt.Errorf("node %s client: %v", id, err)
}
sid := nodeIDs[j+1]
client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
@@ -183,7 +167,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
size := chunkCount * chunkSize
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false)
if err != nil {
- t.Fatal(err.Error())
+ return fmt.Errorf("fileStore.Store: %v", err)
}
wait(ctx)
}
@@ -251,44 +235,26 @@ func TestSameVersionID(t *testing.T) {
v := uint(1)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
+
//assign to each node the same version ID
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -316,7 +282,7 @@ func TestSameVersionID(t *testing.T) {
//the peers should connect, thus getting the peer should not return nil
if registry.getPeer(nodes[1]) == nil {
- t.Fatal("Expected the peer to not be nil, but it is")
+ return errors.New("Expected the peer to not be nil, but it is")
}
return nil
})
@@ -333,46 +299,27 @@ func TestDifferentVersionID(t *testing.T) {
v := uint(0)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
//increase the version ID for each node
v++
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -400,7 +347,7 @@ func TestDifferentVersionID(t *testing.T) {
//getting the other peer should fail due to the different version numbers
if registry.getPeer(nodes[1]) != nil {
- t.Fatal("Expected the peer to be nil, but it is not")
+ return errors.New("Expected the peer to be nil, but it is not")
}
return nil
})
diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json
new file mode 100644
index 000000000..a8b617407
--- /dev/null
+++ b/swarm/network/stream/testing/snapshot_4.json
@@ -0,0 +1 @@
+{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
index 18b4c8fb0..cf4405ec1 100644
--- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
- "os"
"sync"
"testing"
"time"
@@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -68,31 +66,6 @@ func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulati
return nodeCount, chunkCount, sim
}
-//watch for disconnections and wait for healthy
-func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) {
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
-
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- panic(err)
- }
-
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- go func() {
- for d := range disconnections {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- panic("unexpected disconnect")
- cancelSimRun()
- }
- }()
-
- return ctx, cancelSimRun
-}
-
//This test requests bogus hashes into the network
func TestNonExistingHashesWithServer(t *testing.T) {
@@ -104,19 +77,25 @@ func TestNonExistingHashesWithServer(t *testing.T) {
panic(err)
}
- ctx, cancelSimRun := watchSim(sim)
- defer cancelSimRun()
-
//in order to get some meaningful visualization, it is beneficial
//to define a minimum duration of this test
testDuration := 20 * time.Second
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+
//check on the node's FileStore (netstore)
id := sim.Net.GetRandomUpNode().ID()
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
- t.Fatalf("No filestore")
+ return errors.New("No filestore")
}
fileStore := item.(*storage.FileStore)
//create a bogus hash
@@ -171,21 +150,10 @@ func TestSnapshotSyncWithServer(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -201,9 +169,8 @@ func TestSnapshotSyncWithServer(t *testing.T) {
bucket.Store(bucketKeyRegistry, tr)
cleanup = func() {
- netStore.Close()
tr.Close()
- os.RemoveAll(datadir)
+ clean()
}
return tr, cleanup, nil
@@ -229,9 +196,6 @@ func TestSnapshotSyncWithServer(t *testing.T) {
panic(err)
}
- ctx, cancelSimRun := watchSim(sim)
- defer cancelSimRun()
-
//run the sim
result := runSim(conf, ctx, sim, chunkCount)