diff options
author | holisticode <holistic.computing@gmail.com> | 2018-07-31 04:55:25 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-31 04:55:25 +0800 |
commit | d6efa691872efb723ea3177a92da9e9b31c34eba (patch) | |
tree | 9c7e85c9cab9a2cf1240db47a8de44162f69353e /swarm/network/stream/testing | |
parent | 3ea8ac6a9ab9e56164707119e9142f06fae4c316 (diff) | |
download | dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.gz dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.bz2 dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.lz dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.xz dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.zst dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.zip |
Merge netsim mig to master (#17241)
* swarm: merged stream-tests migration to develop
* swarm/network: expose simulation RandomUpNode to use in stream tests
* swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest
* swarm: enforce waitkademlia for snapshot tests
* swarm: fixed syncer tests and snapshot_sync_test
* swarm: linting of simulation package
* swarm: address review comments
* swarm/network/stream: fix delivery_test bugs and refactor
* swarm/network/stream: addressed PR comments @janos
* swarm/network/stream: enforce waitKademlia, improve TestIntervals
* swarm/network/stream: TestIntervals not waiting for chunk to be stored
Diffstat (limited to 'swarm/network/stream/testing')
-rw-r--r-- | swarm/network/stream/testing/testing.go | 293 |
1 files changed, 0 insertions, 293 deletions
diff --git a/swarm/network/stream/testing/testing.go b/swarm/network/stream/testing/testing.go deleted file mode 100644 index d584ec397..000000000 --- a/swarm/network/stream/testing/testing.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package testing - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "math/rand" - "os" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Simulation struct { - Net *simulations.Network - Stores []storage.ChunkStore - Addrs []network.Addr - IDs []discover.NodeID -} - -func SetStores(addrs ...network.Addr) ([]storage.ChunkStore, func(), error) { - var datadirs []string - stores := make([]storage.ChunkStore, len(addrs)) - var err error - for i, addr := range addrs { - var datadir string - datadir, err = ioutil.TempDir("", "streamer") - if err != nil { - break - } - var store storage.ChunkStore - params := storage.NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - store, err = storage.NewTestLocalStoreForAddr(params) - if err != nil { - break - } - datadirs = append(datadirs, datadir) - stores[i] = store - } - teardown := func() { - for i, datadir := range datadirs { - stores[i].Close() - os.RemoveAll(datadir) - } - } - return stores, teardown, err -} - -func NewAdapter(adapterType string, services adapters.Services) (adapter adapters.NodeAdapter, teardown func(), err error) { - teardown = func() {} - switch adapterType { - case "sim": - adapter = adapters.NewSimAdapter(services) - case "exec": - baseDir, err0 := ioutil.TempDir("", "swarm-test") - if err0 != nil { - return nil, teardown, err0 - } - teardown = func() { os.RemoveAll(baseDir) } - adapter = adapters.NewExecAdapter(baseDir) - case "docker": - adapter, err = adapters.NewDockerAdapter() - if err != nil { - return nil, teardown, err - } - default: - return nil, teardown, errors.New("adapter needs to be one of sim, exec, docker") - } - return adapter, teardown, nil -} - -func CheckResult(t *testing.T, result *simulations.StepResult, startedAt, finishedAt time.Time) { - t.Logf("Simulation passed in %s", result.FinishedAt.Sub(result.StartedAt)) - if len(result.Passes) > 1 { - var min, max time.Duration - var sum int - for _, pass := range result.Passes { - duration := pass.Sub(result.StartedAt) - if sum == 0 || duration < min { - min = duration - } - if duration > max { - max = duration - } - sum += int(duration.Nanoseconds()) - } - t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond) - } - t.Logf("Setup: %s, Shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt)) -} - -type RunConfig struct { - Adapter string - Step *simulations.Step - NodeCount int - ConnLevel int - ToAddr func(discover.NodeID) *network.BzzAddr - Services adapters.Services - DefaultService string - EnableMsgEvents bool -} - -func NewSimulation(conf *RunConfig) (*Simulation, func(), error) { - // create network - nodes := conf.NodeCount - adapter, adapterTeardown, err := NewAdapter(conf.Adapter, conf.Services) - if err != nil { - return nil, adapterTeardown, err - } - defaultService := "streamer" - if conf.DefaultService != "" { - defaultService = conf.DefaultService - } - net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ - ID: "0", - DefaultService: defaultService, - }) - teardown := func() { - adapterTeardown() - net.Shutdown() - } - ids := make([]discover.NodeID, nodes) - addrs := make([]network.Addr, nodes) - // start nodes - for i := 0; i < nodes; i++ { - nodeconf := adapters.RandomNodeConfig() - nodeconf.EnableMsgEvents = conf.EnableMsgEvents - node, err := net.NewNodeWithConfig(nodeconf) - if err != nil { - return nil, teardown, fmt.Errorf("error creating node: %s", err) - } - ids[i] = node.ID() - addrs[i] = conf.ToAddr(ids[i]) - } - // set nodes number of Stores available - stores, storeTeardown, err := SetStores(addrs...) - teardown = func() { - net.Shutdown() - adapterTeardown() - storeTeardown() - } - if err != nil { - return nil, teardown, err - } - s := &Simulation{ - Net: net, - Stores: stores, - IDs: ids, - Addrs: addrs, - } - return s, teardown, nil -} - -func (s *Simulation) Run(ctx context.Context, conf *RunConfig) (*simulations.StepResult, error) { - // bring up nodes, launch the servive - nodes := conf.NodeCount - conns := conf.ConnLevel - for i := 0; i < nodes; i++ { - if err := s.Net.Start(s.IDs[i]); err != nil { - return nil, fmt.Errorf("error starting node %s: %s", s.IDs[i].TerminalString(), err) - } - } - // run a simulation which connects the 10 nodes in a chain - wg := sync.WaitGroup{} - for i := range s.IDs { - // collect the overlay addresses, to - for j := 0; j < conns; j++ { - var k int - if j == 0 { - k = i - 1 - } else { - k = rand.Intn(len(s.IDs)) - } - if i > 0 { - wg.Add(1) - go func(i, k int) { - defer wg.Done() - s.Net.Connect(s.IDs[i], s.IDs[k]) - }(i, k) - } - } - } - wg.Wait() - log.Info(fmt.Sprintf("simulation with %v nodes", len(s.Addrs))) - - // create an only locally retrieving FileStore for the pivot node to test - // if retriee requests have arrived - result := simulations.NewSimulation(s.Net).Run(ctx, conf.Step) - return result, nil -} - -// WatchDisconnections subscribes to admin peerEvents and sends peer event drop -// errors to the errc channel. Channel quitC signals the termination of the event loop. -// Returned doneC will be closed after the rpc subscription is unsubscribed, -// signaling that simulations network is safe to shutdown. -func WatchDisconnections(id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}, err error) { - events := make(chan *p2p.PeerEvent) - sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") - if err != nil { - return nil, fmt.Errorf("error getting peer events for node %v: %s", id, err) - } - c := make(chan struct{}) - go func() { - defer func() { - log.Trace("watch disconnections: unsubscribe", "id", id) - sub.Unsubscribe() - close(c) - }() - for { - select { - case <-quitC: - return - case e := <-events: - if e.Type == p2p.PeerEventTypeDrop { - select { - case errc <- fmt.Errorf("peerEvent for node %v: %v", id, e): - case <-quitC: - return - } - } - case err := <-sub.Err(): - if err != nil { - select { - case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err): - case <-quitC: - return - } - } - } - } - }() - return c, nil -} - -func Trigger(d time.Duration, quitC chan struct{}, ids ...discover.NodeID) chan discover.NodeID { - trigger := make(chan discover.NodeID) - go func() { - defer close(trigger) - ticker := time.NewTicker(d) - defer ticker.Stop() - // we are only testing the pivot node (net.Nodes[0]) - for range ticker.C { - for _, id := range ids { - select { - case trigger <- id: - case <-quitC: - return - } - } - } - }() - return trigger -} - -func (sim *Simulation) CallClient(id discover.NodeID, f func(*rpc.Client) error) error { - node := sim.Net.GetNode(id) - if node == nil { - return fmt.Errorf("unknown node: %s", id) - } - client, err := node.Client() - if err != nil { - return fmt.Errorf("error getting node client: %s", err) - } - return f(client) -} |