aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/simulations
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/simulations
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/simulations')
-rw-r--r--swarm/network/simulations/discovery/discovery.go17
-rw-r--r--swarm/network/simulations/discovery/discovery_test.go586
-rwxr-xr-xswarm/network/simulations/discovery/jsonsnapshot.txt1
-rw-r--r--swarm/network/simulations/overlay.go144
-rw-r--r--swarm/network/simulations/overlay_test.go195
5 files changed, 943 insertions, 0 deletions
diff --git a/swarm/network/simulations/discovery/discovery.go b/swarm/network/simulations/discovery/discovery.go
new file mode 100644
index 000000000..a6ff5fd45
--- /dev/null
+++ b/swarm/network/simulations/discovery/discovery.go
@@ -0,0 +1,17 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package discovery
diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go
new file mode 100644
index 000000000..acf3479e5
--- /dev/null
+++ b/swarm/network/simulations/discovery/discovery_test.go
@@ -0,0 +1,586 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package discovery
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ colorable "github.com/mattn/go-colorable"
+)
+
+// serviceName is used with the exec adapter so the exec'd binary knows which
+// service to execute
+const serviceName = "discovery"
+const testMinProxBinSize = 2
+const discoveryPersistenceDatadir = "discovery_persistence_test_store"
+
+var discoveryPersistencePath = path.Join(os.TempDir(), discoveryPersistenceDatadir)
+var discoveryEnabled = true
+var persistenceEnabled = false
+
+var services = adapters.Services{
+ serviceName: newService,
+}
+
+func cleanDbStores() error {
+ entries, err := ioutil.ReadDir(os.TempDir())
+ if err != nil {
+ return err
+ }
+
+ for _, f := range entries {
+ if strings.HasPrefix(f.Name(), discoveryPersistenceDatadir) {
+ os.RemoveAll(path.Join(os.TempDir(), f.Name()))
+ }
+ }
+ return nil
+
+}
+
+func getDbStore(nodeID string) (*state.DBStore, error) {
+ if _, err := os.Stat(discoveryPersistencePath + "_" + nodeID); os.IsNotExist(err) {
+ log.Info(fmt.Sprintf("directory for nodeID %s does not exist. creating...", nodeID))
+ ioutil.TempDir("", discoveryPersistencePath+"_"+nodeID)
+ }
+ log.Info(fmt.Sprintf("opening storage directory for nodeID %s", nodeID))
+ store, err := state.NewDBStore(discoveryPersistencePath + "_" + nodeID)
+ if err != nil {
+ return nil, err
+ }
+ return store, nil
+}
+
+var (
+ nodeCount = flag.Int("nodes", 10, "number of nodes to create (default 10)")
+ initCount = flag.Int("conns", 1, "number of originally connected peers (default 1)")
+ snapshotFile = flag.String("snapshot", "", "create snapshot")
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+ rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
+)
+
+func init() {
+ flag.Parse()
+ // register the discovery service which will run as a devp2p
+ // protocol when using the exec adapter
+ adapters.RegisterServices(services)
+
+ log.PrintOrigins(true)
+ log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog))))
+}
+
+// Benchmarks to test the average time it takes for an N-node ring
+// to full a healthy kademlia topology
+func BenchmarkDiscovery_8_1(b *testing.B) { benchmarkDiscovery(b, 8, 1) }
+func BenchmarkDiscovery_16_1(b *testing.B) { benchmarkDiscovery(b, 16, 1) }
+func BenchmarkDiscovery_32_1(b *testing.B) { benchmarkDiscovery(b, 32, 1) }
+func BenchmarkDiscovery_64_1(b *testing.B) { benchmarkDiscovery(b, 64, 1) }
+func BenchmarkDiscovery_128_1(b *testing.B) { benchmarkDiscovery(b, 128, 1) }
+func BenchmarkDiscovery_256_1(b *testing.B) { benchmarkDiscovery(b, 256, 1) }
+
+func BenchmarkDiscovery_8_2(b *testing.B) { benchmarkDiscovery(b, 8, 2) }
+func BenchmarkDiscovery_16_2(b *testing.B) { benchmarkDiscovery(b, 16, 2) }
+func BenchmarkDiscovery_32_2(b *testing.B) { benchmarkDiscovery(b, 32, 2) }
+func BenchmarkDiscovery_64_2(b *testing.B) { benchmarkDiscovery(b, 64, 2) }
+func BenchmarkDiscovery_128_2(b *testing.B) { benchmarkDiscovery(b, 128, 2) }
+func BenchmarkDiscovery_256_2(b *testing.B) { benchmarkDiscovery(b, 256, 2) }
+
+func BenchmarkDiscovery_8_4(b *testing.B) { benchmarkDiscovery(b, 8, 4) }
+func BenchmarkDiscovery_16_4(b *testing.B) { benchmarkDiscovery(b, 16, 4) }
+func BenchmarkDiscovery_32_4(b *testing.B) { benchmarkDiscovery(b, 32, 4) }
+func BenchmarkDiscovery_64_4(b *testing.B) { benchmarkDiscovery(b, 64, 4) }
+func BenchmarkDiscovery_128_4(b *testing.B) { benchmarkDiscovery(b, 128, 4) }
+func BenchmarkDiscovery_256_4(b *testing.B) { benchmarkDiscovery(b, 256, 4) }
+
+func TestDiscoverySimulationDockerAdapter(t *testing.T) {
+ testDiscoverySimulationDockerAdapter(t, *nodeCount, *initCount)
+}
+
+func testDiscoverySimulationDockerAdapter(t *testing.T, nodes, conns int) {
+ adapter, err := adapters.NewDockerAdapter()
+ if err != nil {
+ if err == adapters.ErrLinuxOnly {
+ t.Skip(err)
+ } else {
+ t.Fatal(err)
+ }
+ }
+ testDiscoverySimulation(t, nodes, conns, adapter)
+}
+
+func TestDiscoverySimulationExecAdapter(t *testing.T) {
+ testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount)
+}
+
+func testDiscoverySimulationExecAdapter(t *testing.T, nodes, conns int) {
+ baseDir, err := ioutil.TempDir("", "swarm-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(baseDir)
+ testDiscoverySimulation(t, nodes, conns, adapters.NewExecAdapter(baseDir))
+}
+
+func TestDiscoverySimulationSimAdapter(t *testing.T) {
+ testDiscoverySimulationSimAdapter(t, *nodeCount, *initCount)
+}
+
+func TestDiscoveryPersistenceSimulationSimAdapter(t *testing.T) {
+ testDiscoveryPersistenceSimulationSimAdapter(t, *nodeCount, *initCount)
+}
+
+func testDiscoveryPersistenceSimulationSimAdapter(t *testing.T, nodes, conns int) {
+ testDiscoveryPersistenceSimulation(t, nodes, conns, adapters.NewSimAdapter(services))
+}
+
+func testDiscoverySimulationSimAdapter(t *testing.T, nodes, conns int) {
+ testDiscoverySimulation(t, nodes, conns, adapters.NewSimAdapter(services))
+}
+
+func testDiscoverySimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) {
+ startedAt := time.Now()
+ result, err := discoverySimulation(nodes, conns, adapter)
+ if err != nil {
+ t.Fatalf("Setting up simulation failed: %v", err)
+ }
+ if result.Error != nil {
+ t.Fatalf("Simulation failed: %s", result.Error)
+ }
+ t.Logf("Simulation with %d nodes passed in %s", nodes, result.FinishedAt.Sub(result.StartedAt))
+ var min, max time.Duration
+ var sum int
+ for _, pass := range result.Passes {
+ duration := pass.Sub(result.StartedAt)
+ if sum == 0 || duration < min {
+ min = duration
+ }
+ if duration > max {
+ max = duration
+ }
+ sum += int(duration.Nanoseconds())
+ }
+ t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond)
+ finishedAt := time.Now()
+ t.Logf("Setup: %s, shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt))
+}
+
+func testDiscoveryPersistenceSimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) map[int][]byte {
+ persistenceEnabled = true
+ discoveryEnabled = true
+
+ result, err := discoveryPersistenceSimulation(nodes, conns, adapter)
+
+ if err != nil {
+ t.Fatalf("Setting up simulation failed: %v", err)
+ }
+ if result.Error != nil {
+ t.Fatalf("Simulation failed: %s", result.Error)
+ }
+ t.Logf("Simulation with %d nodes passed in %s", nodes, result.FinishedAt.Sub(result.StartedAt))
+ // set the discovery and persistence flags again to default so other
+ // tests will not be affected
+ discoveryEnabled = true
+ persistenceEnabled = false
+ return nil
+}
+
+func benchmarkDiscovery(b *testing.B, nodes, conns int) {
+ for i := 0; i < b.N; i++ {
+ result, err := discoverySimulation(nodes, conns, adapters.NewSimAdapter(services))
+ if err != nil {
+ b.Fatalf("setting up simulation failed: %v", err)
+ }
+ if result.Error != nil {
+ b.Logf("simulation failed: %s", result.Error)
+ }
+ }
+}
+
+func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simulations.StepResult, error) {
+ // create network
+ net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ ID: "0",
+ DefaultService: serviceName,
+ })
+ defer net.Shutdown()
+ trigger := make(chan discover.NodeID)
+ ids := make([]discover.NodeID, nodes)
+ for i := 0; i < nodes; i++ {
+ conf := adapters.RandomNodeConfig()
+ node, err := net.NewNodeWithConfig(conf)
+ if err != nil {
+ return nil, fmt.Errorf("error starting node: %s", err)
+ }
+ if err := net.Start(node.ID()); err != nil {
+ return nil, fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err)
+ }
+ if err := triggerChecks(trigger, net, node.ID()); err != nil {
+ return nil, fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err)
+ }
+ ids[i] = node.ID()
+ }
+
+ // run a simulation which connects the 10 nodes in a ring and waits
+ // for full peer discovery
+ var addrs [][]byte
+ action := func(ctx context.Context) error {
+ return nil
+ }
+ wg := sync.WaitGroup{}
+ for i := range ids {
+ // collect the overlay addresses, to
+ addrs = append(addrs, network.ToOverlayAddr(ids[i].Bytes()))
+ for j := 0; j < conns; j++ {
+ var k int
+ if j == 0 {
+ k = (i + 1) % len(ids)
+ } else {
+ k = rand.Intn(len(ids))
+ }
+ wg.Add(1)
+ go func(i, k int) {
+ defer wg.Done()
+ net.Connect(ids[i], ids[k])
+ }(i, k)
+ }
+ }
+ wg.Wait()
+ log.Debug(fmt.Sprintf("nodes: %v", len(addrs)))
+ // construct the peer pot, so that kademlia health can be checked
+ ppmap := network.NewPeerPotMap(testMinProxBinSize, addrs)
+ check := func(ctx context.Context, id discover.NodeID) (bool, error) {
+ select {
+ case <-ctx.Done():
+ return false, ctx.Err()
+ default:
+ }
+
+ node := net.GetNode(id)
+ if node == nil {
+ return false, fmt.Errorf("unknown node: %s", id)
+ }
+ client, err := node.Client()
+ if err != nil {
+ return false, fmt.Errorf("error getting node client: %s", err)
+ }
+ healthy := &network.Health{}
+ addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
+ if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil {
+ return false, fmt.Errorf("error getting node health: %s", err)
+ }
+ log.Debug(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v, saturated: %v\n%v", id, healthy.GotNN, healthy.KnowNN, healthy.Full, healthy.Hive))
+ return healthy.KnowNN && healthy.GotNN && healthy.Full, nil
+ }
+
+ // 64 nodes ~ 1min
+ // 128 nodes ~
+ timeout := 300 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
+ Action: action,
+ Trigger: trigger,
+ Expect: &simulations.Expectation{
+ Nodes: ids,
+ Check: check,
+ },
+ })
+ if result.Error != nil {
+ return result, nil
+ }
+
+ if *snapshotFile != "" {
+ snap, err := net.Snapshot()
+ if err != nil {
+ return nil, errors.New("no shapshot dude")
+ }
+ jsonsnapshot, err := json.Marshal(snap)
+ if err != nil {
+ return nil, fmt.Errorf("corrupt json snapshot: %v", err)
+ }
+ log.Info("writing snapshot", "file", *snapshotFile)
+ err = ioutil.WriteFile(*snapshotFile, jsonsnapshot, 0755)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return result, nil
+}
+
+func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simulations.StepResult, error) {
+ cleanDbStores()
+ defer cleanDbStores()
+
+ // create network
+ net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ ID: "0",
+ DefaultService: serviceName,
+ })
+ defer net.Shutdown()
+ trigger := make(chan discover.NodeID)
+ ids := make([]discover.NodeID, nodes)
+ var addrs [][]byte
+
+ for i := 0; i < nodes; i++ {
+ conf := adapters.RandomNodeConfig()
+ node, err := net.NewNodeWithConfig(conf)
+ if err != nil {
+ panic(err)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("error starting node: %s", err)
+ }
+ if err := net.Start(node.ID()); err != nil {
+ return nil, fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err)
+ }
+ if err := triggerChecks(trigger, net, node.ID()); err != nil {
+ return nil, fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err)
+ }
+ ids[i] = node.ID()
+ a := network.ToOverlayAddr(ids[i].Bytes())
+
+ addrs = append(addrs, a)
+ }
+
+ // run a simulation which connects the 10 nodes in a ring and waits
+ // for full peer discovery
+ ppmap := network.NewPeerPotMap(testMinProxBinSize, addrs)
+
+ var restartTime time.Time
+
+ action := func(ctx context.Context) error {
+ ticker := time.NewTicker(500 * time.Millisecond)
+
+ for range ticker.C {
+ isHealthy := true
+ for _, id := range ids {
+ //call Healthy RPC
+ node := net.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("unknown node: %s", id)
+ }
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("error getting node client: %s", err)
+ }
+ healthy := &network.Health{}
+ addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
+ if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil {
+ return fmt.Errorf("error getting node health: %s", err)
+ }
+
+ log.Info(fmt.Sprintf("NODE: %s, IS HEALTHY: %t", id.String(), healthy.GotNN && healthy.KnowNN && healthy.Full))
+ if !healthy.GotNN || !healthy.Full {
+ isHealthy = false
+ break
+ }
+ }
+ if isHealthy {
+ break
+ }
+ }
+ ticker.Stop()
+
+ log.Info("reached healthy kademlia. starting to shutdown nodes.")
+ shutdownStarted := time.Now()
+ // stop all ids, then start them again
+ for _, id := range ids {
+ node := net.GetNode(id)
+
+ if err := net.Stop(node.ID()); err != nil {
+ return fmt.Errorf("error stopping node %s: %s", node.ID().TerminalString(), err)
+ }
+ }
+ log.Info(fmt.Sprintf("shutting down nodes took: %s", time.Since(shutdownStarted)))
+ persistenceEnabled = true
+ discoveryEnabled = false
+ restartTime = time.Now()
+ for _, id := range ids {
+ node := net.GetNode(id)
+ if err := net.Start(node.ID()); err != nil {
+ return fmt.Errorf("error starting node %s: %s", node.ID().TerminalString(), err)
+ }
+ if err := triggerChecks(trigger, net, node.ID()); err != nil {
+ return fmt.Errorf("error triggering checks for node %s: %s", node.ID().TerminalString(), err)
+ }
+ }
+
+ log.Info(fmt.Sprintf("restarting nodes took: %s", time.Since(restartTime)))
+
+ return nil
+ }
+ //connects in a chain
+ wg := sync.WaitGroup{}
+ //connects in a ring
+ for i := range ids {
+ for j := 1; j <= conns; j++ {
+ k := (i + j) % len(ids)
+ if k == i {
+ k = (k + 1) % len(ids)
+ }
+ wg.Add(1)
+ go func(i, k int) {
+ defer wg.Done()
+ net.Connect(ids[i], ids[k])
+ }(i, k)
+ }
+ }
+ wg.Wait()
+ log.Debug(fmt.Sprintf("nodes: %v", len(addrs)))
+ // construct the peer pot, so that kademlia health can be checked
+ check := func(ctx context.Context, id discover.NodeID) (bool, error) {
+ select {
+ case <-ctx.Done():
+ return false, ctx.Err()
+ default:
+ }
+
+ node := net.GetNode(id)
+ if node == nil {
+ return false, fmt.Errorf("unknown node: %s", id)
+ }
+ client, err := node.Client()
+ if err != nil {
+ return false, fmt.Errorf("error getting node client: %s", err)
+ }
+ healthy := &network.Health{}
+ addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
+ if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil {
+ return false, fmt.Errorf("error getting node health: %s", err)
+ }
+ log.Info(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v, saturated: %v", id, healthy.GotNN, healthy.KnowNN, healthy.Full))
+
+ return healthy.KnowNN && healthy.GotNN && healthy.Full, nil
+ }
+
+ // 64 nodes ~ 1min
+ // 128 nodes ~
+ timeout := 300 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
+ Action: action,
+ Trigger: trigger,
+ Expect: &simulations.Expectation{
+ Nodes: ids,
+ Check: check,
+ },
+ })
+ if result.Error != nil {
+ return result, nil
+ }
+
+ return result, nil
+}
+
+// triggerChecks triggers a simulation step check whenever a peer is added or
+// removed from the given node, and also every second to avoid a race between
+// peer events and kademlia becoming healthy
+func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id discover.NodeID) error {
+ node := net.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("unknown node: %s", id)
+ }
+ client, err := node.Client()
+ if err != nil {
+ return err
+ }
+ events := make(chan *p2p.PeerEvent)
+ sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
+ if err != nil {
+ return fmt.Errorf("error getting peer events for node %v: %s", id, err)
+ }
+ go func() {
+ defer sub.Unsubscribe()
+
+ tick := time.NewTicker(time.Second)
+ defer tick.Stop()
+
+ for {
+ select {
+ case <-events:
+ trigger <- id
+ case <-tick.C:
+ trigger <- id
+ case err := <-sub.Err():
+ if err != nil {
+ log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
+ }
+ return
+ }
+ }
+ }()
+ return nil
+}
+
+func newService(ctx *adapters.ServiceContext) (node.Service, error) {
+ host := adapters.ExternalIP()
+
+ addr := network.NewAddrFromNodeIDAndPort(ctx.Config.ID, host, ctx.Config.Port)
+
+ kp := network.NewKadParams()
+ kp.MinProxBinSize = testMinProxBinSize
+
+ if ctx.Config.Reachable != nil {
+ kp.Reachable = func(o network.OverlayAddr) bool {
+ return ctx.Config.Reachable(o.(*network.BzzAddr).ID())
+ }
+ }
+ kad := network.NewKademlia(addr.Over(), kp)
+ hp := network.NewHiveParams()
+ hp.KeepAliveInterval = time.Duration(200) * time.Millisecond
+ hp.Discovery = discoveryEnabled
+
+ log.Info(fmt.Sprintf("discovery for nodeID %s is %t", ctx.Config.ID.String(), hp.Discovery))
+
+ config := &network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: hp,
+ }
+
+ if persistenceEnabled {
+ log.Info(fmt.Sprintf("persistence enabled for nodeID %s", ctx.Config.ID.String()))
+ store, err := getDbStore(ctx.Config.ID.String())
+ if err != nil {
+ return nil, err
+ }
+ return network.NewBzz(config, kad, store, nil, nil), nil
+ }
+
+ return network.NewBzz(config, kad, nil, nil, nil), nil
+}
diff --git a/swarm/network/simulations/discovery/jsonsnapshot.txt b/swarm/network/simulations/discovery/jsonsnapshot.txt
new file mode 100755
index 000000000..51d319dbc
--- /dev/null
+++ b/swarm/network/simulations/discovery/jsonsnapshot.txt
@@ -0,0 +1 @@
+{"nodes":[{"config":{"id":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","private_key":"0fe997f31d91d569cd9283232c7b44ea29fbdd25b3ba351d0676d12f36236fce","name":"node01","services":["discovery"]},"up":true},{"config":{"id":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","private_key":"710b57c14e04a6800c26ed2effe1d76ae25f7e113410c30f4065cdf7639aea30","name":"node02","services":["discovery"]},"up":true},{"config":{"id":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","private_key":"370728241e71b18ecfe4979cbeeeb968f30943e8022ae1f41965ffb1959f50f6","name":"node03","services":["discovery"]},"up":true},{"config":{"id":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","private_key":"2dd7bf0eca70d3b78a01600abc6665b1abd27cee96f42a4c0aea58ebc3e0f1c0","name":"node04","services":["discovery"]},"up":true},{"config":{"id":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","private_key":"f9e67ff0212a3ddf9085385e825bf63e1619938ba8332f970b28e4241a78ec50","name":"node05","services":["discovery"]},"up":true},{"config":{"id":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","private_key":"9e6291b175d334e057dd7a902b42675f6ba4735378351ce22b742f835be1082d","name":"node06","services":["discovery"]},"up":true},{"config":{"id":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","private_key":"ebff8542458c73a3ee77b58a6e7c12ef2132f2fe1623eb47e67751ca277be79a","name":"node07","services":["discovery"]},"up":true},{"config":{"id":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","private_key":"5725444d69bdd3e6740ebf2f7aa9126d9f00297a0a83eea6e5cbeb81a7fe56f7","name":"node08","services":["discovery"]},"up":true},{"config":{"id":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","private_key":"56b698d576cb9b1758ad09ca53a61b297b59dd2e6f5aeb1828ca22beb5be2ea7","name":"node09","services":["discovery"]},"up":true},{"config":{"id":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","private_key":"99488b9451a47aa37013cc8934ecc51614a8f23f3b1fa29b6537c01e7da55530","name":"node10","services":["discovery"]},"up":true}],"conns":[{"one":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","other":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","up":true,"reverse":false,"distance":79},{"one":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","other":"b3bdd767da3baf548169c34731204e18c2661fdd6f99859aad09c0e3a575cebbdbff3ad2fce53b3af9226e421a0ef5b7c0d934b382054b1aab0dd37586bda390","up":true,"reverse":false,"distance":77},{"one":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","other":"b6dbb137efcd90412472b7015a5c94800be2ae4d9a2bb5a93e6edd56358c170031dba7552bda187ef60bf84cdafc0f7f800d70a06e75359c13bab53ae1df2849","up":true,"reverse":false,"distance":65},{"one":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","other":"5a9a437cf250d662b6c13ae07b3713db497d84f16e98dc4dc849a91ebc4e4a4056c4911150b03633ff9a12af5b59036242325967944a5354be936e101900051e","up":true,"reverse":true,"distance":69},{"one":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","other":"110cedce4adb25a6cb0ff756e28cde22421e825c1110f7472b52a8d4de604a9ffe7d01f5e71aff483ec0a3fa8bfe8e2cb53eb85b8944f839331351628f1b209d","up":true,"reverse":false,"distance":70},{"one":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","other":"3ed77f18fe4fcfe40621e525c8c329bd066c477d01ff1d237458d66d0d3646961c0f943ae773a3ab78b07579dc0ee28eae5a89936c11ccaf43e12b86fc3f63ea","up":true,"reverse":false,"distance":69},{"one":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","other":"62481ad258b8d3ddd9262adcdccec70288e879db1e74565599cf4aa277d7f03c333d2ef0d9a06699c779f9d274a2b84a32506009afe5ee5c4e9574302c04a2bb","up":true,"reverse":false,"distance":65},{"one":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","other":"f4718b84450d7f5444394533f5312f0196f2c2c7d867fb3ddd82fbafdc21f3c478555c96401357aa8c68582f39ad4e752aa61ff19e781ca5c4525fc258853eec","up":true,"reverse":false,"distance":65},{"one":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","other":"5019a6b7ab464e4c443a1fb74a94fbf4fe2754999ad2b08a6585cc44e0cf53a0a964d5e2cf5069b5a5660b0346a4fd9f6d998b8843be6b4be8858431c813bd23","up":true,"reverse":false,"distance":69},{"one":"b212f4df8ee646c3a6cd566a6544ec4534ebcc3be9ab697010225014136ab9cdeaca96b8119ca07e3ff69f7f097e162793d8262aaee2a79367a298a77ae2cfeb","other":"17917299fdc3a358f7b7336157e927c22e3e0c661fb0e630df3821f238fff46e2e6387cfaf2a6fdb33cf5bb005a6248bea664645133c28f068578c0fb362d132","up":true,"reverse":true,"distance":65}]} \ No newline at end of file
diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go
new file mode 100644
index 000000000..9419de0c6
--- /dev/null
+++ b/swarm/network/simulations/overlay.go
@@ -0,0 +1,144 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// You can run this simulation using
+//
+// go run ./swarm/network/simulations/overlay.go
+package main
+
+import (
+ "flag"
+ "fmt"
+ "net/http"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ colorable "github.com/mattn/go-colorable"
+)
+
+var (
+ noDiscovery = flag.Bool("no-discovery", false, "disable discovery (useful if you want to load a snapshot)")
+ vmodule = flag.String("vmodule", "", "log filters for logger via Vmodule")
+ verbosity = flag.Int("verbosity", 0, "log filters for logger via Vmodule")
+ httpSimPort = 8888
+)
+
+func init() {
+ flag.Parse()
+ //initialize the logger
+ //this is a demonstration on how to use Vmodule for filtering logs
+ //provide -vmodule as param, and comma-separated values, e.g.:
+ //-vmodule overlay_test.go=4,simulations=3
+ //above examples sets overlay_test.go logs to level 4, while packages ending with "simulations" to 3
+ if *vmodule != "" {
+ //only enable the pattern matching handler if the flag has been provided
+ glogger := log.NewGlogHandler(log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))
+ if *verbosity > 0 {
+ glogger.Verbosity(log.Lvl(*verbosity))
+ }
+ glogger.Vmodule(*vmodule)
+ log.Root().SetHandler(glogger)
+ }
+}
+
+type Simulation struct {
+ mtx sync.Mutex
+ stores map[discover.NodeID]*state.InmemoryStore
+}
+
+func NewSimulation() *Simulation {
+ return &Simulation{
+ stores: make(map[discover.NodeID]*state.InmemoryStore),
+ }
+}
+
+func (s *Simulation) NewService(ctx *adapters.ServiceContext) (node.Service, error) {
+ id := ctx.Config.ID
+ s.mtx.Lock()
+ store, ok := s.stores[id]
+ if !ok {
+ store = state.NewInmemoryStore()
+ s.stores[id] = store
+ }
+ s.mtx.Unlock()
+
+ addr := network.NewAddrFromNodeID(id)
+
+ kp := network.NewKadParams()
+ kp.MinProxBinSize = 2
+ kp.MaxBinSize = 4
+ kp.MinBinSize = 1
+ kp.MaxRetries = 1000
+ kp.RetryExponent = 2
+ kp.RetryInterval = 1000000
+ kad := network.NewKademlia(addr.Over(), kp)
+ hp := network.NewHiveParams()
+ hp.Discovery = !*noDiscovery
+ hp.KeepAliveInterval = 300 * time.Millisecond
+
+ config := &network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: hp,
+ }
+
+ return network.NewBzz(config, kad, store, nil, nil), nil
+}
+
+//create the simulation network
+func newSimulationNetwork() *simulations.Network {
+
+ s := NewSimulation()
+ services := adapters.Services{
+ "overlay": s.NewService,
+ }
+ adapter := adapters.NewSimAdapter(services)
+ simNetwork := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ DefaultService: "overlay",
+ })
+ return simNetwork
+}
+
+//return a new http server
+func newOverlaySim(sim *simulations.Network) *simulations.Server {
+ return simulations.NewServer(sim)
+}
+
+// var server
+func main() {
+ //cpu optimization
+ runtime.GOMAXPROCS(runtime.NumCPU())
+ //run the sim
+ runOverlaySim()
+}
+
+func runOverlaySim() {
+ //create the simulation network
+ net := newSimulationNetwork()
+ //create a http server with it
+ sim := newOverlaySim(net)
+ log.Info(fmt.Sprintf("starting simulation server on 0.0.0.0:%d...", httpSimPort))
+ //start the HTTP server
+ http.ListenAndServe(fmt.Sprintf(":%d", httpSimPort), sim)
+}
diff --git a/swarm/network/simulations/overlay_test.go b/swarm/network/simulations/overlay_test.go
new file mode 100644
index 000000000..4d4eb6c37
--- /dev/null
+++ b/swarm/network/simulations/overlay_test.go
@@ -0,0 +1,195 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/swarm/log"
+)
+
+var (
+ nodeCount = 16
+)
+
+//This test is used to test the overlay simulation.
+//As the simulation is executed via a main, it is easily missed on changes
+//An automated test will prevent that
+//The test just connects to the simulations, starts the network,
+//starts the mocker, gets the number of nodes, and stops it again.
+//It also provides a documentation on the steps needed by frontends
+//to use the simulations
+func TestOverlaySim(t *testing.T) {
+ t.Skip("Test is flaky, see: https://github.com/ethersphere/go-ethereum/issues/592")
+ //start the simulation
+ log.Info("Start simulation backend")
+ //get the simulation networ; needed to subscribe for up events
+ net := newSimulationNetwork()
+ //create the overlay simulation
+ sim := newOverlaySim(net)
+ //create a http test server with it
+ srv := httptest.NewServer(sim)
+ defer srv.Close()
+
+ log.Debug("Http simulation server started. Start simulation network")
+ //start the simulation network (initialization of simulation)
+ resp, err := http.Post(srv.URL+"/start", "application/json", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("Expected Status Code %d, got %d", http.StatusOK, resp.StatusCode)
+ }
+
+ log.Debug("Start mocker")
+ //start the mocker, needs a node count and an ID
+ resp, err = http.PostForm(srv.URL+"/mocker/start",
+ url.Values{
+ "node-count": {fmt.Sprintf("%d", nodeCount)},
+ "mocker-type": {simulations.GetMockerList()[0]},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ reason, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatal(err)
+ }
+ t.Fatalf("Expected Status Code %d, got %d, response body %s", http.StatusOK, resp.StatusCode, string(reason))
+ }
+
+ //variables needed to wait for nodes being up
+ var upCount int
+ trigger := make(chan discover.NodeID)
+
+ //wait for all nodes to be up
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ //start watching node up events...
+ go watchSimEvents(net, ctx, trigger)
+
+ //...and wait until all expected up events (nodeCount) have been received
+LOOP:
+ for {
+ select {
+ case <-trigger:
+ //new node up event received, increase counter
+ upCount++
+ //all expected node up events received
+ if upCount == nodeCount {
+ break LOOP
+ }
+ case <-ctx.Done():
+ t.Fatalf("Timed out waiting for up events")
+ }
+
+ }
+
+ //at this point we can query the server
+ log.Info("Get number of nodes")
+ //get the number of nodes
+ resp, err = http.Get(srv.URL + "/nodes")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("err %s", resp.Status)
+ }
+ b, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ //unmarshal number of nodes from JSON response
+ var nodesArr []simulations.Node
+ err = json.Unmarshal(b, &nodesArr)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ //check if number of nodes received is same as sent
+ if len(nodesArr) != nodeCount {
+ t.Fatal(fmt.Errorf("Expected %d number of nodes, got %d", nodeCount, len(nodesArr)))
+ }
+
+ //need to let it run for a little while, otherwise stopping it immediately can crash due running nodes
+ //wanting to connect to already stopped nodes
+ time.Sleep(1 * time.Second)
+
+ log.Info("Stop the network")
+ //stop the network
+ resp, err = http.Post(srv.URL+"/stop", "application/json", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("err %s", resp.Status)
+ }
+
+ log.Info("Reset the network")
+ //reset the network (removes all nodes and connections)
+ resp, err = http.Post(srv.URL+"/reset", "application/json", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("err %s", resp.Status)
+ }
+}
+
+//watch for events so we know when all nodes are up
+func watchSimEvents(net *simulations.Network, ctx context.Context, trigger chan discover.NodeID) {
+ events := make(chan *simulations.Event)
+ sub := net.Events().Subscribe(events)
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case ev := <-events:
+ //only catch node up events
+ if ev.Type == simulations.EventTypeNode {
+ if ev.Node.Up {
+ log.Debug("got node up event", "event", ev, "node", ev.Node.Config.ID)
+ select {
+ case trigger <- ev.Node.Config.ID:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}