diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/simulations | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/simulations')
-rw-r--r-- | swarm/network/simulations/discovery/discovery.go | 17 | ||||
-rw-r--r-- | swarm/network/simulations/discovery/discovery_test.go | 586 | ||||
-rwxr-xr-x | swarm/network/simulations/discovery/jsonsnapshot.txt | 1 | ||||
-rw-r--r-- | swarm/network/simulations/overlay.go | 144 | ||||
-rw-r--r-- | swarm/network/simulations/overlay_test.go | 195 |
5 files changed, 943 insertions, 0 deletions
diff --git a/swarm/network/simulations/discovery/discovery.go b/swarm/network/simulations/discovery/discovery.go new file mode 100644 index 000000000..a6ff5fd45 --- /dev/null +++ b/swarm/network/simulations/discovery/discovery.go @@ -0,0 +1,17 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package discovery diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go new file mode 100644 index 000000000..acf3479e5 --- /dev/null +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -0,0 +1,586 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package discovery + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" + "strings" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/state" + colorable "github.com/mattn/go-colorable" +) + +// serviceName is used with the exec adapter so the exec'd binary knows which +// service to execute +const serviceName = "discovery" +const testMinProxBinSize = 2 +const discoveryPersistenceDatadir = "discovery_persistence_test_store" + +var discoveryPersistencePath = path.Join(os.TempDir(), discoveryPersistenceDatadir) +var discoveryEnabled = true +var persistenceEnabled = false + +var services = adapters.Services{ + serviceName: newService, +} + +func cleanDbStores() error { + entries, err := ioutil.ReadDir(os.TempDir()) + if err != nil { + return err + } + + for _, f := range entries { + if strings.HasPrefix(f.Name(), discoveryPersistenceDatadir) { + os.RemoveAll(path.Join(os.TempDir(), f.Name())) + } + } + return nil + +} + +func getDbStore(nodeID string) (*state.DBStore, error) { + if _, err := os.Stat(discoveryPersistencePath + "_" + nodeID); os.IsNotExist(err) { + log.Info(fmt.Sprintf("directory for nodeID %s does not exist. creating...", nodeID)) + ioutil.TempDir("", discoveryPersistencePath+"_"+nodeID) + } + log.Info(fmt.Sprintf("opening storage directory for nodeID %s", nodeID)) + store, err := state.NewDBStore(discoveryPersistencePath + "_" + nodeID) + if err != nil { + return nil, err + } + return store, nil +} + +var ( + nodeCount = flag.Int("nodes", 10, "number of nodes to create (default 10)") + initCount = flag.Int("conns", 1, "number of originally connected peers (default 1)") + snapshotFile = flag.String("snapshot", "", "create snapshot") + loglevel = flag.Int("loglevel", 3, "verbosity of logs") + rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs") +) + +func init() { + flag.Parse() + // register the discovery service which will run as a devp2p + // protocol when using the exec adapter + adapters.RegisterServices(services) + + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog)))) +} + +// Benchmarks to test the average time it takes for an N-node ring +// to full a healthy kademlia topology +func BenchmarkDiscovery_8_1(b *testing.B) { benchmarkDiscovery(b, 8, 1) } +func BenchmarkDiscovery_16_1(b *testing.B) { benchmarkDiscovery(b, 16, 1) } +func BenchmarkDiscovery_32_1(b *testing.B) { benchmarkDiscovery(b, 32, 1) } +func BenchmarkDiscovery_64_1(b *testing.B) { benchmarkDiscovery(b, 64, 1) } +func BenchmarkDiscovery_128_1(b *testing.B) { benchmarkDiscovery(b, 128, 1) } +func BenchmarkDiscovery_256_1(b *testing.B) { benchmarkDiscovery(b, 256, 1) } + +func BenchmarkDiscovery_8_2(b *testing.B) { benchmarkDiscovery(b, 8, 2) } +func BenchmarkDiscovery_16_2(b *testing.B) { benchmarkDiscovery(b, 16, 2) } +func BenchmarkDiscovery_32_2(b *testing.B) { benchmarkDiscovery(b, 32, 2) } +func BenchmarkDiscovery_64_2(b *testing.B) { benchmarkDiscovery(b, 64, 2) } +func BenchmarkDiscovery_128_2(b *testing.B) { benchmarkDiscovery(b, 128, 2) } +func BenchmarkDiscovery_256_2(b *testing.B) { benchmarkDiscovery(b, 256, 2) } + +func BenchmarkDiscovery_8_4(b *testing.B) { benchmarkDiscovery(b, 8, 4) } +func BenchmarkDiscovery_16_4(b *testing.B) { benchmarkDiscovery(b, 16, 4) } +func BenchmarkDiscovery_32_4(b *testing.B) { benchmarkDiscovery(b, 32, 4) } +func BenchmarkDiscovery_64_4(b *testing.B) { benchmarkDiscovery(b, 64, 4) } +func BenchmarkDiscovery_128_4(b *testing.B) { benchmarkDiscovery(b, 128, 4) } +func BenchmarkDiscovery_256_4(b *testing.B) { benchmarkDiscovery(b, 256, 4) } + +func TestDiscoverySimulationDockerAdapter(t *testing.T) { + testDiscoverySimulationDockerAdapter(t, *nodeCount, *initCount) +} + +func testDiscoverySimulationDockerAdapter(t *testing.T, nodes, conns int) { + adapter, err := adapters.NewDockerAdapter() + if err != nil { + if err == adapters.ErrLinuxOnly { + t.Skip(err) + } else { + t.Fatal(err) + } + } + testDiscoverySimulation(t, nodes, conns, adapter) +} + +func TestDiscoverySimulationExecAdapter(t *testing.T) { + testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount) +} + +func testDiscoverySimulationExecAdapter(t *testing.T, nodes, conns int) { + baseDir, err := ioutil.TempDir("", "swarm-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(baseDir) + testDiscoverySimulation(t, nodes, conns, adapters.NewExecAdapter(baseDir)) +} + +func TestDiscoverySimulationSimAdapter(t *testing.T) { + testDiscoverySimulationSimAdapter(t, *nodeCount, *initCount) +} + +func TestDiscoveryPersistenceSimulationSimAdapter(t *testing.T) { + testDiscoveryPersistenceSimulationSimAdapter(t, *nodeCount, *initCount) +} + +func testDiscoveryPersistenceSimulationSimAdapter(t *testing.T, nodes, conns int) { + testDiscoveryPersistenceSimulation(t, nodes, conns, adapters.NewSimAdapter(services)) +} + +func testDiscoverySimulationSimAdapter(t *testing.T, nodes, conns int) { + testDiscoverySimulation(t, nodes, conns, adapters.NewSimAdapter(services)) +} + +func testDiscoverySimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) { + startedAt := time.Now() + result, err := discoverySimulation(nodes, conns, adapter) + if err != nil { + t.Fatalf("Setting up simulation failed: %v", err) + } + if result.Error != nil { + t.Fatalf("Simulation failed: %s", result.Error) + } + t.Logf("Simulation with %d nodes passed in %s", nodes, result.FinishedAt.Sub(result.StartedAt)) + var min, max time.Duration + var sum int + for _, pass := range result.Passes { + duration := pass.Sub(result.StartedAt) + if sum == 0 || duration < min { + min = duration + } + if duration > max { + max = duration + } + sum += int(duration.Nanoseconds()) + } + t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond) + finishedAt := time.Now() + t.Logf("Setup: %s, shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt)) +} + +func testDiscoveryPersistenceSimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) map[int][]byte { + persistenceEnabled = true + discoveryEnabled = true + + result, err := discoveryPersistenceSimulation(nodes, conns, adapter) + + if err != nil { + t.Fatalf("Setting up simulation failed: %v", err) + } + if result.Error != nil { + t.Fatalf("Simulation failed: %s", result.Error) + } + t.Logf("Simulation with %d nodes passed in %s", nodes, result.FinishedAt.Sub(result.StartedAt)) + // set the discovery and persistence flags again to default so other + // tests will not be affected + discoveryEnabled = true + persistenceEnabled = false + return nil +} + +func benchmarkDiscovery(b *testing.B, nodes, conns int) { + for i := 0; i < b.N; i++ { + result, err := discoverySimulation(nodes, conns, adapters.NewSimAdapter(services)) + if err != nil { + b.Fatalf("setting up simulation failed: %v", err) + } + if result.Error != nil { + b.Logf("simulation failed: %s", result.Error) + } + } +} + +func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simulations.StepResult, error) { + // create network + net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + ID: "0", + DefaultService: serviceName, + }) + defer net.Shutdown() + trigger := make(chan discover.NodeID) + ids := make([]discover.NodeID, nodes) + for i := 0; i < nodes; i++ { + conf := adapters.RandomNodeConfig() + node, err := net.NewNodeWithConfig(conf) + if err != nil { + return nil, fmt.Errorf("error starting node: %s", err) + } + if err := net.Start(node.ID()); err != nil { + return nil, fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err) + } + if err := triggerChecks(trigger, net, node.ID()); err != nil { + return nil, fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err) + } + ids[i] = node.ID() + } + + // run a simulation which connects the 10 nodes in a ring and waits + // for full peer discovery + var addrs [][]byte + action := func(ctx context.Context) error { + return nil + } + wg := sync.WaitGroup{} + for i := range ids { + // collect the overlay addresses, to + addrs = append(addrs, network.ToOverlayAddr(ids[i].Bytes())) + for j := 0; j < conns; j++ { + var k int + if j == 0 { + k = (i + 1) % len(ids) + } else { + k = rand.Intn(len(ids)) + } + wg.Add(1) + go func(i, k int) { + defer wg.Done() + net.Connect(ids[i], ids[k]) + }(i, k) + } + } + wg.Wait() + log.Debug(fmt.Sprintf("nodes: %v", len(addrs))) + // construct the peer pot, so that kademlia health can be checked + ppmap := network.NewPeerPotMap(testMinProxBinSize, addrs) + check := func(ctx context.Context, id discover.NodeID) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + node := net.GetNode(id) + if node == nil { + return false, fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return false, fmt.Errorf("error getting node client: %s", err) + } + healthy := &network.Health{} + addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes())) + if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil { + return false, fmt.Errorf("error getting node health: %s", err) + } + log.Debug(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v, saturated: %v\n%v", id, healthy.GotNN, healthy.KnowNN, healthy.Full, healthy.Hive)) + return healthy.KnowNN && healthy.GotNN && healthy.Full, nil + } + + // 64 nodes ~ 1min + // 128 nodes ~ + timeout := 300 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ + Action: action, + Trigger: trigger, + Expect: &simulations.Expectation{ + Nodes: ids, + Check: check, + }, + }) + if result.Error != nil { + return result, nil + } + + if *snapshotFile != "" { + snap, err := net.Snapshot() + if err != nil { + return nil, errors.New("no shapshot dude") + } + jsonsnapshot, err := json.Marshal(snap) + if err != nil { + return nil, fmt.Errorf("corrupt json snapshot: %v", err) + } + log.Info("writing snapshot", "file", *snapshotFile) + err = ioutil.WriteFile(*snapshotFile, jsonsnapshot, 0755) + if err != nil { + return nil, err + } + } + return result, nil +} + +func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simulations.StepResult, error) { + cleanDbStores() + defer cleanDbStores() + + // create network + net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + ID: "0", + DefaultService: serviceName, + }) + defer net.Shutdown() + trigger := make(chan discover.NodeID) + ids := make([]discover.NodeID, nodes) + var addrs [][]byte + + for i := 0; i < nodes; i++ { + conf := adapters.RandomNodeConfig() + node, err := net.NewNodeWithConfig(conf) + if err != nil { + panic(err) + } + if err != nil { + return nil, fmt.Errorf("error starting node: %s", err) + } + if err := net.Start(node.ID()); err != nil { + return nil, fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err) + } + if err := triggerChecks(trigger, net, node.ID()); err != nil { + return nil, fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err) + } + ids[i] = node.ID() + a := network.ToOverlayAddr(ids[i].Bytes()) + + addrs = append(addrs, a) + } + + // run a simulation which connects the 10 nodes in a ring and waits + // for full peer discovery + ppmap := network.NewPeerPotMap(testMinProxBinSize, addrs) + + var restartTime time.Time + + action := func(ctx context.Context) error { + ticker := time.NewTicker(500 * time.Millisecond) + + for range ticker.C { + isHealthy := true + for _, id := range ids { + //call Healthy RPC + node := net.GetNode(id) + if node == nil { + return fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return fmt.Errorf("error getting node client: %s", err) + } + healthy := &network.Health{} + addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes())) + if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil { + return fmt.Errorf("error getting node health: %s", err) + } + + log.Info(fmt.Sprintf("NODE: %s, IS HEALTHY: %t", id.String(), healthy.GotNN && healthy.KnowNN && healthy.Full)) + if !healthy.GotNN || !healthy.Full { + isHealthy = false + break + } + } + if isHealthy { + break + } + } + ticker.Stop() + + log.Info("reached healthy kademlia. starting to shutdown nodes.") + shutdownStarted := time.Now() + // stop all ids, then start them again + for _, id := range ids { + node := net.GetNode(id) + + if err := net.Stop(node.ID()); err != nil { + return fmt.Errorf("error stopping node %s: %s", node.ID().TerminalString(), err) + } + } + log.Info(fmt.Sprintf("shutting down nodes took: %s", time.Since(shutdownStarted))) + persistenceEnabled = true + discoveryEnabled = false + restartTime = time.Now() + for _, id := range ids { + node := net.GetNode(id) + if err := net.Start(node.ID()); err != nil { + return fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err) + } + if err := triggerChecks(trigger, net, node.ID()); err != nil { + return fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err) + } + } + + log.Info(fmt.Sprintf("restarting nodes took: %s", time.Since(restartTime))) + + return nil + } + //connects in a chain + wg := sync.WaitGroup{} + //connects in a ring + for i := range ids { + for j := 1; j <= conns; j++ { + k := (i + j) % len(ids) + if k == i { + k = (k + 1) % len(ids) + } + wg.Add(1) + go func(i, k int) { + defer wg.Done() + net.Connect(ids[i], ids[k]) + }(i, k) + } + } + wg.Wait() + log.Debug(fmt.Sprintf("nodes: %v", len(addrs))) + // construct the peer pot, so that kademlia health can be checked + check := func(ctx context.Context, id discover.NodeID) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + node := net.GetNode(id) + if node == nil { + return false, fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return false, fmt.Errorf("error getting node client: %s", err) + } + healthy := &network.Health{} + addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes())) + if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil { + return false, fmt.Errorf("error getting node health: %s", err) + } + log.Info(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v, saturated: %v", id, healthy.GotNN, healthy.KnowNN, healthy.Full)) + + return healthy.KnowNN && healthy.GotNN && healthy.Full, nil + } + + // 64 nodes ~ 1min + // 128 nodes ~ + timeout := 300 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ + Action: action, + Trigger: trigger, + Expect: &simulations.Expectation{ + Nodes: ids, + Check: check, + }, + }) + if result.Error != nil { + return result, nil + } + + return result, nil +} + +// triggerChecks triggers a simulation step check whenever a peer is added or +// removed from the given node, and also every second to avoid a race between +// peer events and kademlia becoming healthy +func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id discover.NodeID) error { + node := net.GetNode(id) + if node == nil { + return fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return err + } + events := make(chan *p2p.PeerEvent) + sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") + if err != nil { + return fmt.Errorf("error getting peer events for node %v: %s", id, err) + } + go func() { + defer sub.Unsubscribe() + + tick := time.NewTicker(time.Second) + defer tick.Stop() + + for { + select { + case <-events: + trigger <- id + case <-tick.C: + trigger <- id + case err := <-sub.Err(): + if err != nil { + log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err) + } + return + } + } + }() + return nil +} + +func newService(ctx *adapters.ServiceContext) (node.Service, error) { + host := adapters.ExternalIP() + + addr := network.NewAddrFromNodeIDAndPort(ctx.Config.ID, host, ctx.Config.Port) + + kp := network.NewKadParams() + kp.MinProxBinSize = testMinProxBinSize + + if ctx.Config.Reachable != nil { + kp.Reachable = func(o network.OverlayAddr) bool { + return ctx.Config.Reachable(o.(*network.BzzAddr).ID()) + } + } + kad := network.NewKademlia(addr.Over(), kp) + hp := network.NewHiveParams() + hp.KeepAliveInterval = time.Duration(200) * time.Millisecond + hp.Discovery = discoveryEnabled + + log.Info(fmt.Sprintf("discovery for nodeID %s is %t", ctx.Config.ID.String(), hp.Discovery)) + + config := &network.BzzConfig{ + OverlayAddr: addr.Over(), + UnderlayAddr: addr.Under(), + HiveParams: hp, + } + + if persistenceEnabled { + log.Info(fmt.Sprintf("persistence enabled for nodeID %s", ctx.Config.ID.String())) + store, err := getDbStore(ctx.Config.ID.String()) + if err != nil { + return nil, err + } + return network.NewBzz(config, kad, store, nil, nil), nil + } + + return network.NewBzz(config, kad, nil, nil, nil), nil +} diff --git a/swarm/network/simulations/discovery/jsonsnapshot.txt b/swarm/network/simulations/discovery/jsonsnapshot.txt new file mode 100755 index 000000000..51d319dbc --- /dev/null +++ b/swarm/network/simulations/discovery/jsonsnapshot.txt @@ -0,0 +1 @@ +{"nodes":[{"config":{"id":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","private_key":"0fe997f31d91d569cd9283232c7b44ea29fbdd25b3ba351d0676d12f36236fce","name":"node01","services":["discovery"]},"up":true},{"config":{"id":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","private_key":"710b57c14e04a6800c26ed2effe1d76ae25f7e113410c30f4065cdf7639aea30","name":"node02","services":["discovery"]},"up":true},{"config":{"id":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","private_key":"370728241e71b18ecfe4979cbeeeb968f30943e8022ae1f41965ffb1959f50f6","name":"node03","services":["discovery"]},"up":true},{"config":{"id":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","private_key":"2dd7bf0eca70d3b78a01600abc6665b1abd27cee96f42a4c0aea58ebc3e0f1c0","name":"node04","services":["discovery"]},"up":true},{"config":{"id":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","private_key":"f9e67ff0212a3ddf9085385e825bf63e1619938ba8332f970b28e4241a78ec50","name":"node05","services":["discovery"]},"up":true},{"config":{"id":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","private_key":"9e6291b175d334e057dd7a902b42675f6ba4735378351ce22b742f835be1082d","name":"node06","services":["discovery"]},"up":true},{"config":{"id":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","private_key":"ebff8542458c73a3ee77b58a6e7c12ef2132f2fe1623eb47e67751ca277be79a","name":"node07","services":["discovery"]},"up":true},{"config":{"id":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","private_key":"5725444d69bdd3e6740ebf2f7aa9126d9f00297a0a83eea6e5cbeb81a7fe56f7","name":"node08","services":["discovery"]},"up":true},{"config":{"id":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","private_key":"56b698d576cb9b1758ad09ca53a61b297b59dd2e6f5aeb1828ca22beb5be2ea7","name":"node09","services":["discovery"]},"up":true},{"config":{"id":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","private_key":"99488b9451a47aa37013cc8934ecc51614a8f23f3b1fa29b6537c01e7da55530","name":"node10","services":["discovery"]},"up":true}],"conns":[{"one":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","other":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","up":true,"reverse":false,"distance":79},{"one":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","other":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","up":true,"reverse":false,"distance":77},{"one":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","other":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","up":true,"reverse":false,"distance":65},{"one":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","other":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","up":true,"reverse":true,"distance":69},{"one":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","other":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","up":true,"reverse":false,"distance":70},{"one":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","other":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","up":true,"reverse":false,"distance":69},{"one":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","other":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","up":true,"reverse":false,"distance":65},{"one":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","other":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","up":true,"reverse":false,"distance":65},{"one":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","other":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","up":true,"reverse":false,"distance":69},{"one":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","other":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","up":true,"reverse":true,"distance":65}]}
\ No newline at end of file diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go new file mode 100644 index 000000000..9419de0c6 --- /dev/null +++ b/swarm/network/simulations/overlay.go @@ -0,0 +1,144 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// You can run this simulation using +// +// go run ./swarm/network/simulations/overlay.go +package main + +import ( + "flag" + "fmt" + "net/http" + "runtime" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/state" + colorable "github.com/mattn/go-colorable" +) + +var ( + noDiscovery = flag.Bool("no-discovery", false, "disable discovery (useful if you want to load a snapshot)") + vmodule = flag.String("vmodule", "", "log filters for logger via Vmodule") + verbosity = flag.Int("verbosity", 0, "log filters for logger via Vmodule") + httpSimPort = 8888 +) + +func init() { + flag.Parse() + //initialize the logger + //this is a demonstration on how to use Vmodule for filtering logs + //provide -vmodule as param, and comma-separated values, e.g.: + //-vmodule overlay_test.go=4,simulations=3 + //above examples sets overlay_test.go logs to level 4, while packages ending with "simulations" to 3 + if *vmodule != "" { + //only enable the pattern matching handler if the flag has been provided + glogger := log.NewGlogHandler(log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))) + if *verbosity > 0 { + glogger.Verbosity(log.Lvl(*verbosity)) + } + glogger.Vmodule(*vmodule) + log.Root().SetHandler(glogger) + } +} + +type Simulation struct { + mtx sync.Mutex + stores map[discover.NodeID]*state.InmemoryStore +} + +func NewSimulation() *Simulation { + return &Simulation{ + stores: make(map[discover.NodeID]*state.InmemoryStore), + } +} + +func (s *Simulation) NewService(ctx *adapters.ServiceContext) (node.Service, error) { + id := ctx.Config.ID + s.mtx.Lock() + store, ok := s.stores[id] + if !ok { + store = state.NewInmemoryStore() + s.stores[id] = store + } + s.mtx.Unlock() + + addr := network.NewAddrFromNodeID(id) + + kp := network.NewKadParams() + kp.MinProxBinSize = 2 + kp.MaxBinSize = 4 + kp.MinBinSize = 1 + kp.MaxRetries = 1000 + kp.RetryExponent = 2 + kp.RetryInterval = 1000000 + kad := network.NewKademlia(addr.Over(), kp) + hp := network.NewHiveParams() + hp.Discovery = !*noDiscovery + hp.KeepAliveInterval = 300 * time.Millisecond + + config := &network.BzzConfig{ + OverlayAddr: addr.Over(), + UnderlayAddr: addr.Under(), + HiveParams: hp, + } + + return network.NewBzz(config, kad, store, nil, nil), nil +} + +//create the simulation network +func newSimulationNetwork() *simulations.Network { + + s := NewSimulation() + services := adapters.Services{ + "overlay": s.NewService, + } + adapter := adapters.NewSimAdapter(services) + simNetwork := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "overlay", + }) + return simNetwork +} + +//return a new http server +func newOverlaySim(sim *simulations.Network) *simulations.Server { + return simulations.NewServer(sim) +} + +// var server +func main() { + //cpu optimization + runtime.GOMAXPROCS(runtime.NumCPU()) + //run the sim + runOverlaySim() +} + +func runOverlaySim() { + //create the simulation network + net := newSimulationNetwork() + //create a http server with it + sim := newOverlaySim(net) + log.Info(fmt.Sprintf("starting simulation server on 0.0.0.0:%d...", httpSimPort)) + //start the HTTP server + http.ListenAndServe(fmt.Sprintf(":%d", httpSimPort), sim) +} diff --git a/swarm/network/simulations/overlay_test.go b/swarm/network/simulations/overlay_test.go new file mode 100644 index 000000000..4d4eb6c37 --- /dev/null +++ b/swarm/network/simulations/overlay_test.go @@ -0,0 +1,195 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. +package main + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/swarm/log" +) + +var ( + nodeCount = 16 +) + +//This test is used to test the overlay simulation. +//As the simulation is executed via a main, it is easily missed on changes +//An automated test will prevent that +//The test just connects to the simulations, starts the network, +//starts the mocker, gets the number of nodes, and stops it again. +//It also provides a documentation on the steps needed by frontends +//to use the simulations +func TestOverlaySim(t *testing.T) { + t.Skip("Test is flaky, see: https://github.com/ethersphere/go-ethereum/issues/592") + //start the simulation + log.Info("Start simulation backend") + //get the simulation networ; needed to subscribe for up events + net := newSimulationNetwork() + //create the overlay simulation + sim := newOverlaySim(net) + //create a http test server with it + srv := httptest.NewServer(sim) + defer srv.Close() + + log.Debug("Http simulation server started. Start simulation network") + //start the simulation network (initialization of simulation) + resp, err := http.Post(srv.URL+"/start", "application/json", nil) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected Status Code %d, got %d", http.StatusOK, resp.StatusCode) + } + + log.Debug("Start mocker") + //start the mocker, needs a node count and an ID + resp, err = http.PostForm(srv.URL+"/mocker/start", + url.Values{ + "node-count": {fmt.Sprintf("%d", nodeCount)}, + "mocker-type": {simulations.GetMockerList()[0]}, + }) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + reason, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + t.Fatalf("Expected Status Code %d, got %d, response body %s", http.StatusOK, resp.StatusCode, string(reason)) + } + + //variables needed to wait for nodes being up + var upCount int + trigger := make(chan discover.NodeID) + + //wait for all nodes to be up + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + //start watching node up events... + go watchSimEvents(net, ctx, trigger) + + //...and wait until all expected up events (nodeCount) have been received +LOOP: + for { + select { + case <-trigger: + //new node up event received, increase counter + upCount++ + //all expected node up events received + if upCount == nodeCount { + break LOOP + } + case <-ctx.Done(): + t.Fatalf("Timed out waiting for up events") + } + + } + + //at this point we can query the server + log.Info("Get number of nodes") + //get the number of nodes + resp, err = http.Get(srv.URL + "/nodes") + if err != nil { + t.Fatal(err) + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("err %s", resp.Status) + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + + //unmarshal number of nodes from JSON response + var nodesArr []simulations.Node + err = json.Unmarshal(b, &nodesArr) + if err != nil { + t.Fatal(err) + } + + //check if number of nodes received is same as sent + if len(nodesArr) != nodeCount { + t.Fatal(fmt.Errorf("Expected %d number of nodes, got %d", nodeCount, len(nodesArr))) + } + + //need to let it run for a little while, otherwise stopping it immediately can crash due running nodes + //wanting to connect to already stopped nodes + time.Sleep(1 * time.Second) + + log.Info("Stop the network") + //stop the network + resp, err = http.Post(srv.URL+"/stop", "application/json", nil) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("err %s", resp.Status) + } + + log.Info("Reset the network") + //reset the network (removes all nodes and connections) + resp, err = http.Post(srv.URL+"/reset", "application/json", nil) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("err %s", resp.Status) + } +} + +//watch for events so we know when all nodes are up +func watchSimEvents(net *simulations.Network, ctx context.Context, trigger chan discover.NodeID) { + events := make(chan *simulations.Event) + sub := net.Events().Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case ev := <-events: + //only catch node up events + if ev.Type == simulations.EventTypeNode { + if ev.Node.Up { + log.Debug("got node up event", "event", ev, "node", ev.Node.Config.ID) + select { + case trigger <- ev.Node.Config.ID: + case <-ctx.Done(): + return + } + } + } + case <-ctx.Done(): + return + } + } +} |