diff options
Diffstat (limited to 'swarm/network_test.go')
-rw-r--r-- | swarm/network_test.go | 656 |
1 files changed, 656 insertions, 0 deletions
diff --git a/swarm/network_test.go b/swarm/network_test.go new file mode 100644 index 000000000..c291fce3b --- /dev/null +++ b/swarm/network_test.go @@ -0,0 +1,656 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package swarm + +import ( + "context" + "flag" + "fmt" + "io/ioutil" + "math/rand" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/storage" + colorable "github.com/mattn/go-colorable" +) + +var ( + loglevel = flag.Int("loglevel", 2, "verbosity of logs") + longrunning = flag.Bool("longrunning", false, "do run long-running tests") + waitKademlia = flag.Bool("waitkademlia", false, "wait for healthy kademlia before checking files availability") +) + +func init() { + rand.Seed(time.Now().UnixNano()) + + flag.Parse() + + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) +} + +// TestSwarmNetwork runs a series of test simulations with +// static and dynamic Swarm nodes in network simulation, by +// uploading files to every node and retrieving them. +func TestSwarmNetwork(t *testing.T) { + for _, tc := range []struct { + name string + steps []testSwarmNetworkStep + options *testSwarmNetworkOptions + disabled bool + }{ + { + name: "10_nodes", + steps: []testSwarmNetworkStep{ + { + nodeCount: 10, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 45 * time.Second, + }, + }, + { + name: "10_nodes_skip_check", + steps: []testSwarmNetworkStep{ + { + nodeCount: 10, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 45 * time.Second, + SkipCheck: true, + }, + }, + { + name: "100_nodes", + steps: []testSwarmNetworkStep{ + { + nodeCount: 100, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 3 * time.Minute, + }, + disabled: !*longrunning, + }, + { + name: "100_nodes_skip_check", + steps: []testSwarmNetworkStep{ + { + nodeCount: 100, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 3 * time.Minute, + SkipCheck: true, + }, + disabled: !*longrunning, + }, + { + name: "inc_node_count", + steps: []testSwarmNetworkStep{ + { + nodeCount: 2, + }, + { + nodeCount: 5, + }, + { + nodeCount: 10, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 90 * time.Second, + }, + disabled: !*longrunning, + }, + { + name: "dec_node_count", + steps: []testSwarmNetworkStep{ + { + nodeCount: 10, + }, + { + nodeCount: 6, + }, + { + nodeCount: 3, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 90 * time.Second, + }, + disabled: !*longrunning, + }, + { + name: "dec_inc_node_count", + steps: []testSwarmNetworkStep{ + { + nodeCount: 5, + }, + { + nodeCount: 3, + }, + { + nodeCount: 10, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 90 * time.Second, + }, + }, + { + name: "inc_dec_node_count", + steps: []testSwarmNetworkStep{ + { + nodeCount: 3, + }, + { + nodeCount: 5, + }, + { + nodeCount: 25, + }, + { + nodeCount: 10, + }, + { + nodeCount: 4, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 5 * time.Minute, + }, + disabled: !*longrunning, + }, + { + name: "inc_dec_node_count_skip_check", + steps: []testSwarmNetworkStep{ + { + nodeCount: 3, + }, + { + nodeCount: 5, + }, + { + nodeCount: 25, + }, + { + nodeCount: 10, + }, + { + nodeCount: 4, + }, + }, + options: &testSwarmNetworkOptions{ + Timeout: 5 * time.Minute, + SkipCheck: true, + }, + disabled: !*longrunning, + }, + } { + if tc.disabled { + continue + } + t.Run(tc.name, func(t *testing.T) { + testSwarmNetwork(t, tc.options, tc.steps...) + }) + } +} + +// testSwarmNetworkStep is the configuration +// for the state of the simulation network. +type testSwarmNetworkStep struct { + // number of swarm nodes that must be in the Up state + nodeCount int +} + +// file represents the file uploaded on a particular node. +type file struct { + addr storage.Address + data string + nodeID discover.NodeID +} + +// check represents a reference to a file that is retrieved +// from a particular node. +type check struct { + key string + nodeID discover.NodeID +} + +// testSwarmNetworkOptions contains optional parameters for running +// testSwarmNetwork. +type testSwarmNetworkOptions struct { + Timeout time.Duration + SkipCheck bool +} + +// testSwarmNetwork is a helper function used for testing different +// static and dynamic Swarm network simulations. +// It is responsible for: +// - Setting up a Swarm network simulation, and updates the number of nodes within the network on every step according to steps. +// - Uploading a unique file to every node on every step. +// - May wait for Kademlia on every node to be healthy. +// - Checking if a file is retrievable from all nodes. +func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwarmNetworkStep) { + dir, err := ioutil.TempDir("", "swarm-network-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + if o == nil { + o = new(testSwarmNetworkOptions) + } + + ctx := context.Background() + if o.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, o.Timeout) + defer cancel() + } + + swarms := make(map[discover.NodeID]*Swarm) + files := make([]file, 0) + + services := map[string]adapters.ServiceFunc{ + "swarm": func(ctx *adapters.ServiceContext) (node.Service, error) { + config := api.NewConfig() + + dir, err := ioutil.TempDir(dir, "node") + if err != nil { + return nil, err + } + + config.Path = dir + + privkey, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + + config.Init(privkey) + config.DeliverySkipCheck = o.SkipCheck + + s, err := NewSwarm(config, nil) + if err != nil { + return nil, err + } + log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", s.bzz.BaseAddr())) + swarms[ctx.Config.ID] = s + return s, nil + }, + } + + a := adapters.NewSimAdapter(services) + net := simulations.NewNetwork(a, &simulations.NetworkConfig{ + ID: "0", + DefaultService: "swarm", + }) + defer net.Shutdown() + + trigger := make(chan discover.NodeID) + + sim := simulations.NewSimulation(net) + + for i, step := range steps { + log.Debug("test sync step", "n", i+1, "nodes", step.nodeCount) + + change := step.nodeCount - len(allNodeIDs(net)) + + if change > 0 { + _, err := addNodes(change, net) + if err != nil { + t.Fatal(err) + } + } else if change < 0 { + err := removeNodes(-change, net) + if err != nil { + t.Fatal(err) + } + } else { + t.Logf("step %v: no change in nodes", i) + continue + } + + nodeIDs := allNodeIDs(net) + shuffle(len(nodeIDs), func(i, j int) { + nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] + }) + for _, id := range nodeIDs { + key, data, err := uploadFile(swarms[id]) + if err != nil { + t.Fatal(err) + } + log.Trace("file uploaded", "node", id, "key", key.String()) + files = append(files, file{ + addr: key, + data: data, + nodeID: id, + }) + } + + // Prepare PeerPot map for checking Kademlia health + var ppmap map[string]*network.PeerPot + nIDs := allNodeIDs(net) + addrs := make([][]byte, len(nIDs)) + if *waitKademlia { + for i, id := range nIDs { + addrs[i] = swarms[id].bzz.BaseAddr() + } + ppmap = network.NewPeerPotMap(2, addrs) + } + + var checkStatusM sync.Map + var nodeStatusM sync.Map + var totalFoundCount uint64 + + result := sim.Run(ctx, &simulations.Step{ + Action: func(ctx context.Context) error { + if *waitKademlia { + // Wait for healthy Kademlia on every node before checking files + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + healthy := true + log.Debug("kademlia health check", "node count", len(nIDs), "addr count", len(addrs)) + for i, id := range nIDs { + swarm := swarms[id] + //PeerPot for this node + addr := common.Bytes2Hex(swarm.bzz.BaseAddr()) + pp := ppmap[addr] + //call Healthy RPC + h := swarm.bzz.Healthy(pp) + //print info + log.Debug(swarm.bzz.String()) + log.Debug("kademlia", "empty bins", pp.EmptyBins, "gotNN", h.GotNN, "knowNN", h.KnowNN, "full", h.Full) + log.Debug("kademlia", "health", h.GotNN && h.KnowNN && h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i) + log.Debug("kademlia", "ill condition", !h.GotNN || !h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i) + if !h.GotNN || !h.Full { + healthy = false + break + } + } + if healthy { + break + } + } + } + + go func() { + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + for { + if retrieve(net, files, swarms, trigger, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 { + return + } + } + }() + return nil + }, + Trigger: trigger, + Expect: &simulations.Expectation{ + Nodes: allNodeIDs(net), + Check: func(ctx context.Context, id discover.NodeID) (bool, error) { + // The check is done by a goroutine in the action function. + return true, nil + }, + }, + }) + if result.Error != nil { + t.Fatal(result.Error) + } + log.Debug("done: test sync step", "n", i+1, "nodes", step.nodeCount) + } +} + +// allNodeIDs is returning NodeID for every node that is Up. +func allNodeIDs(net *simulations.Network) (nodes []discover.NodeID) { + for _, n := range net.GetNodes() { + if n.Up { + nodes = append(nodes, n.ID()) + } + } + return +} + +// addNodes adds a number of nodes to the network. +func addNodes(count int, net *simulations.Network) (ids []discover.NodeID, err error) { + for i := 0; i < count; i++ { + nodeIDs := allNodeIDs(net) + l := len(nodeIDs) + nodeconf := adapters.RandomNodeConfig() + node, err := net.NewNodeWithConfig(nodeconf) + if err != nil { + return nil, fmt.Errorf("create node: %v", err) + } + err = net.Start(node.ID()) + if err != nil { + return nil, fmt.Errorf("start node: %v", err) + } + + log.Debug("created node", "id", node.ID()) + + // connect nodes in a chain + if l > 0 { + var otherNodeID discover.NodeID + for i := l - 1; i >= 0; i-- { + n := net.GetNode(nodeIDs[i]) + if n.Up { + otherNodeID = n.ID() + break + } + } + log.Debug("connect nodes", "one", node.ID(), "other", otherNodeID) + if err := net.Connect(node.ID(), otherNodeID); err != nil { + return nil, err + } + } + ids = append(ids, node.ID()) + } + return ids, nil +} + +// removeNodes stops a random nodes in the network. +func removeNodes(count int, net *simulations.Network) error { + for i := 0; i < count; i++ { + // allNodeIDs are returning only the Up nodes. + nodeIDs := allNodeIDs(net) + if len(nodeIDs) == 0 { + break + } + node := net.GetNode(nodeIDs[rand.Intn(len(nodeIDs))]) + if err := node.Stop(); err != nil { + return err + } + log.Debug("removed node", "id", node.ID()) + } + return nil +} + +// uploadFile, uploads a short file to the swarm instance +// using the api.Put method. +func uploadFile(swarm *Swarm) (storage.Address, string, error) { + b := make([]byte, 8) + _, err := rand.Read(b) + if err != nil { + return nil, "", err + } + // File data is very short, but it is ensured that its + // uniqueness is very certain. + data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b) + k, wait, err := swarm.api.Put(data, "text/plain", false) + if err != nil { + return nil, "", err + } + if wait != nil { + wait() + } + return k, data, nil +} + +// retrieve is the function that is used for checking the availability of +// uploaded files in testSwarmNetwork test helper function. +func retrieve( + net *simulations.Network, + files []file, + swarms map[discover.NodeID]*Swarm, + trigger chan discover.NodeID, + checkStatusM *sync.Map, + nodeStatusM *sync.Map, + totalFoundCount *uint64, +) (missing uint64) { + shuffle(len(files), func(i, j int) { + files[i], files[j] = files[j], files[i] + }) + + var totalWg sync.WaitGroup + errc := make(chan error) + + nodeIDs := allNodeIDs(net) + + totalCheckCount := len(nodeIDs) * len(files) + + for _, id := range nodeIDs { + if _, ok := nodeStatusM.Load(id); ok { + continue + } + start := time.Now() + var checkCount uint64 + var foundCount uint64 + + totalWg.Add(1) + + var wg sync.WaitGroup + + for _, f := range files { + swarm := swarms[id] + + checkKey := check{ + key: f.addr.String(), + nodeID: id, + } + if n, ok := checkStatusM.Load(checkKey); ok && n.(int) == 0 { + continue + } + + checkCount++ + wg.Add(1) + go func(f file, id discover.NodeID) { + defer wg.Done() + + log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", atomic.LoadUint64(totalFoundCount)) + + r, _, _, _, err := swarm.api.Get(f.addr, "/") + if err != nil { + errc <- fmt.Errorf("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + d, err := ioutil.ReadAll(r) + if err != nil { + errc <- fmt.Errorf("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + data := string(d) + if data != f.data { + errc <- fmt.Errorf("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data) + return + } + checkStatusM.Store(checkKey, 0) + atomic.AddUint64(&foundCount, 1) + log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", atomic.LoadUint64(&foundCount)) + }(f, id) + } + + go func(id discover.NodeID) { + defer totalWg.Done() + wg.Wait() + + atomic.AddUint64(totalFoundCount, foundCount) + + if foundCount == checkCount { + log.Info("all files are found for node", "id", id.String(), "duration", time.Since(start)) + nodeStatusM.Store(id, 0) + trigger <- id + return + } + log.Debug("files missing for node", "id", id.String(), "check", checkCount, "found", foundCount) + }(id) + + } + + go func() { + totalWg.Wait() + close(errc) + }() + + var errCount int + for err := range errc { + if err != nil { + errCount++ + } + log.Warn(err.Error()) + } + + log.Info("check stats", "total check count", totalCheckCount, "total files found", atomic.LoadUint64(totalFoundCount), "total errors", errCount) + + return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount) +} + +// Backported from stdlib https://golang.org/src/math/rand/rand.go?s=11175:11215#L333 +// +// Replace with rand.Shuffle from go 1.10 when go 1.9 support is dropped. +// +// shuffle pseudo-randomizes the order of elements. +// n is the number of elements. Shuffle panics if n < 0. +// swap swaps the elements with indexes i and j. +func shuffle(n int, swap func(i, j int)) { + if n < 0 { + panic("invalid argument to Shuffle") + } + + // Fisher-Yates shuffle: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle + // Shuffle really ought not be called with n that doesn't fit in 32 bits. + // Not only will it take a very long time, but with 2³¹! possible permutations, + // there's no way that any PRNG can have a big enough internal state to + // generate even a minuscule percentage of the possible permutations. + // Nevertheless, the right API signature accepts an int n, so handle it as best we can. + i := n - 1 + for ; i > 1<<31-1-1; i-- { + j := int(rand.Int63n(int64(i + 1))) + swap(i, j) + } + for ; i > 0; i-- { + j := int(rand.Int31n(int32(i + 1))) + swap(i, j) + } +} |