aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/common_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r--swarm/network/stream/common_test.go149
1 files changed, 142 insertions, 7 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 29b917d39..afd08d275 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -26,17 +26,19 @@ import (
"math/rand"
"os"
"strings"
+ "sync"
"sync/atomic"
- "testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
@@ -67,7 +69,81 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
-func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
+// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
+func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
+func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ n := ctx.Config.Node()
+
+ store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ if *useMockStore {
+ store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
+ }
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ localStore := store.(*storage.LocalStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, netStore)
+
+ bucket.Store(bucketKeyStore, store)
+ bucket.Store(bucketKeyDB, netStore)
+ bucket.Store(bucketKeyDelivery, delivery)
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ cleanup := func() {
+ netStore.Close()
+ os.RemoveAll(datadir)
+ }
+
+ return netStore, delivery, cleanup, nil
+}
+
+func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@@ -75,7 +151,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
// temp datadir
datadir, err := ioutil.TempDir("", "streamer")
if err != nil {
- return nil, nil, nil, func() {}, err
+ return nil, nil, nil, nil, err
}
removeDataDir := func() {
os.RemoveAll(datadir)
@@ -87,12 +163,14 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
localStore, err := storage.NewTestLocalStoreForAddr(params)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
delivery := NewDelivery(to, netStore)
@@ -102,10 +180,11 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
streamer.Close()
removeDataDir()
}
- protocolTester := p2ptest.NewProtocolTester(t, addr.ID(), 1, streamer.runProtocol)
+ protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
- err = waitForPeers(streamer, 1*time.Second, 1)
+ err = waitForPeers(streamer, 10*time.Second, 1)
if err != nil {
+ teardown()
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
}
@@ -138,6 +217,11 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
+// not used in this context, only to fulfill ChunkStore interface
+func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool {
+ panic("RoundRobinStor doesn't support HasChunk")
+}
+
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
@@ -236,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
}
return store, datadir, nil
}
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value
+// disconnected to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ ctx,
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Drop(),
+ )
+ disconnected = new(boolean)
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-disconnections:
+ if d.Error != nil {
+ log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+ } else {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+ }
+ disconnected.set(true)
+ }
+ }
+ }()
+ return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+ v bool
+ mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+
+ return b.v
+}