diff options
Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r-- | swarm/network/stream/common_test.go | 364 |
1 files changed, 85 insertions, 279 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 4d55c6ee3..491dc9fd5 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -18,135 +18,70 @@ package stream import ( "context" - "encoding/binary" + crand "crypto/rand" "errors" "flag" "fmt" "io" "io/ioutil" + "math/rand" "os" + "strings" "sync/atomic" "testing" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/mock" - "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" colorable "github.com/mattn/go-colorable" ) var ( - deliveries map[discover.NodeID]*Delivery - stores map[discover.NodeID]storage.ChunkStore - toAddr func(discover.NodeID) *network.BzzAddr - peerCount func(discover.NodeID) int - adapter = flag.String("adapter", "sim", "type of simulation: sim|exec|docker") loglevel = flag.Int("loglevel", 2, "verbosity of logs") nodes = flag.Int("nodes", 0, "number of nodes") chunks = flag.Int("chunks", 0, "number of chunks") useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)") -) + longrunning = flag.Bool("longrunning", false, "do run long-running tests") -var ( - defaultSkipCheck bool - waitPeerErrC chan error - chunkSize = 4096 - registries map[discover.NodeID]*TestRegistry - createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) - getRetrieveFunc = defaultRetrieveFunc - subscriptionCount = 0 - globalStore mock.GlobalStorer - globalStoreDir string -) + bucketKeyDB = simulation.BucketKey("db") + bucketKeyStore = simulation.BucketKey("store") + bucketKeyFileStore = simulation.BucketKey("filestore") + bucketKeyNetStore = simulation.BucketKey("netstore") + bucketKeyDelivery = simulation.BucketKey("delivery") + bucketKeyRegistry = simulation.BucketKey("registry") -var services = adapters.Services{ - "streamer": NewStreamerService, - "intervalsStreamer": newIntervalsStreamerService, -} + chunkSize = 4096 + pof = pot.DefaultPof(256) +) func init() { flag.Parse() - // register the Delivery service which will run as a devp2p - // protocol when using the exec adapter - adapters.RegisterServices(services) + rand.Seed(time.Now().UnixNano()) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } -func createGlobalStore() { - var err error - globalStoreDir, err = ioutil.TempDir("", "global.store") +func createGlobalStore() (string, *mockdb.GlobalStore, error) { + var globalStore *mockdb.GlobalStore + globalStoreDir, err := ioutil.TempDir("", "global.store") if err != nil { log.Error("Error initiating global store temp directory!", "err", err) - return + return "", nil, err } - globalStore, err = db.NewGlobalStore(globalStoreDir) + globalStore, err = mockdb.NewGlobalStore(globalStoreDir) if err != nil { log.Error("Error initiating global store!", "err", err) + return "", nil, err } -} - -// NewStreamerService -func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { - var err error - id := ctx.Config.ID - addr := toAddr(id) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - stores[id], err = createStoreFunc(id, addr) - if err != nil { - return nil, err - } - store := stores[id].(*storage.LocalStore) - db := storage.NewDBAPI(store) - delivery := NewDelivery(kad, db) - deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - DoRetrieve: false, - }) - RegisterSwarmSyncerServer(r, db) - RegisterSwarmSyncerClient(r, db) - go func() { - waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id)) - }() - fileStore := storage.NewFileStore(storage.NewNetStore(store, getRetrieveFunc(id)), storage.NewFileStoreParams()) - testRegistry := &TestRegistry{Registry: r, fileStore: fileStore} - registries[id] = testRegistry - return testRegistry, nil -} - -func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { - return nil -} - -func datadirsCleanup() { - for _, id := range ids { - os.RemoveAll(datadirs[id]) - } - if globalStoreDir != "" { - os.RemoveAll(globalStoreDir) - } -} - -//local stores need to be cleaned up after the sim is done -func localStoreCleanup() { - log.Info("Cleaning up...") - for _, id := range ids { - registries[id].Close() - stores[id].Close() - } - log.Info("Local store cleanup done") + return globalStoreDir, globalStore, nil } func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { @@ -174,9 +109,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora db := storage.NewDBAPI(localStore) delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - }) + streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -233,22 +166,6 @@ func (rrs *roundRobinStore) Close() { } } -type TestRegistry struct { - *Registry - fileStore *storage.FileStore -} - -func (r *TestRegistry) APIs() []rpc.API { - a := r.Registry.APIs() - a = append(a, rpc.API{ - Namespace: "stream", - Version: "3.0", - Service: r, - Public: true, - }) - return a -} - func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { r, _ := fileStore.Retrieve(context.TODO(), hash) buf := make([]byte, 1024) @@ -265,185 +182,74 @@ func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { return total, nil } -func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) { - return readAll(r.fileStore, hash[:]) -} - -func (r *TestRegistry) Start(server *p2p.Server) error { - return r.Registry.Start(server) -} - -func (r *TestRegistry) Stop() error { - return r.Registry.Stop() -} - -type TestExternalRegistry struct { - *Registry -} - -func (r *TestExternalRegistry) APIs() []rpc.API { - a := r.Registry.APIs() - a = append(a, rpc.API{ - Namespace: "stream", - Version: "3.0", - Service: r, - Public: true, - }) - return a -} - -func (r *TestExternalRegistry) GetHashes(ctx context.Context, peerId discover.NodeID, s Stream) (*rpc.Subscription, error) { - peer := r.getPeer(peerId) - - client, err := peer.getClient(ctx, s) - if err != nil { - return nil, err - } - - c := client.Client.(*testExternalClient) +func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) { + nodes := sim.UpNodeIDs() + nodeCnt := len(nodes) + log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt)) + //array holding generated files + rfiles := make([]string, nodeCnt) + //array holding the root hashes of the files + rootAddrs := make([]storage.Address, nodeCnt) - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, fmt.Errorf("Subscribe not supported") - } - - sub := notifier.CreateSubscription() - - go func() { - // if we begin sending event immediately some events - // will probably be dropped since the subscription ID might not be send to - // the client. - // ref: rpc/subscription_test.go#L65 - time.Sleep(1 * time.Second) - for { - select { - case h := <-c.hashes: - <-c.enableNotificationsC // wait for notification subscription to complete - if err := notifier.Notify(sub.ID, h); err != nil { - log.Warn(fmt.Sprintf("rpc sub notifier notify stream %s: %v", s, err)) - } - case err := <-sub.Err(): - if err != nil { - log.Warn(fmt.Sprintf("caught subscription error in stream %s: %v", s, err)) - } - case <-notifier.Closed(): - log.Trace(fmt.Sprintf("rpc sub notifier closed")) - return - } + var err error + //for every node, generate a file and upload + for i, id := range nodes { + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return nil, nil, fmt.Errorf("Error accessing localstore") } - }() - - return sub, nil -} - -func (r *TestExternalRegistry) EnableNotifications(peerId discover.NodeID, s Stream) error { - peer := r.getPeer(peerId) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - client, err := peer.getClient(ctx, s) - if err != nil { - return err - } - - close(client.Client.(*testExternalClient).enableNotificationsC) - - return nil -} - -// TODO: merge functionalities of testExternalClient and testExternalServer -// with testClient and testServer. - -type testExternalClient struct { - hashes chan []byte - db *storage.DBAPI - enableNotificationsC chan struct{} -} - -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { - return &testExternalClient{ - hashes: make(chan []byte), - db: db, - enableNotificationsC: make(chan struct{}), - } -} - -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { - return nil - } - c.hashes <- hash - return func() { - chunk.WaitToStore() + fileStore := item.(*storage.FileStore) + //generate a file + rfiles[i], err = generateRandomFile() + if err != nil { + return nil, nil, err + } + //store it (upload it) on the FileStore + ctx := context.TODO() + rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) + log.Debug("Uploaded random string file to node") + if err != nil { + return nil, nil, err + } + err = wait(ctx) + if err != nil { + return nil, nil, err + } + rootAddrs[i] = rk } + return rootAddrs, rfiles, nil } -func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { - return nil -} - -func (c *testExternalClient) Close() {} - -const testExternalServerBatchSize = 10 - -type testExternalServer struct { - t string - keyFunc func(key []byte, index uint64) - sessionAt uint64 - maxKeys uint64 - streamer *TestExternalRegistry -} - -func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer { - if keyFunc == nil { - keyFunc = binary.BigEndian.PutUint64 - } - return &testExternalServer{ - t: t, - keyFunc: keyFunc, - sessionAt: sessionAt, - maxKeys: maxKeys, +//generate a random file (string) +func generateRandomFile() (string, error) { + //generate a random file size between minFileSize and maxFileSize + fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize + log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize)) + b := make([]byte, fileSize*1024) + _, err := crand.Read(b) + if err != nil { + log.Error("Error generating random file.", "err", err) + return "", err } + return string(b), nil } -func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - if from == 0 && to == 0 { - from = s.sessionAt - to = s.sessionAt + testExternalServerBatchSize - } - if to-from > testExternalServerBatchSize { - to = from + testExternalServerBatchSize - 1 - } - if from >= s.maxKeys && to > s.maxKeys { - return nil, 0, 0, nil, io.EOF - } - if to > s.maxKeys { - to = s.maxKeys +//create a local store for the given node +func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) { + var datadir string + var err error + datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString())) + if err != nil { + return nil, "", err } - b := make([]byte, HashSize*(to-from+1)) - for i := from; i <= to; i++ { - s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) + var store storage.ChunkStore + params := storage.NewDefaultLocalStoreParams() + params.ChunkDbPath = datadir + params.BaseKey = addr.Over() + store, err = storage.NewTestLocalStoreForAddr(params) + if err != nil { + os.RemoveAll(datadir) + return nil, "", err } - return b, from, to, nil, nil -} - -func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { - return make([]byte, 4096), nil -} - -func (s *testExternalServer) Close() {} - -// Sets the global value defaultSkipCheck. -// It should be used in test function defer to reset the global value -// to the original value. -// -// defer setDefaultSkipCheck(defaultSkipCheck) -// defaultSkipCheck = skipCheck -// -// This works as defer function arguments evaluations are evaluated as ususal, -// but only the function body invocation is deferred. -func setDefaultSkipCheck(skipCheck bool) { - defaultSkipCheck = skipCheck + return store, datadir, nil } |