aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2019-02-01 16:58:46 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2019-02-01 16:58:46 +0800
commit597597e8b27ee60a25b4533771702892e72898a5 (patch)
treeedb8194e485c9aecdb1234486bbd95e09396a07c /swarm/network/stream
parenta89170cfb2acd33aea99551cb9524bcdfaad96ec (diff)
downloadgo-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar.gz
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar.bz2
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar.lz
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar.xz
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.tar.zst
go-tangerine-597597e8b27ee60a25b4533771702892e72898a5.zip
swarm/network: refactor simulation tests bootstrap (#18975)
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/common_test.go77
-rw-r--r--swarm/network/stream/delivery_test.go51
-rw-r--r--swarm/network/stream/intervals_test.go28
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go52
-rw-r--r--swarm/network/stream/snapshot_sync_test.go51
-rw-r--r--swarm/network/stream/streamer_test.go24
-rw-r--r--swarm/network/stream/syncer_test.go100
-rw-r--r--swarm/network/stream/testing/snapshot_4.json2
-rw-r--r--swarm/network/stream/visualized_snapshot_sync_sim_test.go18
9 files changed, 156 insertions, 247 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 7b2962608..3b6e4a946 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -26,16 +26,19 @@ import (
"math/rand"
"os"
"strings"
+ "sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
@@ -66,6 +69,80 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
+// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
+func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
+func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ n := ctx.Config.Node()
+
+ store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ if *useMockStore {
+ store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
+ }
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ localStore := store.(*storage.LocalStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, netStore)
+
+ bucket.Store(bucketKeyStore, store)
+ bucket.Store(bucketKeyDB, netStore)
+ bucket.Store(bucketKeyDelivery, delivery)
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ cleanup := func() {
+ netStore.Close()
+ os.RemoveAll(datadir)
+ }
+
+ return netStore, delivery, cleanup, nil
+}
+
func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 6f1ddc659..cb7690f3e 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
- "os"
"sync"
"sync/atomic"
"testing"
@@ -457,27 +456,11 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
@@ -485,11 +468,12 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}, nil)
bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -644,25 +628,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
@@ -670,12 +639,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 8f2bed9d6..248ba0c84 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -21,7 +21,6 @@ import (
"encoding/binary"
"errors"
"fmt"
- "os"
"sync"
"sync/atomic"
"testing"
@@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)
sim := simulation.New(map[string]simulation.ServiceFunc{
- "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
- fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup := func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index d345ac8d0..f097e4180 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
- "os"
"sync"
"testing"
"time"
@@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) {
}
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
- "streamer": retrievalStreamerFunc,
-}
-
-func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
-
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalEnabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- return r, cleanup, nil
+ return r, cleanup, nil
+ },
}
/*
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 6af19c12a..c32ed7d07 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
var simServiceMap = map[string]simulation.ServiceFunc{
- "streamer": streamerFunc,
-}
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
-func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
-
- bucket.Store(bucketKeyRegistry, r)
-
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- return r, cleanup, nil
+ bucket.Store(bucketKeyRegistry, r)
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+ return r, cleanup, nil
+ },
}
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index b83521f06..c2aee61b7 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
- "os"
"strconv"
"strings"
"sync"
@@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
- "github.com/ethereum/go-ethereum/swarm/storage"
"golang.org/x/crypto/sha3"
)
@@ -1209,26 +1207,18 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// create a standard sim
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
+
// configure so that sync registrations actually happen
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe, //enable sync registrations
SyncUpdateDelay: syncUpdateDelay,
}, nil)
+
// get the SubscribeMsg code
subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
if !ok {
@@ -1236,13 +1226,11 @@ func TestGetSubscriptionsRPC(t *testing.T) {
}
cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
r.Close()
+ clean()
}
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -1322,9 +1310,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
t.Fatal(err)
}
//length of the subscriptions can not be smaller than number of peers
- log.Debug("node subscriptions:", "node", node.String())
+ log.Debug("node subscriptions", "node", node.String())
for p, ps := range pstreams {
- log.Debug("... with: ", "peer", p)
+ log.Debug("... with", "peer", p)
for _, s := range ps {
log.Debug(".......", "stream", s)
// each node also has subscriptions to RETRIEVE_REQUEST streams,
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 014ec9a98..5656963d9 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -22,7 +22,6 @@ import (
"fmt"
"io/ioutil"
"math"
- "os"
"sync"
"sync/atomic"
"testing"
@@ -38,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
- mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
@@ -73,38 +71,14 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
+ addr := network.NewAddr(ctx.Config.Node())
//hack to put addresses in same space
addr.OAddr[0] = byte(0)
- if *useMockStore {
- store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr)
- } else {
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
- }
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -112,11 +86,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
SkipCheck: skipCheck,
}, nil)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -251,44 +226,26 @@ func TestSameVersionID(t *testing.T) {
v := uint(1)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
+
//assign to each node the same version ID
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -333,46 +290,27 @@ func TestDifferentVersionID(t *testing.T) {
v := uint(0)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
//increase the version ID for each node
v++
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json
index a64f31375..a8b617407 100644
--- a/swarm/network/stream/testing/snapshot_4.json
+++ b/swarm/network/stream/testing/snapshot_4.json
@@ -1 +1 @@
-{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]} \ No newline at end of file
+{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
index 3b7d0d743..3694dd311 100644
--- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
- "os"
"sync"
"testing"
"time"
@@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -169,21 +167,10 @@ func TestSnapshotSyncWithServer(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -199,9 +186,8 @@ func TestSnapshotSyncWithServer(t *testing.T) {
bucket.Store(bucketKeyRegistry, tr)
cleanup = func() {
- netStore.Close()
tr.Close()
- os.RemoveAll(datadir)
+ clean()
}
return tr, cleanup, nil