aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/dial.go27
-rw-r--r--p2p/discover/database.go6
-rw-r--r--p2p/discover/table.go2
-rw-r--r--p2p/discv5/net.go2
-rw-r--r--p2p/discv5/ntp.go8
-rw-r--r--p2p/discv5/ticket.go12
-rw-r--r--p2p/peer.go5
-rw-r--r--p2p/server.go84
-rw-r--r--p2p/server_test.go10
-rw-r--r--p2p/simulations/adapters/docker.go2
-rw-r--r--p2p/simulations/adapters/exec.go1
-rw-r--r--p2p/simulations/adapters/inproc.go4
-rw-r--r--p2p/simulations/adapters/state.go35
-rw-r--r--p2p/simulations/adapters/types.go6
-rw-r--r--p2p/simulations/http.go64
-rw-r--r--p2p/simulations/mocker.go192
-rw-r--r--p2p/simulations/mocker_test.go171
-rw-r--r--p2p/simulations/network.go86
18 files changed, 643 insertions, 74 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index 2d9e3a0ed..f5ff2c211 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -157,7 +157,7 @@ func (s *dialstate) removeStatic(n *discover.Node) {
}
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
- if s.start == (time.Time{}) {
+ if s.start.IsZero() {
s.start = now
}
@@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) {
return
}
}
- success := t.dial(srv, t.dest)
- // Try resolving the ID of static nodes if dialing failed.
- if !success && t.flags&staticDialedConn != 0 {
- if t.resolve(srv) {
- t.dial(srv, t.dest)
+ err := t.dial(srv, t.dest)
+ if err != nil {
+ log.Trace("Dial error", "task", t, "err", err)
+ // Try resolving the ID of static nodes if dialing failed.
+ if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
+ if t.resolve(srv) {
+ t.dial(srv, t.dest)
+ }
}
}
}
@@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool {
return true
}
+type dialError struct {
+ error
+}
+
// dial performs the actual connection attempt.
-func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
+func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
fd, err := srv.Dialer.Dial(dest)
if err != nil {
- log.Trace("Dial error", "task", t, "err", err)
- return false
+ return &dialError{err}
}
mfd := newMeteredConn(fd, false)
- srv.SetupConn(mfd, t.flags, dest)
- return true
+ return srv.SetupConn(mfd, t.flags, dest)
}
func (t *dialTask) String() string {
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
index 7206a63c6..b136609f2 100644
--- a/p2p/discover/database.go
+++ b/p2p/discover/database.go
@@ -226,14 +226,14 @@ func (db *nodeDB) ensureExpirer() {
// expirer should be started in a go routine, and is responsible for looping ad
// infinitum and dropping stale data from the database.
func (db *nodeDB) expirer() {
- tick := time.Tick(nodeDBCleanupCycle)
+ tick := time.NewTicker(nodeDBCleanupCycle)
+ defer tick.Stop()
for {
select {
- case <-tick:
+ case <-tick.C:
if err := db.expireNodes(); err != nil {
log.Error("Failed to expire nodedb items", "err", err)
}
-
case <-db.quit:
return
}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 2f5a26c34..ec4eb94ad 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -427,7 +427,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
rc := make(chan *Node, len(nodes))
for i := range nodes {
go func(n *Node) {
- nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCP))
+ nn, _ := tab.bond(false, n.ID, n.addr(), n.TCP)
rc <- nn
}(nodes[i])
}
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index a39cfcc64..2fbb60824 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -684,7 +684,7 @@ func (net *Network) refresh(done chan<- struct{}) {
seeds = net.nursery
}
if len(seeds) == 0 {
- log.Trace(fmt.Sprint("no seed nodes found"))
+ log.Trace("no seed nodes found")
close(done)
return
}
diff --git a/p2p/discv5/ntp.go b/p2p/discv5/ntp.go
index f78d5dc43..4fb5f657a 100644
--- a/p2p/discv5/ntp.go
+++ b/p2p/discv5/ntp.go
@@ -54,10 +54,10 @@ func checkClockDrift() {
howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings")
separator := strings.Repeat("-", len(warning))
- log.Warn(fmt.Sprint(separator))
- log.Warn(fmt.Sprint(warning))
- log.Warn(fmt.Sprint(howtofix))
- log.Warn(fmt.Sprint(separator))
+ log.Warn(separator)
+ log.Warn(warning)
+ log.Warn(howtofix)
+ log.Warn(separator)
} else {
log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift))
}
diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go
index 48dd114f0..193cef4be 100644
--- a/p2p/discv5/ticket.go
+++ b/p2p/discv5/ticket.go
@@ -398,12 +398,12 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration
//s.removeExcessTickets(topic)
if len(tickets.buckets) != 0 {
empty = false
- if list := tickets.buckets[bucket]; list != nil {
- for _, ref := range list {
- //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
- if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
- nextTicket = ref
- }
+
+ list := tickets.buckets[bucket]
+ for _, ref := range list {
+ //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
+ if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
+ nextTicket = ref
}
}
}
diff --git a/p2p/peer.go b/p2p/peer.go
index 1d2b726e8..bad1c8c8b 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -160,6 +160,11 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
}
+// Inbound returns true if the peer is an inbound connection
+func (p *Peer) Inbound() bool {
+ return p.rw.flags&inboundConn != 0
+}
+
func newPeer(conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
diff --git a/p2p/server.go b/p2p/server.go
index d1d578401..922df55ba 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -139,6 +139,9 @@ type Config struct {
// If EnableMsgEvents is set then the server will emit PeerEvents
// whenever a message is sent to or received from a peer
EnableMsgEvents bool
+
+ // Logger is a custom logger to use with the p2p.Server.
+ Logger log.Logger
}
// Server manages all peer connections.
@@ -172,6 +175,7 @@ type Server struct {
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
+ log log.Logger
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) {
return errors.New("server already running")
}
srv.running = true
- log.Info("Starting P2P networking")
+ srv.log = srv.Config.Logger
+ if srv.log == nil {
+ srv.log = log.New()
+ }
+ srv.log.Info("Starting P2P networking")
// static fields
if srv.PrivateKey == nil {
@@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) {
}
}
if srv.NoDial && srv.ListenAddr == "" {
- log.Warn("P2P server will be useless, neither dialing nor listening")
+ srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
@@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
- log.Trace("New dial task", "task", t)
+ srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
@@ -517,13 +525,13 @@ running:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
- log.Debug("Adding static node", "node", n)
+ srv.log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
- log.Debug("Removing static node", "node", n)
+ srv.log.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
@@ -536,7 +544,7 @@ running:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
- log.Trace("Dial task done", "task", t)
+ srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
@@ -565,7 +573,7 @@ running:
p.events = &srv.peerFeed
}
name := truncateName(c.name)
- log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
+ srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
go srv.runPeer(p)
}
@@ -585,7 +593,7 @@ running:
}
}
- log.Trace("P2P networking is spinning down")
+ srv.log.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
@@ -639,7 +647,7 @@ type tempError interface {
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
- log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
+ srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
// This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake.
@@ -664,10 +672,10 @@ func (srv *Server) listenLoop() {
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
- log.Debug("Temporary read error", "err", err)
+ srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
- log.Debug("Read error", "err", err)
+ srv.log.Debug("Read error", "err", err)
return
}
break
@@ -676,7 +684,7 @@ func (srv *Server) listenLoop() {
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
- log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
+ srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
@@ -684,7 +692,7 @@ func (srv *Server) listenLoop() {
}
fd = newMeteredConn(fd, true)
- log.Trace("Accepted connection", "addr", fd.RemoteAddr())
+ srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
// Spawn the handler. It will give the slot back when the connection
// has been established.
@@ -698,55 +706,65 @@ func (srv *Server) listenLoop() {
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
+ self := srv.Self()
+ if self == nil {
+ return errors.New("shutdown")
+ }
+ c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
+ err := srv.setupConn(c, flags, dialDest)
+ if err != nil {
+ c.close(err)
+ srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
+ }
+ return err
+}
+
+func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
- c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
if !running {
- c.close(errServerStopped)
- return
+ return errServerStopped
}
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
- log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
- c.close(err)
- return
+ srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
+ return err
}
- clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
+ clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
- c.close(DiscUnexpectedIdentity)
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
- return
+ return DiscUnexpectedIdentity
}
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ err = srv.checkpoint(c, srv.posthandshake)
+ if err != nil {
clog.Trace("Rejected peer before protocol handshake", "err", err)
- c.close(err)
- return
+ return err
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed proto handshake", "err", err)
- c.close(err)
- return
+ return err
}
if phs.ID != c.id {
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
- c.close(DiscUnexpectedIdentity)
- return
+ return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
- if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ err = srv.checkpoint(c, srv.addpeer)
+ if err != nil {
clog.Trace("Rejected peer", "err", err)
- c.close(err)
- return
+ return err
}
// If the checks completed successfully, runPeer has now been
// launched by run.
+ clog.Trace("connection set up", "inbound", dialDest == nil)
+ return nil
}
func truncateName(s string) string {
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 11dd83e5d..10c36528e 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
)
@@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) {
quit: make(chan struct{}),
ntab: fakeTable{},
running: true,
+ log: log.New(),
}
srv.loopWG.Add(1)
go func() {
@@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) {
}
var (
- srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
+ srv = &Server{
+ quit: make(chan struct{}),
+ ntab: fakeTable{},
+ running: true,
+ log: log.New(),
+ }
done = make(chan *testTask)
start, end = 0, 0
)
@@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) {
Protocols: []Protocol{discard},
},
newTransport: func(fd net.Conn) transport { return test.tt },
+ log: log.New(),
}
if !test.dontstart {
if err := srv.Start(); err != nil {
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
index 022314b3d..8ef5629fb 100644
--- a/p2p/simulations/adapters/docker.go
+++ b/p2p/simulations/adapters/docker.go
@@ -28,6 +28,7 @@ import (
"strings"
"github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
)
@@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil
conf.Stack.NoUSB = true
+ conf.Stack.Logger = log.New("node.id", config.ID.String())
node := &DockerNode{
ExecNode: ExecNode{
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index bdb92cc1d..a566fb27d 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -359,6 +359,7 @@ func execP2PNode() {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+ conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
// use explicit IP address in ListenAddr so that Enode URL is usable
externalIP := func() string {
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
index c97188def..48d7c1730 100644
--- a/p2p/simulations/adapters/inproc.go
+++ b/p2p/simulations/adapters/inproc.go
@@ -24,6 +24,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
Dialer: s,
EnableMsgEvents: true,
},
- NoUSB: true,
+ NoUSB: true,
+ Logger: log.New("node.id", id.String()),
})
if err != nil {
return nil, err
diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go
new file mode 100644
index 000000000..8b1dfef90
--- /dev/null
+++ b/p2p/simulations/adapters/state.go
@@ -0,0 +1,35 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package adapters
+
+type SimStateStore struct {
+ m map[string][]byte
+}
+
+func (self *SimStateStore) Load(s string) ([]byte, error) {
+ return self.m[s], nil
+}
+
+func (self *SimStateStore) Save(s string, data []byte) error {
+ self.m[s] = data
+ return nil
+}
+
+func NewSimStateStore() *SimStateStore {
+ return &SimStateStore{
+ make(map[string][]byte),
+ }
+}
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
index ed6cfc504..5b4b47fe2 100644
--- a/p2p/simulations/adapters/types.go
+++ b/p2p/simulations/adapters/types.go
@@ -83,6 +83,9 @@ type NodeConfig struct {
// stack to encrypt communications
PrivateKey *ecdsa.PrivateKey
+ // Enable peer events for Msgs
+ EnableMsgEvents bool
+
// Name is a human friendly name for the node like "node01"
Name string
@@ -91,6 +94,9 @@ type NodeConfig struct {
// contained in SimAdapter.services, for other nodes it should be
// services registered by calling the RegisterService function)
Services []string
+
+ // function to sanction or prevent suggesting a peer
+ Reachable func(id discover.NodeID) bool
}
// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go
index 3fa8b9292..97dd742e8 100644
--- a/p2p/simulations/http.go
+++ b/p2p/simulations/http.go
@@ -27,6 +27,7 @@ import (
"net/http"
"strconv"
"strings"
+ "sync"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
@@ -263,8 +264,10 @@ func (c *Client) Send(method, path string, in, out interface{}) error {
// Server is an HTTP server providing an API to manage a simulation network
type Server struct {
- router *httprouter.Router
- network *Network
+ router *httprouter.Router
+ network *Network
+ mockerStop chan struct{} // when set, stops the current mocker
+ mockerMtx sync.Mutex // synchronises access to the mockerStop field
}
// NewServer returns a new simulation API server
@@ -278,6 +281,10 @@ func NewServer(network *Network) *Server {
s.GET("/", s.GetNetwork)
s.POST("/start", s.StartNetwork)
s.POST("/stop", s.StopNetwork)
+ s.POST("/mocker/start", s.StartMocker)
+ s.POST("/mocker/stop", s.StopMocker)
+ s.GET("/mocker", s.GetMockers)
+ s.POST("/reset", s.ResetNetwork)
s.GET("/events", s.StreamNetworkEvents)
s.GET("/snapshot", s.CreateSnapshot)
s.POST("/snapshot", s.LoadSnapshot)
@@ -318,6 +325,59 @@ func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}
+// StartMocker starts the mocker node simulation
+func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) {
+ s.mockerMtx.Lock()
+ defer s.mockerMtx.Unlock()
+ if s.mockerStop != nil {
+ http.Error(w, "mocker already running", http.StatusInternalServerError)
+ return
+ }
+ mockerType := req.FormValue("mocker-type")
+ mockerFn := LookupMocker(mockerType)
+ if mockerFn == nil {
+ http.Error(w, fmt.Sprintf("unknown mocker type %q", mockerType), http.StatusBadRequest)
+ return
+ }
+ nodeCount, err := strconv.Atoi(req.FormValue("node-count"))
+ if err != nil {
+ http.Error(w, "invalid node-count provided", http.StatusBadRequest)
+ return
+ }
+ s.mockerStop = make(chan struct{})
+ go mockerFn(s.network, s.mockerStop, nodeCount)
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// StopMocker stops the mocker node simulation
+func (s *Server) StopMocker(w http.ResponseWriter, req *http.Request) {
+ s.mockerMtx.Lock()
+ defer s.mockerMtx.Unlock()
+ if s.mockerStop == nil {
+ http.Error(w, "stop channel not initialized", http.StatusInternalServerError)
+ return
+ }
+ close(s.mockerStop)
+ s.mockerStop = nil
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// GetMockerList returns a list of available mockers
+func (s *Server) GetMockers(w http.ResponseWriter, req *http.Request) {
+
+ list := GetMockerList()
+ s.JSON(w, http.StatusOK, list)
+}
+
+// ResetNetwork resets all properties of a network to its initial (empty) state
+func (s *Server) ResetNetwork(w http.ResponseWriter, req *http.Request) {
+ s.network.Reset()
+
+ w.WriteHeader(http.StatusOK)
+}
+
// StreamNetworkEvents streams network events as a server-sent-events stream
func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) {
events := make(chan *Event)
diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go
new file mode 100644
index 000000000..c38e28855
--- /dev/null
+++ b/p2p/simulations/mocker.go
@@ -0,0 +1,192 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package simulations simulates p2p networks.
+// A mocker simulates starting and stopping real nodes in a network.
+package simulations
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+//a map of mocker names to its function
+var mockerList = map[string]func(net *Network, quit chan struct{}, nodeCount int){
+ "startStop": startStop,
+ "probabilistic": probabilistic,
+ "boot": boot,
+}
+
+//Lookup a mocker by its name, returns the mockerFn
+func LookupMocker(mockerType string) func(net *Network, quit chan struct{}, nodeCount int) {
+ return mockerList[mockerType]
+}
+
+//Get a list of mockers (keys of the map)
+//Useful for frontend to build available mocker selection
+func GetMockerList() []string {
+ list := make([]string, 0, len(mockerList))
+ for k := range mockerList {
+ list = append(list, k)
+ }
+ return list
+}
+
+//The boot mockerFn only connects the node in a ring and doesn't do anything else
+func boot(net *Network, quit chan struct{}, nodeCount int) {
+ _, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+}
+
+//The startStop mockerFn stops and starts nodes in a defined period (ticker)
+func startStop(net *Network, quit chan struct{}, nodeCount int) {
+ nodes, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+ tick := time.NewTicker(10 * time.Second)
+ defer tick.Stop()
+ for {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-tick.C:
+ id := nodes[rand.Intn(len(nodes))]
+ log.Info("stopping node", "id", id)
+ if err := net.Stop(id); err != nil {
+ log.Error("error stopping node", "id", id, "err", err)
+ return
+ }
+
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-time.After(3 * time.Second):
+ }
+
+ log.Debug("starting node", "id", id)
+ if err := net.Start(id); err != nil {
+ log.Error("error starting node", "id", id, "err", err)
+ return
+ }
+ }
+ }
+}
+
+//The probabilistic mocker func has a more probabilistic pattern
+//(the implementation could probably be improved):
+//nodes are connected in a ring, then a varying number of random nodes is selected,
+//mocker then stops and starts them in random intervals, and continues the loop
+func probabilistic(net *Network, quit chan struct{}, nodeCount int) {
+ nodes, err := connectNodesInRing(net, nodeCount)
+ if err != nil {
+ panic("Could not startup node network for mocker")
+ }
+ for {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ default:
+ }
+ var lowid, highid int
+ var wg sync.WaitGroup
+ randWait := time.Duration(rand.Intn(5000)+1000) * time.Millisecond
+ rand1 := rand.Intn(nodeCount - 1)
+ rand2 := rand.Intn(nodeCount - 1)
+ if rand1 < rand2 {
+ lowid = rand1
+ highid = rand2
+ } else if rand1 > rand2 {
+ highid = rand1
+ lowid = rand2
+ } else {
+ if rand1 == 0 {
+ rand2 = 9
+ } else if rand1 == 9 {
+ rand1 = 0
+ }
+ lowid = rand1
+ highid = rand2
+ }
+ var steps = highid - lowid
+ wg.Add(steps)
+ for i := lowid; i < highid; i++ {
+ select {
+ case <-quit:
+ log.Info("Terminating simulation loop")
+ return
+ case <-time.After(randWait):
+ }
+ log.Debug(fmt.Sprintf("node %v shutting down", nodes[i]))
+ err := net.Stop(nodes[i])
+ if err != nil {
+ log.Error(fmt.Sprintf("Error stopping node %s", nodes[i]))
+ wg.Done()
+ continue
+ }
+ go func(id discover.NodeID) {
+ time.Sleep(randWait)
+ err := net.Start(id)
+ if err != nil {
+ log.Error(fmt.Sprintf("Error starting node %s", id))
+ }
+ wg.Done()
+ }(nodes[i])
+ }
+ wg.Wait()
+ }
+
+}
+
+//connect nodeCount number of nodes in a ring
+func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) {
+ ids := make([]discover.NodeID, nodeCount)
+ for i := 0; i < nodeCount; i++ {
+ node, err := net.NewNode()
+ if err != nil {
+ log.Error("Error creating a node! %s", err)
+ return nil, err
+ }
+ ids[i] = node.ID()
+ }
+
+ for _, id := range ids {
+ if err := net.Start(id); err != nil {
+ log.Error("Error starting a node! %s", err)
+ return nil, err
+ }
+ log.Debug(fmt.Sprintf("node %v starting up", id))
+ }
+ for i, id := range ids {
+ peerID := ids[(i+1)%len(ids)]
+ if err := net.Connect(id, peerID); err != nil {
+ log.Error("Error connecting a node to a peer! %s", err)
+ return nil, err
+ }
+ }
+
+ return ids, nil
+}
diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go
new file mode 100644
index 000000000..de8ec0b33
--- /dev/null
+++ b/p2p/simulations/mocker_test.go
@@ -0,0 +1,171 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package simulations simulates p2p networks.
+// A mokcer simulates starting and stopping real nodes in a network.
+package simulations
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+func TestMocker(t *testing.T) {
+ //start the simulation HTTP server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ //create a client
+ client := NewClient(s.URL)
+
+ //start the network
+ err := client.StartNetwork()
+ if err != nil {
+ t.Fatalf("Could not start test network: %s", err)
+ }
+ //stop the network to terminate
+ defer func() {
+ err = client.StopNetwork()
+ if err != nil {
+ t.Fatalf("Could not stop test network: %s", err)
+ }
+ }()
+
+ //get the list of available mocker types
+ resp, err := http.Get(s.URL + "/mocker")
+ if err != nil {
+ t.Fatalf("Could not get mocker list: %s", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received, expected 200, got %d", resp.StatusCode)
+ }
+
+ //check the list is at least 1 in size
+ var mockerlist []string
+ err = json.NewDecoder(resp.Body).Decode(&mockerlist)
+ if err != nil {
+ t.Fatalf("Error decoding JSON mockerlist: %s", err)
+ }
+
+ if len(mockerlist) < 1 {
+ t.Fatalf("No mockers available")
+ }
+
+ nodeCount := 10
+ var wg sync.WaitGroup
+
+ events := make(chan *Event, 10)
+ var opts SubscribeOpts
+ sub, err := client.SubscribeNetwork(events, opts)
+ defer sub.Unsubscribe()
+ //wait until all nodes are started and connected
+ //store every node up event in a map (value is irrelevant, mimic Set datatype)
+ nodemap := make(map[discover.NodeID]bool)
+ wg.Add(1)
+ nodesComplete := false
+ connCount := 0
+ go func() {
+ for {
+ select {
+ case event := <-events:
+ //if the event is a node Up event only
+ if event.Node != nil && event.Node.Up {
+ //add the correspondent node ID to the map
+ nodemap[event.Node.Config.ID] = true
+ //this means all nodes got a nodeUp event, so we can continue the test
+ if len(nodemap) == nodeCount {
+ nodesComplete = true
+ //wait for 3s as the mocker will need time to connect the nodes
+ //time.Sleep( 3 *time.Second)
+ }
+ } else if event.Conn != nil && nodesComplete {
+ connCount += 1
+ if connCount == (nodeCount-1)*2 {
+ wg.Done()
+ return
+ }
+ }
+ case <-time.After(30 * time.Second):
+ wg.Done()
+ t.Fatalf("Timeout waiting for nodes being started up!")
+ }
+ }
+ }()
+
+ //take the last element of the mockerlist as the default mocker-type to ensure one is enabled
+ mockertype := mockerlist[len(mockerlist)-1]
+ //still, use hardcoded "probabilistic" one if available ;)
+ for _, m := range mockerlist {
+ if m == "probabilistic" {
+ mockertype = m
+ break
+ }
+ }
+ //start the mocker with nodeCount number of nodes
+ resp, err = http.PostForm(s.URL+"/mocker/start", url.Values{"mocker-type": {mockertype}, "node-count": {strconv.Itoa(nodeCount)}})
+ if err != nil {
+ t.Fatalf("Could not start mocker: %s", err)
+ }
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received for starting mocker, expected 200, got %d", resp.StatusCode)
+ }
+
+ wg.Wait()
+
+ //check there are nodeCount number of nodes in the network
+ nodes_info, err := client.GetNodes()
+ if err != nil {
+ t.Fatalf("Could not get nodes list: %s", err)
+ }
+
+ if len(nodes_info) != nodeCount {
+ t.Fatalf("Expected %d number of nodes, got: %d", nodeCount, len(nodes_info))
+ }
+
+ //stop the mocker
+ resp, err = http.Post(s.URL+"/mocker/stop", "", nil)
+ if err != nil {
+ t.Fatalf("Could not stop mocker: %s", err)
+ }
+ if resp.StatusCode != 200 {
+ t.Fatalf("Invalid Status Code received for stopping mocker, expected 200, got %d", resp.StatusCode)
+ }
+
+ //reset the network
+ _, err = http.Post(s.URL+"/reset", "", nil)
+ if err != nil {
+ t.Fatalf("Could not reset network: %s", err)
+ }
+
+ //now the number of nodes in the network should be zero
+ nodes_info, err = client.GetNodes()
+ if err != nil {
+ t.Fatalf("Could not get nodes list: %s", err)
+ }
+
+ if len(nodes_info) != 0 {
+ t.Fatalf("Expected empty list of nodes, got: %d", len(nodes_info))
+ }
+}
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index 06890ffcf..caf428ece 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@@ -30,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
+var dialBanTimeout = 200 * time.Millisecond
+
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
ID string `json:"id"`
@@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
+ if conf.Reachable == nil {
+ conf.Reachable = func(otherID discover.NodeID) bool {
+ _, err := self.InitConn(conf.ID, otherID)
+ return err == nil
+ }
+ }
// assign a name to the node if not set
if conf.Name == "" {
@@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error {
// method on the "one" node so that it connects to the "other" node
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
- conn, err := self.GetOrCreateConn(oneID, otherID)
+ conn, err := self.InitConn(oneID, otherID)
if err != nil {
return err
}
- if conn.Up {
- return fmt.Errorf("%v and %v already connected", oneID, otherID)
- }
- if err := conn.nodesUp(); err != nil {
- return err
- }
client, err := conn.one.Client()
if err != nil {
return err
@@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
- conn, err := self.GetOrCreateConn(one, other)
- if err != nil {
+ conn := self.GetConn(one, other)
+ if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
if !conn.Up {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
+ conn.initiated = time.Now().Add(-dialBanTimeout)
self.events.Send(NewEvent(conn))
return nil
}
@@ -396,10 +400,12 @@ func (self *Network) getNodeByName(name string) *Node {
}
// GetNodes returns the existing nodes
-func (self *Network) GetNodes() []*Node {
+func (self *Network) GetNodes() (nodes []*Node) {
self.lock.Lock()
defer self.lock.Unlock()
- return self.Nodes
+
+ nodes = append(nodes, self.Nodes...)
+ return nodes
}
// GetConn returns the connection which exists between "one" and "other"
@@ -415,6 +421,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
self.lock.Lock()
defer self.lock.Unlock()
+ return self.getOrCreateConn(oneID, otherID)
+}
+
+func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
if conn := self.getConn(oneID, otherID); conn != nil {
return conn, nil
}
@@ -448,6 +458,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
return self.Conns[i]
}
+// InitConn(one, other) retrieves the connectiton model for the connection between
+// peers one and other, or creates a new one if it does not exist
+// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
+// it checks if the connection is already up, and if the nodes are running
+// NOTE:
+// it also checks whether there has been recent attempt to connect the peers
+// this is cheating as the simulation is used as an oracle and know about
+// remote peers attempt to connect to a node which will then not initiate the connection
+func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if oneID == otherID {
+ return nil, fmt.Errorf("refusing to connect to self %v", oneID)
+ }
+ conn, err := self.getOrCreateConn(oneID, otherID)
+ if err != nil {
+ return nil, err
+ }
+ if time.Since(conn.initiated) < dialBanTimeout {
+ return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
+ }
+ if conn.Up {
+ return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
+ }
+ err = conn.nodesUp()
+ if err != nil {
+ return nil, fmt.Errorf("nodes not up: %v", err)
+ }
+ conn.initiated = time.Now()
+ return conn, nil
+}
+
// Shutdown stops all nodes in the network and closes the quit channel
func (self *Network) Shutdown() {
for _, node := range self.Nodes {
@@ -459,6 +501,20 @@ func (self *Network) Shutdown() {
close(self.quitc)
}
+//Reset resets all network properties:
+//emtpies the nodes and the connection list
+func (self *Network) Reset() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ //re-initialize the maps
+ self.connMap = make(map[string]int)
+ self.nodeMap = make(map[discover.NodeID]int)
+
+ self.Nodes = nil
+ self.Conns = nil
+}
+
// Node is a wrapper around adapters.Node which is used to track the status
// of a node in the network
type Node struct {
@@ -516,6 +572,8 @@ type Conn struct {
// Up tracks whether or not the connection is active
Up bool `json:"up"`
+ // Registers when the connection was grabbed to dial
+ initiated time.Time
one *Node
other *Node
@@ -620,6 +678,12 @@ func (self *Network) Load(snap *Snapshot) error {
}
}
for _, conn := range snap.Conns {
+
+ if !self.GetNode(conn.One).Up || !self.GetNode(conn.Other).Up {
+ //in this case, at least one of the nodes of a connection is not up,
+ //so it would result in the snapshot `Load` to fail
+ continue
+ }
if err := self.Connect(conn.One, conn.Other); err != nil {
return err
}