diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 27 | ||||
-rw-r--r-- | p2p/discover/database.go | 6 | ||||
-rw-r--r-- | p2p/discover/table.go | 2 | ||||
-rw-r--r-- | p2p/discv5/net.go | 2 | ||||
-rw-r--r-- | p2p/discv5/ntp.go | 8 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 12 | ||||
-rw-r--r-- | p2p/peer.go | 5 | ||||
-rw-r--r-- | p2p/server.go | 84 | ||||
-rw-r--r-- | p2p/server_test.go | 10 | ||||
-rw-r--r-- | p2p/simulations/adapters/docker.go | 2 | ||||
-rw-r--r-- | p2p/simulations/adapters/exec.go | 1 | ||||
-rw-r--r-- | p2p/simulations/adapters/inproc.go | 4 | ||||
-rw-r--r-- | p2p/simulations/adapters/state.go | 35 | ||||
-rw-r--r-- | p2p/simulations/adapters/types.go | 6 | ||||
-rw-r--r-- | p2p/simulations/http.go | 64 | ||||
-rw-r--r-- | p2p/simulations/mocker.go | 192 | ||||
-rw-r--r-- | p2p/simulations/mocker_test.go | 171 | ||||
-rw-r--r-- | p2p/simulations/network.go | 86 |
18 files changed, 643 insertions, 74 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index 2d9e3a0ed..f5ff2c211 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -157,7 +157,7 @@ func (s *dialstate) removeStatic(n *discover.Node) { } func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { - if s.start == (time.Time{}) { + if s.start.IsZero() { s.start = now } @@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) { return } } - success := t.dial(srv, t.dest) - // Try resolving the ID of static nodes if dialing failed. - if !success && t.flags&staticDialedConn != 0 { - if t.resolve(srv) { - t.dial(srv, t.dest) + err := t.dial(srv, t.dest) + if err != nil { + log.Trace("Dial error", "task", t, "err", err) + // Try resolving the ID of static nodes if dialing failed. + if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + t.dial(srv, t.dest) + } } } } @@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool { return true } +type dialError struct { + error +} + // dial performs the actual connection attempt. -func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { +func (t *dialTask) dial(srv *Server, dest *discover.Node) error { fd, err := srv.Dialer.Dial(dest) if err != nil { - log.Trace("Dial error", "task", t, "err", err) - return false + return &dialError{err} } mfd := newMeteredConn(fd, false) - srv.SetupConn(mfd, t.flags, dest) - return true + return srv.SetupConn(mfd, t.flags, dest) } func (t *dialTask) String() string { diff --git a/p2p/discover/database.go b/p2p/discover/database.go index 7206a63c6..b136609f2 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -226,14 +226,14 @@ func (db *nodeDB) ensureExpirer() { // expirer should be started in a go routine, and is responsible for looping ad // infinitum and dropping stale data from the database. func (db *nodeDB) expirer() { - tick := time.Tick(nodeDBCleanupCycle) + tick := time.NewTicker(nodeDBCleanupCycle) + defer tick.Stop() for { select { - case <-tick: + case <-tick.C: if err := db.expireNodes(); err != nil { log.Error("Failed to expire nodedb items", "err", err) } - case <-db.quit: return } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 2f5a26c34..ec4eb94ad 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -427,7 +427,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { rc := make(chan *Node, len(nodes)) for i := range nodes { go func(n *Node) { - nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCP)) + nn, _ := tab.bond(false, n.ID, n.addr(), n.TCP) rc <- nn }(nodes[i]) } diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index a39cfcc64..2fbb60824 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -684,7 +684,7 @@ func (net *Network) refresh(done chan<- struct{}) { seeds = net.nursery } if len(seeds) == 0 { - log.Trace(fmt.Sprint("no seed nodes found")) + log.Trace("no seed nodes found") close(done) return } diff --git a/p2p/discv5/ntp.go b/p2p/discv5/ntp.go index f78d5dc43..4fb5f657a 100644 --- a/p2p/discv5/ntp.go +++ b/p2p/discv5/ntp.go @@ -54,10 +54,10 @@ func checkClockDrift() { howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings") separator := strings.Repeat("-", len(warning)) - log.Warn(fmt.Sprint(separator)) - log.Warn(fmt.Sprint(warning)) - log.Warn(fmt.Sprint(howtofix)) - log.Warn(fmt.Sprint(separator)) + log.Warn(separator) + log.Warn(warning) + log.Warn(howtofix) + log.Warn(separator) } else { log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift)) } diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 48dd114f0..193cef4be 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -398,12 +398,12 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration //s.removeExcessTickets(topic) if len(tickets.buckets) != 0 { empty = false - if list := tickets.buckets[bucket]; list != nil { - for _, ref := range list { - //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now))) - if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() { - nextTicket = ref - } + + list := tickets.buckets[bucket] + for _, ref := range list { + //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now))) + if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() { + nextTicket = ref } } } diff --git a/p2p/peer.go b/p2p/peer.go index 1d2b726e8..bad1c8c8b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -160,6 +160,11 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) } +// Inbound returns true if the peer is an inbound connection +func (p *Peer) Inbound() bool { + return p.rw.flags&inboundConn != 0 +} + func newPeer(conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ diff --git a/p2p/server.go b/p2p/server.go index d1d578401..922df55ba 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -139,6 +139,9 @@ type Config struct { // If EnableMsgEvents is set then the server will emit PeerEvents // whenever a message is sent to or received from a peer EnableMsgEvents bool + + // Logger is a custom logger to use with the p2p.Server. + Logger log.Logger } // Server manages all peer connections. @@ -172,6 +175,7 @@ type Server struct { delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed + log log.Logger } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) { return errors.New("server already running") } srv.running = true - log.Info("Starting P2P networking") + srv.log = srv.Config.Logger + if srv.log == nil { + srv.log = log.New() + } + srv.log.Info("Starting P2P networking") // static fields if srv.PrivateKey == nil { @@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) { } } if srv.NoDial && srv.ListenAddr == "" { - log.Warn("P2P server will be useless, neither dialing nor listening") + srv.log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) @@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] - log.Trace("New dial task", "task", t) + srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } @@ -517,13 +525,13 @@ running: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - log.Debug("Adding static node", "node", n) + srv.log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - log.Debug("Removing static node", "node", n) + srv.log.Debug("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) @@ -536,7 +544,7 @@ running: // A task got done. Tell dialstate about it so it // can update its state and remove it from the active // tasks list. - log.Trace("Dial task done", "task", t) + srv.log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: @@ -565,7 +573,7 @@ running: p.events = &srv.peerFeed } name := truncateName(c.name) - log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p go srv.runPeer(p) } @@ -585,7 +593,7 @@ running: } } - log.Trace("P2P networking is spinning down") + srv.log.Trace("P2P networking is spinning down") // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { @@ -639,7 +647,7 @@ type tempError interface { // inbound connections. func (srv *Server) listenLoop() { defer srv.loopWG.Done() - log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) + srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. @@ -664,10 +672,10 @@ func (srv *Server) listenLoop() { for { fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { - log.Debug("Temporary read error", "err", err) + srv.log.Debug("Temporary read error", "err", err) continue } else if err != nil { - log.Debug("Read error", "err", err) + srv.log.Debug("Read error", "err", err) return } break @@ -676,7 +684,7 @@ func (srv *Server) listenLoop() { // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { - log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) + srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue @@ -684,7 +692,7 @@ func (srv *Server) listenLoop() { } fd = newMeteredConn(fd, true) - log.Trace("Accepted connection", "addr", fd.RemoteAddr()) + srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection // has been established. @@ -698,55 +706,65 @@ func (srv *Server) listenLoop() { // SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error { + self := srv.Self() + if self == nil { + return errors.New("shutdown") + } + c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} + err := srv.setupConn(c, flags, dialDest) + if err != nil { + c.close(err) + srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) + } + return err +} + +func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running srv.lock.Unlock() - c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} if !running { - c.close(errServerStopped) - return + return errServerStopped } // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { - log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) - c.close(err) - return + srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) + return err } - clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) + clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { - c.close(DiscUnexpectedIdentity) clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) - return + return DiscUnexpectedIdentity } - if err := srv.checkpoint(c, srv.posthandshake); err != nil { + err = srv.checkpoint(c, srv.posthandshake) + if err != nil { clog.Trace("Rejected peer before protocol handshake", "err", err) - c.close(err) - return + return err } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed proto handshake", "err", err) - c.close(err) - return + return err } if phs.ID != c.id { clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) - c.close(DiscUnexpectedIdentity) - return + return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name - if err := srv.checkpoint(c, srv.addpeer); err != nil { + err = srv.checkpoint(c, srv.addpeer) + if err != nil { clog.Trace("Rejected peer", "err", err) - c.close(err) - return + return err } // If the checks completed successfully, runPeer has now been // launched by run. + clog.Trace("connection set up", "inbound", dialDest == nil) + return nil } func truncateName(s string) string { diff --git a/p2p/server_test.go b/p2p/server_test.go index 11dd83e5d..10c36528e 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) { quit: make(chan struct{}), ntab: fakeTable{}, running: true, + log: log.New(), } srv.loopWG.Add(1) go func() { @@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) { } var ( - srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true} + srv = &Server{ + quit: make(chan struct{}), + ntab: fakeTable{}, + running: true, + log: log.New(), + } done = make(chan *testTask) start, end = 0, 0 ) @@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) { Protocols: []Protocol{discard}, }, newTransport: func(fd net.Conn) transport { return test.tt }, + log: log.New(), } if !test.dontstart { if err := srv.Start(); err != nil { diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go index 022314b3d..8ef5629fb 100644 --- a/p2p/simulations/adapters/docker.go +++ b/p2p/simulations/adapters/docker.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true + conf.Stack.Logger = log.New("node.id", config.ID.String()) node := &DockerNode{ ExecNode: ExecNode{ diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index bdb92cc1d..a566fb27d 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -359,6 +359,7 @@ func execP2PNode() { log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) // use explicit IP address in ListenAddr so that Enode URL is usable externalIP := func() string { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index c97188def..48d7c1730 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { Dialer: s, EnableMsgEvents: true, }, - NoUSB: true, + NoUSB: true, + Logger: log.New("node.id", id.String()), }) if err != nil { return nil, err diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go new file mode 100644 index 000000000..8b1dfef90 --- /dev/null +++ b/p2p/simulations/adapters/state.go @@ -0,0 +1,35 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. +package adapters + +type SimStateStore struct { + m map[string][]byte +} + +func (self *SimStateStore) Load(s string) ([]byte, error) { + return self.m[s], nil +} + +func (self *SimStateStore) Save(s string, data []byte) error { + self.m[s] = data + return nil +} + +func NewSimStateStore() *SimStateStore { + return &SimStateStore{ + make(map[string][]byte), + } +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index ed6cfc504..5b4b47fe2 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -83,6 +83,9 @@ type NodeConfig struct { // stack to encrypt communications PrivateKey *ecdsa.PrivateKey + // Enable peer events for Msgs + EnableMsgEvents bool + // Name is a human friendly name for the node like "node01" Name string @@ -91,6 +94,9 @@ type NodeConfig struct { // contained in SimAdapter.services, for other nodes it should be // services registered by calling the RegisterService function) Services []string + + // function to sanction or prevent suggesting a peer + Reachable func(id discover.NodeID) bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 3fa8b9292..97dd742e8 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -27,6 +27,7 @@ import ( "net/http" "strconv" "strings" + "sync" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" @@ -263,8 +264,10 @@ func (c *Client) Send(method, path string, in, out interface{}) error { // Server is an HTTP server providing an API to manage a simulation network type Server struct { - router *httprouter.Router - network *Network + router *httprouter.Router + network *Network + mockerStop chan struct{} // when set, stops the current mocker + mockerMtx sync.Mutex // synchronises access to the mockerStop field } // NewServer returns a new simulation API server @@ -278,6 +281,10 @@ func NewServer(network *Network) *Server { s.GET("/", s.GetNetwork) s.POST("/start", s.StartNetwork) s.POST("/stop", s.StopNetwork) + s.POST("/mocker/start", s.StartMocker) + s.POST("/mocker/stop", s.StopMocker) + s.GET("/mocker", s.GetMockers) + s.POST("/reset", s.ResetNetwork) s.GET("/events", s.StreamNetworkEvents) s.GET("/snapshot", s.CreateSnapshot) s.POST("/snapshot", s.LoadSnapshot) @@ -318,6 +325,59 @@ func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } +// StartMocker starts the mocker node simulation +func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { + s.mockerMtx.Lock() + defer s.mockerMtx.Unlock() + if s.mockerStop != nil { + http.Error(w, "mocker already running", http.StatusInternalServerError) + return + } + mockerType := req.FormValue("mocker-type") + mockerFn := LookupMocker(mockerType) + if mockerFn == nil { + http.Error(w, fmt.Sprintf("unknown mocker type %q", mockerType), http.StatusBadRequest) + return + } + nodeCount, err := strconv.Atoi(req.FormValue("node-count")) + if err != nil { + http.Error(w, "invalid node-count provided", http.StatusBadRequest) + return + } + s.mockerStop = make(chan struct{}) + go mockerFn(s.network, s.mockerStop, nodeCount) + + w.WriteHeader(http.StatusOK) +} + +// StopMocker stops the mocker node simulation +func (s *Server) StopMocker(w http.ResponseWriter, req *http.Request) { + s.mockerMtx.Lock() + defer s.mockerMtx.Unlock() + if s.mockerStop == nil { + http.Error(w, "stop channel not initialized", http.StatusInternalServerError) + return + } + close(s.mockerStop) + s.mockerStop = nil + + w.WriteHeader(http.StatusOK) +} + +// GetMockerList returns a list of available mockers +func (s *Server) GetMockers(w http.ResponseWriter, req *http.Request) { + + list := GetMockerList() + s.JSON(w, http.StatusOK, list) +} + +// ResetNetwork resets all properties of a network to its initial (empty) state +func (s *Server) ResetNetwork(w http.ResponseWriter, req *http.Request) { + s.network.Reset() + + w.WriteHeader(http.StatusOK) +} + // StreamNetworkEvents streams network events as a server-sent-events stream func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { events := make(chan *Event) diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go new file mode 100644 index 000000000..c38e28855 --- /dev/null +++ b/p2p/simulations/mocker.go @@ -0,0 +1,192 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package simulations simulates p2p networks. +// A mocker simulates starting and stopping real nodes in a network. +package simulations + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +//a map of mocker names to its function +var mockerList = map[string]func(net *Network, quit chan struct{}, nodeCount int){ + "startStop": startStop, + "probabilistic": probabilistic, + "boot": boot, +} + +//Lookup a mocker by its name, returns the mockerFn +func LookupMocker(mockerType string) func(net *Network, quit chan struct{}, nodeCount int) { + return mockerList[mockerType] +} + +//Get a list of mockers (keys of the map) +//Useful for frontend to build available mocker selection +func GetMockerList() []string { + list := make([]string, 0, len(mockerList)) + for k := range mockerList { + list = append(list, k) + } + return list +} + +//The boot mockerFn only connects the node in a ring and doesn't do anything else +func boot(net *Network, quit chan struct{}, nodeCount int) { + _, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } +} + +//The startStop mockerFn stops and starts nodes in a defined period (ticker) +func startStop(net *Network, quit chan struct{}, nodeCount int) { + nodes, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + for { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-tick.C: + id := nodes[rand.Intn(len(nodes))] + log.Info("stopping node", "id", id) + if err := net.Stop(id); err != nil { + log.Error("error stopping node", "id", id, "err", err) + return + } + + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-time.After(3 * time.Second): + } + + log.Debug("starting node", "id", id) + if err := net.Start(id); err != nil { + log.Error("error starting node", "id", id, "err", err) + return + } + } + } +} + +//The probabilistic mocker func has a more probabilistic pattern +//(the implementation could probably be improved): +//nodes are connected in a ring, then a varying number of random nodes is selected, +//mocker then stops and starts them in random intervals, and continues the loop +func probabilistic(net *Network, quit chan struct{}, nodeCount int) { + nodes, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } + for { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + default: + } + var lowid, highid int + var wg sync.WaitGroup + randWait := time.Duration(rand.Intn(5000)+1000) * time.Millisecond + rand1 := rand.Intn(nodeCount - 1) + rand2 := rand.Intn(nodeCount - 1) + if rand1 < rand2 { + lowid = rand1 + highid = rand2 + } else if rand1 > rand2 { + highid = rand1 + lowid = rand2 + } else { + if rand1 == 0 { + rand2 = 9 + } else if rand1 == 9 { + rand1 = 0 + } + lowid = rand1 + highid = rand2 + } + var steps = highid - lowid + wg.Add(steps) + for i := lowid; i < highid; i++ { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-time.After(randWait): + } + log.Debug(fmt.Sprintf("node %v shutting down", nodes[i])) + err := net.Stop(nodes[i]) + if err != nil { + log.Error(fmt.Sprintf("Error stopping node %s", nodes[i])) + wg.Done() + continue + } + go func(id discover.NodeID) { + time.Sleep(randWait) + err := net.Start(id) + if err != nil { + log.Error(fmt.Sprintf("Error starting node %s", id)) + } + wg.Done() + }(nodes[i]) + } + wg.Wait() + } + +} + +//connect nodeCount number of nodes in a ring +func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) { + ids := make([]discover.NodeID, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := net.NewNode() + if err != nil { + log.Error("Error creating a node! %s", err) + return nil, err + } + ids[i] = node.ID() + } + + for _, id := range ids { + if err := net.Start(id); err != nil { + log.Error("Error starting a node! %s", err) + return nil, err + } + log.Debug(fmt.Sprintf("node %v starting up", id)) + } + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := net.Connect(id, peerID); err != nil { + log.Error("Error connecting a node to a peer! %s", err) + return nil, err + } + } + + return ids, nil +} diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go new file mode 100644 index 000000000..de8ec0b33 --- /dev/null +++ b/p2p/simulations/mocker_test.go @@ -0,0 +1,171 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package simulations simulates p2p networks. +// A mokcer simulates starting and stopping real nodes in a network. +package simulations + +import ( + "encoding/json" + "net/http" + "net/url" + "strconv" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +func TestMocker(t *testing.T) { + //start the simulation HTTP server + _, s := testHTTPServer(t) + defer s.Close() + + //create a client + client := NewClient(s.URL) + + //start the network + err := client.StartNetwork() + if err != nil { + t.Fatalf("Could not start test network: %s", err) + } + //stop the network to terminate + defer func() { + err = client.StopNetwork() + if err != nil { + t.Fatalf("Could not stop test network: %s", err) + } + }() + + //get the list of available mocker types + resp, err := http.Get(s.URL + "/mocker") + if err != nil { + t.Fatalf("Could not get mocker list: %s", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received, expected 200, got %d", resp.StatusCode) + } + + //check the list is at least 1 in size + var mockerlist []string + err = json.NewDecoder(resp.Body).Decode(&mockerlist) + if err != nil { + t.Fatalf("Error decoding JSON mockerlist: %s", err) + } + + if len(mockerlist) < 1 { + t.Fatalf("No mockers available") + } + + nodeCount := 10 + var wg sync.WaitGroup + + events := make(chan *Event, 10) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + defer sub.Unsubscribe() + //wait until all nodes are started and connected + //store every node up event in a map (value is irrelevant, mimic Set datatype) + nodemap := make(map[discover.NodeID]bool) + wg.Add(1) + nodesComplete := false + connCount := 0 + go func() { + for { + select { + case event := <-events: + //if the event is a node Up event only + if event.Node != nil && event.Node.Up { + //add the correspondent node ID to the map + nodemap[event.Node.Config.ID] = true + //this means all nodes got a nodeUp event, so we can continue the test + if len(nodemap) == nodeCount { + nodesComplete = true + //wait for 3s as the mocker will need time to connect the nodes + //time.Sleep( 3 *time.Second) + } + } else if event.Conn != nil && nodesComplete { + connCount += 1 + if connCount == (nodeCount-1)*2 { + wg.Done() + return + } + } + case <-time.After(30 * time.Second): + wg.Done() + t.Fatalf("Timeout waiting for nodes being started up!") + } + } + }() + + //take the last element of the mockerlist as the default mocker-type to ensure one is enabled + mockertype := mockerlist[len(mockerlist)-1] + //still, use hardcoded "probabilistic" one if available ;) + for _, m := range mockerlist { + if m == "probabilistic" { + mockertype = m + break + } + } + //start the mocker with nodeCount number of nodes + resp, err = http.PostForm(s.URL+"/mocker/start", url.Values{"mocker-type": {mockertype}, "node-count": {strconv.Itoa(nodeCount)}}) + if err != nil { + t.Fatalf("Could not start mocker: %s", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received for starting mocker, expected 200, got %d", resp.StatusCode) + } + + wg.Wait() + + //check there are nodeCount number of nodes in the network + nodes_info, err := client.GetNodes() + if err != nil { + t.Fatalf("Could not get nodes list: %s", err) + } + + if len(nodes_info) != nodeCount { + t.Fatalf("Expected %d number of nodes, got: %d", nodeCount, len(nodes_info)) + } + + //stop the mocker + resp, err = http.Post(s.URL+"/mocker/stop", "", nil) + if err != nil { + t.Fatalf("Could not stop mocker: %s", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received for stopping mocker, expected 200, got %d", resp.StatusCode) + } + + //reset the network + _, err = http.Post(s.URL+"/reset", "", nil) + if err != nil { + t.Fatalf("Could not reset network: %s", err) + } + + //now the number of nodes in the network should be zero + nodes_info, err = client.GetNodes() + if err != nil { + t.Fatalf("Could not get nodes list: %s", err) + } + + if len(nodes_info) != 0 { + t.Fatalf("Expected empty list of nodes, got: %d", len(nodes_info)) + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 06890ffcf..caf428ece 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -30,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) +var dialBanTimeout = 200 * time.Millisecond + // NetworkConfig defines configuration options for starting a Network type NetworkConfig struct { ID string `json:"id"` @@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) conf.PrivateKey = c.PrivateKey } id := conf.ID + if conf.Reachable == nil { + conf.Reachable = func(otherID discover.NodeID) bool { + _, err := self.InitConn(conf.ID, otherID) + return err == nil + } + } // assign a name to the node if not set if conf.Name == "" { @@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error { // method on the "one" node so that it connects to the "other" node func (self *Network) Connect(oneID, otherID discover.NodeID) error { log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) - conn, err := self.GetOrCreateConn(oneID, otherID) + conn, err := self.InitConn(oneID, otherID) if err != nil { return err } - if conn.Up { - return fmt.Errorf("%v and %v already connected", oneID, otherID) - } - if err := conn.nodesUp(); err != nil { - return err - } client, err := conn.one.Client() if err != nil { return err @@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error { // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node func (self *Network) DidDisconnect(one, other discover.NodeID) error { - conn, err := self.GetOrCreateConn(one, other) - if err != nil { + conn := self.GetConn(one, other) + if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } if !conn.Up { return fmt.Errorf("%v and %v already disconnected", one, other) } conn.Up = false + conn.initiated = time.Now().Add(-dialBanTimeout) self.events.Send(NewEvent(conn)) return nil } @@ -396,10 +400,12 @@ func (self *Network) getNodeByName(name string) *Node { } // GetNodes returns the existing nodes -func (self *Network) GetNodes() []*Node { +func (self *Network) GetNodes() (nodes []*Node) { self.lock.Lock() defer self.lock.Unlock() - return self.Nodes + + nodes = append(nodes, self.Nodes...) + return nodes } // GetConn returns the connection which exists between "one" and "other" @@ -415,6 +421,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { self.lock.Lock() defer self.lock.Unlock() + return self.getOrCreateConn(oneID, otherID) +} + +func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { if conn := self.getConn(oneID, otherID); conn != nil { return conn, nil } @@ -448,6 +458,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { return self.Conns[i] } +// InitConn(one, other) retrieves the connectiton model for the connection between +// peers one and other, or creates a new one if it does not exist +// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i) +// it checks if the connection is already up, and if the nodes are running +// NOTE: +// it also checks whether there has been recent attempt to connect the peers +// this is cheating as the simulation is used as an oracle and know about +// remote peers attempt to connect to a node which will then not initiate the connection +func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { + self.lock.Lock() + defer self.lock.Unlock() + if oneID == otherID { + return nil, fmt.Errorf("refusing to connect to self %v", oneID) + } + conn, err := self.getOrCreateConn(oneID, otherID) + if err != nil { + return nil, err + } + if time.Since(conn.initiated) < dialBanTimeout { + return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) + } + if conn.Up { + return nil, fmt.Errorf("%v and %v already connected", oneID, otherID) + } + err = conn.nodesUp() + if err != nil { + return nil, fmt.Errorf("nodes not up: %v", err) + } + conn.initiated = time.Now() + return conn, nil +} + // Shutdown stops all nodes in the network and closes the quit channel func (self *Network) Shutdown() { for _, node := range self.Nodes { @@ -459,6 +501,20 @@ func (self *Network) Shutdown() { close(self.quitc) } +//Reset resets all network properties: +//emtpies the nodes and the connection list +func (self *Network) Reset() { + self.lock.Lock() + defer self.lock.Unlock() + + //re-initialize the maps + self.connMap = make(map[string]int) + self.nodeMap = make(map[discover.NodeID]int) + + self.Nodes = nil + self.Conns = nil +} + // Node is a wrapper around adapters.Node which is used to track the status // of a node in the network type Node struct { @@ -516,6 +572,8 @@ type Conn struct { // Up tracks whether or not the connection is active Up bool `json:"up"` + // Registers when the connection was grabbed to dial + initiated time.Time one *Node other *Node @@ -620,6 +678,12 @@ func (self *Network) Load(snap *Snapshot) error { } } for _, conn := range snap.Conns { + + if !self.GetNode(conn.One).Up || !self.GetNode(conn.Other).Up { + //in this case, at least one of the nodes of a connection is not up, + //so it would result in the snapshot `Load` to fail + continue + } if err := self.Connect(conn.One, conn.Other); err != nil { return err } |