aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/discv5/net.go2
-rw-r--r--p2p/discv5/ntp.go8
-rw-r--r--p2p/discv5/ticket.go12
-rw-r--r--p2p/simulations/adapters/state.go35
-rw-r--r--p2p/simulations/http.go64
-rw-r--r--p2p/simulations/mocker.go192
-rw-r--r--p2p/simulations/mocker_test.go171
-rw-r--r--p2p/simulations/network.go27
8 files changed, 494 insertions, 17 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index a39cfcc64..2fbb60824 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -684,7 +684,7 @@ func (net *Network) refresh(done chan<- struct{}) {
seeds = net.nursery
}
if len(seeds) == 0 {
- log.Trace(fmt.Sprint("no seed nodes found"))
+ log.Trace("no seed nodes found")
close(done)
return
}
diff --git a/p2p/discv5/ntp.go b/p2p/discv5/ntp.go
index f78d5dc43..4fb5f657a 100644
--- a/p2p/discv5/ntp.go
+++ b/p2p/discv5/ntp.go
@@ -54,10 +54,10 @@ func checkClockDrift() {
howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings")
separator := strings.Repeat("-", len(warning))
- log.Warn(fmt.Sprint(separator))
- log.Warn(fmt.Sprint(warning))
- log.Warn(fmt.Sprint(howtofix))
- log.Warn(fmt.Sprint(separator))
+ log.Warn(separator)
+ log.Warn(warning)
+ log.Warn(howtofix)
+ log.Warn(separator)
} else {
log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift))
}
diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go
index 48dd114f0..193cef4be 100644
--- a/p2p/discv5/ticket.go
+++ b/p2p/discv5/ticket.go
@@ -398,12 +398,12 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration
//s.removeExcessTickets(topic)
if len(tickets.buckets) != 0 {
empty = false
- if list := tickets.buckets[bucket]; list != nil {
- for _, ref := range list {
- //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
- if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
- nextTicket = ref
- }
+
+ list := tickets.buckets[bucket]
+ for _, ref := range list {
+ //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
+ if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
+ nextTicket = ref
}
}
}
diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go
new file mode 100644
index 000000000..8b1dfef90
--- /dev/null
+++ b/p2p/simulations/adapters/state.go
@@ -0,0 +1,35 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package adapters
+
+type SimStateStore struct {
+ m map[string][]byte
+}
+
+func (self *SimStateStore) Load(s string) ([]byte, error) {
+ return self.m[s], nil
+}
+
+func (self *SimStateStore) Save(s string, data []byte) error {
+ self.m[s] = data
+ return nil
+}
+
+func NewSimStateStore() *SimStateStore {
+ return &SimStateStore{
+ make(map[string][]byte),
+ }
+}
diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go
index 3fa8b9292..97dd742e8 100644
--- a/p2p/simulations/http.go
+++ b/p2p/simulations/http.go
@@ -27,6 +27,7 @@ import (
"net/http"
"strconv"
"strings"
+ "sync"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
@@ -263,8 +264,10 @@ func (c *Client) Send(method, path string, in, out interface{}) error {
// Server is an HTTP server providing an API to manage a simulation network
type Server struct {
- router *httprouter.Router
- network *Network
+ router *httprouter.Router
+ network *Network
+ mockerStop chan struct{} // when set, stops the current mocker
+ mockerMtx sync.Mutex // synchronises access to the mockerStop field
}
// NewServer returns a new simulation API server
@@ -278,6 +281,10 @@ func NewServer(network *Network) *Server {
s.GET("/", s.GetNetwork)
s.POST("/start", s.StartNetwork)
s.POST("/stop", s.StopNetwork)
+ s.POST("/mocker/start", s.StartMocker)
+ s.POST("/mocker/stop", s.StopMocker)
+ s.GET("/mocker", s.GetMockers)
+ s.POST("/reset", s.ResetNetwork)
s.GET("/events", s.StreamNetworkEvents)
s.GET("/snapshot", s.CreateSnapshot)
s.POST("/snapshot", s.LoadSnapshot)
@@ -318,6 +325,59 @@ func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}
+// StartMocker starts the mocker node simulation
+func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) {
+ s.mockerMtx.Lock()
+ defer s.mockerMtx.Unlock()
+ if s.mockerStop != nil {
+ http.Error(w, "mocker already running", http.StatusInternalServerError)
+ return
+ }
+ mockerType := req.FormValue("mocker-type")
+ mockerFn := LookupMocker(mockerType)
+ if mockerFn == nil {
+ http.Error(w, fmt.Sprintf("unknown mocker type %q", mockerType), http.StatusBadRequest)
+ return
+ }
+ nodeCount, err := strconv.Atoi(req.FormValue("node-count"))
+ if err != nil {
+ http.Error(w, "invalid node-count provided", http.StatusBadRequest)
+ return
+ }
+ s.mockerStop = make(chan struct{})
+ go mockerFn(s.network, s.mockerStop, nodeCount)
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// StopMocker stops the mocker node simulation
+func (s *Server) StopMocker(w http.ResponseWriter, req *http.Request) {
+ s.mockerMtx.Lock()
+ defer s.mockerMtx.Unlock()
+ if s.mockerStop == nil {
+ http.Error(w, "stop channel not initialized", http.StatusInternalServerError)
+ return
+ }
+ close(s.mockerStop)
+ s.mockerStop = nil
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// GetMockerList returns a list of available mockers
+func (s *Server) GetMockers(w http.ResponseWriter, req *http.Request) {
+
+ list := GetMockerList()
+ s.JSON(w, http.StatusOK, list)
+}
+
+// ResetNetwork resets all properties of a network to its initial (empty) state
+func (s *Server) ResetNetwork(w http.ResponseWriter, req *http.Request) {
+ s.network.Reset()
+
+ w.WriteHeader(http.StatusOK)
+}
+
// StreamNetworkEvents streams network events as a server-sent-events stream
func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) {
events := make(chan *Event)
diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go
new file mode 100644
index 000000000..c38e28855
--- /dev/null
+++ b/p2p/simulations/mocker.go
@@ -0,0 +1,192 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package simulations simulates p2p networks.
+// A mocker simulates starting and stopping real nodes in a network.
+package simulations
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+//a map of mocker names to its function
+var mockerList = map[string]func(net *Network, quit chan struct{}, nodeCount int){
+ "startStop": startStop,
+ "probabilistic": probabilistic,
+ "boot": boot,
+}
+
+//Lookup a mocker by its name, returns the mockerFn
+func LookupMocker(mockerType string) func(net *Network, quit chan struct{}, nodeCount int) {
+ return mockerList[mockerType]
+}
+
+//Get a list of mockers (keys of the map)
+//Useful for frontend to build available mocker selection
+func GetMockerList() []string {
+ list := make([]string, 0, len(mockerList))
+ for k := range mockerList {
+ list = append(list, k)
+ }
+ return list
+}
+
+//The boot mockerFn only connects the node in a ring and doesn't do anything else
+func boot(net *Network, quit chan struct{}, nodeCount int) {
+ _, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+}
+
+//The startStop mockerFn stops and starts nodes in a defined period (ticker)
+func startStop(net *Network, quit chan struct{}, nodeCount int) {
+ nodes, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+ tick := time.NewTicker(10 * time.Second)
+ defer tick.Stop()
+ for {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-tick.C:
+ id := nodes[rand.Intn(len(nodes))]
+ log.Info("stopping node", "id", id)
+ if err := net.Stop(id); err != nil {
+ log.Error("error stopping node", "id", id, "err", err)
+ return
+ }
+
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-time.After(3 * time.Second):
+ }
+
+ log.Debug("starting node", "id", id)
+ if err := net.Start(id); err != nil {
+ log.Error("error starting node", "id", id, "err", err)
+ return
+ }
+ }
+ }
+}
+
+//The probabilistic mocker func has a more probabilistic pattern
+//(the implementation could probably be improved):
+//nodes are connected in a ring, then a varying number of random nodes is selected,
+//mocker then stops and starts them in random intervals, and continues the loop
+func probabilistic(net *Network, quit chan struct{}, nodeCount int) {
+ nodes, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+ for {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ default:
+ }
+ var lowid, highid int
+ var wg sync.WaitGroup
+ randWait := time.Duration(rand.Intn(5000)+1000) * time.Millisecond
+ rand1 := rand.Intn(nodeCount - 1)
+ rand2 := rand.Intn(nodeCount - 1)
+ if rand1 < rand2 {
+ lowid = rand1
+ highid = rand2
+ } else if rand1 > rand2 {
+ highid = rand1
+ lowid = rand2
+ } else {
+ if rand1 == 0 {
+ rand2 = 9
+ } else if rand1 == 9 {
+ rand1 = 0
+ }
+ lowid = rand1
+ highid = rand2
+ }
+ var steps = highid - lowid
+ wg.Add(steps)
+ for i := lowid; i < highid; i++ {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-time.After(randWait):
+ }
+ log.Debug(fmt.Sprintf("node %v shutting down", nodes[i]))
+ err := net.Stop(nodes[i])
+ if err != nil {
+ log.Error(fmt.Sprintf("Error stopping node %s", nodes[i]))
+ wg.Done()
+ continue
+ }
+ go func(id discover.NodeID) {
+ time.Sleep(randWait)
+ err := net.Start(id)
+ if err != nil {
+ log.Error(fmt.Sprintf("Error starting node %s", id))
+ }
+ wg.Done()
+ }(nodes[i])
+ }
+ wg.Wait()
+ }
+
+}
+
+//connect nodeCount number of nodes in a ring
+func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) {
+ ids := make([]discover.NodeID, nodeCount)
+ for i := 0; i < nodeCount; i++ {
+ node, err := net.NewNode()
+ if err != nil {
+ log.Error("Error creating a node! %s", err)
+ return nil, err
+ }
+ ids[i] = node.ID()
+ }
+
+ for _, id := range ids {
+ if err := net.Start(id); err != nil {
+ log.Error("Error starting a node! %s", err)
+ return nil, err
+ }
+ log.Debug(fmt.Sprintf("node %v starting up", id))
+ }
+ for i, id := range ids {
+ peerID := ids[(i+1)%len(ids)]
+ if err := net.Connect(id, peerID); err != nil {
+ log.Error("Error connecting a node to a peer! %s", err)
+ return nil, err
+ }
+ }
+
+ return ids, nil
+}
diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go
new file mode 100644
index 000000000..de8ec0b33
--- /dev/null
+++ b/p2p/simulations/mocker_test.go
@@ -0,0 +1,171 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package simulations simulates p2p networks.
+// A mokcer simulates starting and stopping real nodes in a network.
+package simulations
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+func TestMocker(t *testing.T) {
+ //start the simulation HTTP server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ //create a client
+ client := NewClient(s.URL)
+
+ //start the network
+ err := client.StartNetwork()
+ if err != nil {
+ t.Fatalf("Could not start test network: %s", err)
+ }
+ //stop the network to terminate
+ defer func() {
+ err = client.StopNetwork()
+ if err != nil {
+ t.Fatalf("Could not stop test network: %s", err)
+ }
+ }()
+
+ //get the list of available mocker types
+ resp, err := http.Get(s.URL + "/mocker")
+ if err != nil {
+ t.Fatalf("Could not get mocker list: %s", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received, expected 200, got %d", resp.StatusCode)
+ }
+
+ //check the list is at least 1 in size
+ var mockerlist []string
+ err = json.NewDecoder(resp.Body).Decode(&mockerlist)
+ if err != nil {
+ t.Fatalf("Error decoding JSON mockerlist: %s", err)
+ }
+
+ if len(mockerlist) < 1 {
+ t.Fatalf("No mockers available")
+ }
+
+ nodeCount := 10
+ var wg sync.WaitGroup
+
+ events := make(chan *Event, 10)
+ var opts SubscribeOpts
+ sub, err := client.SubscribeNetwork(events, opts)
+ defer sub.Unsubscribe()
+ //wait until all nodes are started and connected
+ //store every node up event in a map (value is irrelevant, mimic Set datatype)
+ nodemap := make(map[discover.NodeID]bool)
+ wg.Add(1)
+ nodesComplete := false
+ connCount := 0
+ go func() {
+ for {
+ select {
+ case event := <-events:
+ //if the event is a node Up event only
+ if event.Node != nil && event.Node.Up {
+ //add the correspondent node ID to the map
+ nodemap[event.Node.Config.ID] = true
+ //this means all nodes got a nodeUp event, so we can continue the test
+ if len(nodemap) == nodeCount {
+ nodesComplete = true
+ //wait for 3s as the mocker will need time to connect the nodes
+ //time.Sleep( 3 *time.Second)
+ }
+ } else if event.Conn != nil && nodesComplete {
+ connCount += 1
+ if connCount == (nodeCount-1)*2 {
+ wg.Done()
+ return
+ }
+ }
+ case <-time.After(30 * time.Second):
+ wg.Done()
+ t.Fatalf("Timeout waiting for nodes being started up!")
+ }
+ }
+ }()
+
+ //take the last element of the mockerlist as the default mocker-type to ensure one is enabled
+ mockertype := mockerlist[len(mockerlist)-1]
+ //still, use hardcoded "probabilistic" one if available ;)
+ for _, m := range mockerlist {
+ if m == "probabilistic" {
+ mockertype = m
+ break
+ }
+ }
+ //start the mocker with nodeCount number of nodes
+ resp, err = http.PostForm(s.URL+"/mocker/start", url.Values{"mocker-type": {mockertype}, "node-count": {strconv.Itoa(nodeCount)}})
+ if err != nil {
+ t.Fatalf("Could not start mocker: %s", err)
+ }
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received for starting mocker, expected 200, got %d", resp.StatusCode)
+ }
+
+ wg.Wait()
+
+ //check there are nodeCount number of nodes in the network
+ nodes_info, err := client.GetNodes()
+ if err != nil {
+ t.Fatalf("Could not get nodes list: %s", err)
+ }
+
+ if len(nodes_info) != nodeCount {
+ t.Fatalf("Expected %d number of nodes, got: %d", nodeCount, len(nodes_info))
+ }
+
+ //stop the mocker
+ resp, err = http.Post(s.URL+"/mocker/stop", "", nil)
+ if err != nil {
+ t.Fatalf("Could not stop mocker: %s", err)
+ }
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received for stopping mocker, expected 200, got %d", resp.StatusCode)
+ }
+
+ //reset the network
+ _, err = http.Post(s.URL+"/reset", "", nil)
+ if err != nil {
+ t.Fatalf("Could not reset network: %s", err)
+ }
+
+ //now the number of nodes in the network should be zero
+ nodes_info, err = client.GetNodes()
+ if err != nil {
+ t.Fatalf("Could not get nodes list: %s", err)
+ }
+
+ if len(nodes_info) != 0 {
+ t.Fatalf("Expected empty list of nodes, got: %d", len(nodes_info))
+ }
+}
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index fd8777673..caf428ece 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -403,9 +403,8 @@ func (self *Network) getNodeByName(name string) *Node {
func (self *Network) GetNodes() (nodes []*Node) {
self.lock.Lock()
defer self.lock.Unlock()
- for _, node := range self.Nodes {
- nodes = append(nodes, node)
- }
+
+ nodes = append(nodes, self.Nodes...)
return nodes
}
@@ -477,7 +476,7 @@ func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
if err != nil {
return nil, err
}
- if time.Now().Sub(conn.initiated) < dialBanTimeout {
+ if time.Since(conn.initiated) < dialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}
if conn.Up {
@@ -502,6 +501,20 @@ func (self *Network) Shutdown() {
close(self.quitc)
}
+//Reset resets all network properties:
+//emtpies the nodes and the connection list
+func (self *Network) Reset() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ //re-initialize the maps
+ self.connMap = make(map[string]int)
+ self.nodeMap = make(map[discover.NodeID]int)
+
+ self.Nodes = nil
+ self.Conns = nil
+}
+
// Node is a wrapper around adapters.Node which is used to track the status
// of a node in the network
type Node struct {
@@ -665,6 +678,12 @@ func (self *Network) Load(snap *Snapshot) error {
}
}
for _, conn := range snap.Conns {
+
+ if !self.GetNode(conn.One).Up || !self.GetNode(conn.Other).Up {
+ //in this case, at least one of the nodes of a connection is not up,
+ //so it would result in the snapshot `Load` to fail
+ continue
+ }
if err := self.Connect(conn.One, conn.Other); err != nil {
return err
}