// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package stream import ( "context" crand "crypto/rand" "errors" "flag" "fmt" "io" "io/ioutil" "math/rand" "os" "strings" "sync/atomic" "testing" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" colorable "github.com/mattn/go-colorable" ) var ( loglevel = flag.Int("loglevel", 2, "verbosity of logs") nodes = flag.Int("nodes", 0, "number of nodes") chunks = flag.Int("chunks", 0, "number of chunks") useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)") longrunning = flag.Bool("longrunning", false, "do run long-running tests") bucketKeyDB = simulation.BucketKey("db") bucketKeyStore = simulation.BucketKey("store") bucketKeyFileStore = simulation.BucketKey("filestore") bucketKeyNetStore = simulation.BucketKey("netstore") bucketKeyDelivery = simulation.BucketKey("delivery") bucketKeyRegistry = simulation.BucketKey("registry") chunkSize = 4096 pof = pot.DefaultPof(256) ) func init() { flag.Parse() rand.Seed(time.Now().UnixNano()) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } func createGlobalStore() (string, *mockdb.GlobalStore, error) { var globalStore *mockdb.GlobalStore globalStoreDir, err := ioutil.TempDir("", "global.store") if err != nil { log.Error("Error initiating global store temp directory!", "err", err) return "", nil, err } globalStore, err = mockdb.NewGlobalStore(globalStoreDir) if err != nil { log.Error("Error initiating global store!", "err", err) return "", nil, err } return globalStoreDir, globalStore, nil } func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address to := network.NewKademlia(addr.OAddr, network.NewKadParams()) // temp datadir datadir, err := ioutil.TempDir("", "streamer") if err != nil { return nil, nil, nil, func() {}, err } removeDataDir := func() { os.RemoveAll(datadir) } params := storage.NewDefaultLocalStoreParams() params.Init(datadir) params.BaseKey = addr.Over() localStore, err := storage.NewTestLocalStoreForAddr(params) if err != nil { return nil, nil, nil, removeDataDir, err } netStore, err := storage.NewNetStore(localStore, nil) if err != nil { return nil, nil, nil, removeDataDir, err } delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions) teardown := func() { streamer.Close() removeDataDir() } protocolTester := p2ptest.NewProtocolTester(t, addr.ID(), 1, streamer.runProtocol) err = waitForPeers(streamer, 1*time.Second, 1) if err != nil { return nil, nil, nil, nil, errors.New("timeout: peer is not created") } return protocolTester, streamer, localStore, teardown, nil } func waitForPeers(streamer *Registry, timeout time.Duration, expectedPeers int) error { ticker := time.NewTicker(10 * time.Millisecond) timeoutTimer := time.NewTimer(timeout) for { select { case <-ticker.C: if streamer.peersCount() >= expectedPeers { return nil } case <-timeoutTimer.C: return errors.New("timeout") } } } type roundRobinStore struct { index uint32 stores []storage.ChunkStore } func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { return &roundRobinStore{ stores: stores, } } func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) return rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { for _, store := range rrs.stores { store.Close() } } func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { r, _ := fileStore.Retrieve(context.TODO(), hash) buf := make([]byte, 1024) var n int var total int64 var err error for (total == 0 || n > 0) && err == nil { n, err = r.ReadAt(buf, total) total += int64(n) } if err != nil && err != io.EOF { return total, err } return total, nil } func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) { nodes := sim.UpNodeIDs() nodeCnt := len(nodes) log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt)) //array holding generated files rfiles := make([]string, nodeCnt) //array holding the root hashes of the files rootAddrs := make([]storage.Address, nodeCnt) var err error //for every node, generate a file and upload for i, id := range nodes { item, ok := sim.NodeItem(id, bucketKeyFileStore) if !ok { return nil, nil, fmt.Errorf("Error accessing localstore") } fileStore := item.(*storage.FileStore) //generate a file rfiles[i], err = generateRandomFile() if err != nil { return nil, nil, err } //store it (upload it) on the FileStore ctx := context.TODO() rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) log.Debug("Uploaded random string file to node") if err != nil { return nil, nil, err } err = wait(ctx) if err != nil { return nil, nil, err } rootAddrs[i] = rk } return rootAddrs, rfiles, nil } //generate a random file (string) func generateRandomFile() (string, error) { //generate a random file size between minFileSize and maxFileSize fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize)) b := make([]byte, fileSize*1024) _, err := crand.Read(b) if err != nil { log.Error("Error generating random file.", "err", err) return "", err } return string(b), nil } //create a local store for the given node func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.ChunkStore, string, error) { var datadir string var err error datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString())) if err != nil { return nil, "", err } var store storage.ChunkStore params := storage.NewDefaultLocalStoreParams() params.ChunkDbPath = datadir params.BaseKey = addr.Over() store, err = storage.NewTestLocalStoreForAddr(params) if err != nil { os.RemoveAll(datadir) return nil, "", err } return store, datadir, nil }