Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r--  swarm/network/stream/common_test.go | 364
1 file changed, 85 insertions(+), 279 deletions(-)
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 4d55c6ee3..491dc9fd5 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -18,135 +18,70 @@ package stream
import (
"context"
- "encoding/binary"
+ crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
+ "math/rand"
"os"
+ "strings"
"sync/atomic"
"testing"
"time"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
- "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
- "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
+ mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
colorable "github.com/mattn/go-colorable"
)
var (
- deliveries map[discover.NodeID]*Delivery
- stores map[discover.NodeID]storage.ChunkStore
- toAddr func(discover.NodeID) *network.BzzAddr
- peerCount func(discover.NodeID) int
- adapter = flag.String("adapter", "sim", "type of simulation: sim|exec|docker")
loglevel = flag.Int("loglevel", 2, "verbosity of logs")
nodes = flag.Int("nodes", 0, "number of nodes")
chunks = flag.Int("chunks", 0, "number of chunks")
useMockStore = flag.Bool("mockstore", false, "use mock global store (default: disabled)")
-)
+ longrunning = flag.Bool("longrunning", false, "run long-running tests")
-var (
- defaultSkipCheck bool
- waitPeerErrC chan error
- chunkSize = 4096
- registries map[discover.NodeID]*TestRegistry
- createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error)
- getRetrieveFunc = defaultRetrieveFunc
- subscriptionCount = 0
- globalStore mock.GlobalStorer
- globalStoreDir string
-)
+ bucketKeyDB = simulation.BucketKey("db")
+ bucketKeyStore = simulation.BucketKey("store")
+ bucketKeyFileStore = simulation.BucketKey("filestore")
+ bucketKeyNetStore = simulation.BucketKey("netstore")
+ bucketKeyDelivery = simulation.BucketKey("delivery")
+ bucketKeyRegistry = simulation.BucketKey("registry")
-var services = adapters.Services{
- "streamer": NewStreamerService,
- "intervalsStreamer": newIntervalsStreamerService,
-}
+ chunkSize = 4096
+ pof = pot.DefaultPof(256)
+)
func init() {
flag.Parse()
- // register the Delivery service which will run as a devp2p
- // protocol when using the exec adapter
- adapters.RegisterServices(services)
+ rand.Seed(time.Now().UnixNano())
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
-func createGlobalStore() {
- var err error
- globalStoreDir, err = ioutil.TempDir("", "global.store")
+func createGlobalStore() (string, *mockdb.GlobalStore, error) {
+ var globalStore *mockdb.GlobalStore
+ globalStoreDir, err := ioutil.TempDir("", "global.store")
if err != nil {
log.Error("Error initiating global store temp directory!", "err", err)
- return
+ return "", nil, err
}
- globalStore, err = db.NewGlobalStore(globalStoreDir)
+ globalStore, err = mockdb.NewGlobalStore(globalStoreDir)
if err != nil {
log.Error("Error initiating global store!", "err", err)
+ return "", nil, err
}
-}
-
-// NewStreamerService
-func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
- var err error
- id := ctx.Config.ID
- addr := toAddr(id)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- stores[id], err = createStoreFunc(id, addr)
- if err != nil {
- return nil, err
- }
- store := stores[id].(*storage.LocalStore)
- db := storage.NewDBAPI(store)
- delivery := NewDelivery(kad, db)
- deliveries[id] = delivery
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: defaultSkipCheck,
- DoRetrieve: false,
- })
- RegisterSwarmSyncerServer(r, db)
- RegisterSwarmSyncerClient(r, db)
- go func() {
- waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
- }()
- fileStore := storage.NewFileStore(storage.NewNetStore(store, getRetrieveFunc(id)), storage.NewFileStoreParams())
- testRegistry := &TestRegistry{Registry: r, fileStore: fileStore}
- registries[id] = testRegistry
- return testRegistry, nil
-}
-
-func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
- return nil
-}
-
-func datadirsCleanup() {
- for _, id := range ids {
- os.RemoveAll(datadirs[id])
- }
- if globalStoreDir != "" {
- os.RemoveAll(globalStoreDir)
- }
-}
-
-//local stores need to be cleaned up after the sim is done
-func localStoreCleanup() {
- log.Info("Cleaning up...")
- for _, id := range ids {
- registries[id].Close()
- stores[id].Close()
- }
- log.Info("Local store cleanup done")
+ return globalStoreDir, globalStore, nil
}
func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
@@ -174,9 +109,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
db := storage.NewDBAPI(localStore)
delivery := NewDelivery(to, db)
- streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: defaultSkipCheck,
- })
+ streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -233,22 +166,6 @@ func (rrs *roundRobinStore) Close() {
}
}
-type TestRegistry struct {
- *Registry
- fileStore *storage.FileStore
-}
-
-func (r *TestRegistry) APIs() []rpc.API {
- a := r.Registry.APIs()
- a = append(a, rpc.API{
- Namespace: "stream",
- Version: "3.0",
- Service: r,
- Public: true,
- })
- return a
-}
-
func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
r, _ := fileStore.Retrieve(context.TODO(), hash)
buf := make([]byte, 1024)
@@ -265,185 +182,74 @@ func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
return total, nil
}
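readAll drains the retrieved reader in 1 kB chunks and returns the total byte count, which is how these tests verify that a retrieval completed. A hedged fragment of typical use (fileStore, rootAddr, and content are assumed to come from an earlier fileStore.Store call):

	// storage.Address has underlying type []byte, so rootAddr can be
	// passed directly as the hash argument.
	total, err := readAll(fileStore, rootAddr)
	if err != nil {
		t.Fatal(err)
	}
	if total != int64(len(content)) {
		t.Fatalf("retrieved %d bytes, expected %d", total, len(content))
	}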
-func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) {
- return readAll(r.fileStore, hash[:])
-}
-
-func (r *TestRegistry) Start(server *p2p.Server) error {
- return r.Registry.Start(server)
-}
-
-func (r *TestRegistry) Stop() error {
- return r.Registry.Stop()
-}
-
-type TestExternalRegistry struct {
- *Registry
-}
-
-func (r *TestExternalRegistry) APIs() []rpc.API {
- a := r.Registry.APIs()
- a = append(a, rpc.API{
- Namespace: "stream",
- Version: "3.0",
- Service: r,
- Public: true,
- })
- return a
-}
-
-func (r *TestExternalRegistry) GetHashes(ctx context.Context, peerId discover.NodeID, s Stream) (*rpc.Subscription, error) {
- peer := r.getPeer(peerId)
-
- client, err := peer.getClient(ctx, s)
- if err != nil {
- return nil, err
- }
-
- c := client.Client.(*testExternalClient)
+func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) {
+ nodes := sim.UpNodeIDs()
+ nodeCnt := len(nodes)
+ log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
+ //array holding generated files
+ rfiles := make([]string, nodeCnt)
+ //array holding the root hashes of the files
+ rootAddrs := make([]storage.Address, nodeCnt)
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, fmt.Errorf("Subscribe not supported")
- }
-
- sub := notifier.CreateSubscription()
-
- go func() {
- // if we begin sending events immediately, some events
- // will probably be dropped since the subscription ID might not be sent to
- // the client yet.
- // ref: rpc/subscription_test.go#L65
- time.Sleep(1 * time.Second)
- for {
- select {
- case h := <-c.hashes:
- <-c.enableNotificationsC // wait for notification subscription to complete
- if err := notifier.Notify(sub.ID, h); err != nil {
- log.Warn(fmt.Sprintf("rpc sub notifier notify stream %s: %v", s, err))
- }
- case err := <-sub.Err():
- if err != nil {
- log.Warn(fmt.Sprintf("caught subscription error in stream %s: %v", s, err))
- }
- case <-notifier.Closed():
- log.Trace(fmt.Sprintf("rpc sub notifier closed"))
- return
- }
+ var err error
+ //for every node, generate a file and upload
+ for i, id := range nodes {
+ item, ok := sim.NodeItem(id, bucketKeyFileStore)
+ if !ok {
+ return nil, nil, fmt.Errorf("Error accessing localstore")
}
- }()
-
- return sub, nil
-}
-
-func (r *TestExternalRegistry) EnableNotifications(peerId discover.NodeID, s Stream) error {
- peer := r.getPeer(peerId)
-
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- client, err := peer.getClient(ctx, s)
- if err != nil {
- return err
- }
-
- close(client.Client.(*testExternalClient).enableNotificationsC)
-
- return nil
-}
-
-// TODO: merge functionalities of testExternalClient and testExternalServer
-// with testClient and testServer.
-
-type testExternalClient struct {
- hashes chan []byte
- db *storage.DBAPI
- enableNotificationsC chan struct{}
-}
-
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
- return &testExternalClient{
- hashes: make(chan []byte),
- db: db,
- enableNotificationsC: make(chan struct{}),
- }
-}
-
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
- return nil
- }
- c.hashes <- hash
- return func() {
- chunk.WaitToStore()
+ fileStore := item.(*storage.FileStore)
+ //generate a file
+ rfiles[i], err = generateRandomFile()
+ if err != nil {
+ return nil, nil, err
+ }
+ //store it (upload it) on the FileStore
+ ctx := context.TODO()
+ rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
+ log.Debug("Uploaded random string file to node")
+ if err != nil {
+ return nil, nil, err
+ }
+ err = wait(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+ rootAddrs[i] = rk
}
+ return rootAddrs, rfiles, nil
}
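uploadFilesToNodes expects each up node's bucket to already hold a *storage.FileStore under bucketKeyFileStore, populated by the simulation's ServiceFunc. A hedged fragment of how a test might drive it (sim construction and service wiring are elided; AddNodesAndConnectChain is assumed from the simulation package API):

	// Assumes sim was built with simulation.New and a ServiceFunc that
	// stores a *storage.FileStore in the bucket under bucketKeyFileStore.
	defer sim.Close()
	if _, err := sim.AddNodesAndConnectChain(4); err != nil {
		t.Fatal(err)
	}
	rootAddrs, files, err := uploadFilesToNodes(sim)
	if err != nil {
		t.Fatal(err)
	}
	// rootAddrs[i] is the swarm hash of files[i], uploaded through node i.
	_, _ = rootAddrs, files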
-func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
- return nil
-}
-
-func (c *testExternalClient) Close() {}
-
-const testExternalServerBatchSize = 10
-
-type testExternalServer struct {
- t string
- keyFunc func(key []byte, index uint64)
- sessionAt uint64
- maxKeys uint64
- streamer *TestExternalRegistry
-}
-
-func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
- if keyFunc == nil {
- keyFunc = binary.BigEndian.PutUint64
- }
- return &testExternalServer{
- t: t,
- keyFunc: keyFunc,
- sessionAt: sessionAt,
- maxKeys: maxKeys,
+//generate a random file (string)
+func generateRandomFile() (string, error) {
+ //generate a random file size between minFileSize and maxFileSize
+ fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
+ log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
+ b := make([]byte, fileSize*1024)
+ _, err := crand.Read(b)
+ if err != nil {
+ log.Error("Error generating random file.", "err", err)
+ return "", err
}
+ return string(b), nil
}
-func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- if from == 0 && to == 0 {
- from = s.sessionAt
- to = s.sessionAt + testExternalServerBatchSize
- }
- if to-from > testExternalServerBatchSize {
- to = from + testExternalServerBatchSize - 1
- }
- if from >= s.maxKeys && to > s.maxKeys {
- return nil, 0, 0, nil, io.EOF
- }
- if to > s.maxKeys {
- to = s.maxKeys
+//create a local store for the given node
+func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
+ var datadir string
+ var err error
+ datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
+ if err != nil {
+ return nil, "", err
}
- b := make([]byte, HashSize*(to-from+1))
- for i := from; i <= to; i++ {
- s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
+ var store storage.ChunkStore
+ params := storage.NewDefaultLocalStoreParams()
+ params.ChunkDbPath = datadir
+ params.BaseKey = addr.Over()
+ store, err = storage.NewTestLocalStoreForAddr(params)
+ if err != nil {
+ os.RemoveAll(datadir)
+ return nil, "", err
}
- return b, from, to, nil, nil
-}
-
-func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
- return make([]byte, 4096), nil
-}
-
-func (s *testExternalServer) Close() {}
-
-// Sets the global value defaultSkipCheck.
-// It should be used in a test function defer to reset the global value
-// to the original value.
-//
-//  defer setDefaultSkipCheck(defaultSkipCheck)
-//  defaultSkipCheck = skipCheck
-//
-// This works because deferred function arguments are evaluated as usual,
-// but only the function body invocation is deferred.
-func setDefaultSkipCheck(skipCheck bool) {
- defaultSkipCheck = skipCheck
+ return store, datadir, nil
}
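For reference, the defer idiom described in the removed comment works because a deferred call's arguments are evaluated when the defer statement executes, while the call itself runs at function return. A minimal self-contained sketch:

package main

import "fmt"

var defaultSkipCheck bool

func setDefaultSkipCheck(skipCheck bool) {
	defaultSkipCheck = skipCheck
}

func main() {
	// The argument defaultSkipCheck is evaluated here (false), so the
	// deferred call restores the original value when main returns.
	defer setDefaultSkipCheck(defaultSkipCheck)
	defaultSkipCheck = true
	fmt.Println(defaultSkipCheck) // true while the "test body" runs
}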