From 86f6568f6618945b19057553ec32690d723da982 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Fri, 10 Nov 2017 18:06:45 +0100 Subject: build: enable unconvert linter (#15456) * build: enable unconvert linter - fixes #15453 - update code base for failing cases * cmd/puppeth: replace syscall.Stdin with os.Stdin.Fd() for unconvert linter --- p2p/discover/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p') diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 2f5a26c34..ec4eb94ad 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -427,7 +427,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { rc := make(chan *Node, len(nodes)) for i := range nodes { go func(n *Node) { - nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCP)) + nn, _ := tab.bond(false, n.ID, n.addr(), n.TCP) rc <- nn }(nodes[i]) } -- cgit v1.2.3 From 54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Fri, 1 Dec 2017 11:49:04 +0000 Subject: p2p/simulations: various stability fixes (#15198) p2p/simulations: introduce dialBan - Refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). - The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network. p2p: add Inbound public method to p2p.Peer p2p/simulations: Add server id to logs to support debugging in-memory network simulations when multiple peers are logging. p2p: SetupConn now returns error. The dialer checks the error and only calls resolve if the actual TCP dial fails. --- p2p/dial.go | 25 +++++++----- p2p/peer.go | 5 +++ p2p/server.go | 84 +++++++++++++++++++++++--------------- p2p/server_test.go | 10 ++++- p2p/simulations/adapters/docker.go | 2 + p2p/simulations/adapters/exec.go | 1 + p2p/simulations/adapters/inproc.go | 4 +- p2p/simulations/adapters/types.go | 6 +++ p2p/simulations/network.go | 67 +++++++++++++++++++++++++----- 9 files changed, 148 insertions(+), 56 deletions(-) (limited to 'p2p') diff --git a/p2p/dial.go b/p2p/dial.go index 2d9e3a0ed..8ca3dc5a1 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) { return } } - success := t.dial(srv, t.dest) - // Try resolving the ID of static nodes if dialing failed. - if !success && t.flags&staticDialedConn != 0 { - if t.resolve(srv) { - t.dial(srv, t.dest) + err := t.dial(srv, t.dest) + if err != nil { + log.Trace("Dial error", "task", t, "err", err) + // Try resolving the ID of static nodes if dialing failed. + if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + t.dial(srv, t.dest) + } } } } @@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool { return true } +type dialError struct { + error +} + // dial performs the actual connection attempt. -func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { +func (t *dialTask) dial(srv *Server, dest *discover.Node) error { fd, err := srv.Dialer.Dial(dest) if err != nil { - log.Trace("Dial error", "task", t, "err", err) - return false + return &dialError{err} } mfd := newMeteredConn(fd, false) - srv.SetupConn(mfd, t.flags, dest) - return true + return srv.SetupConn(mfd, t.flags, dest) } func (t *dialTask) String() string { diff --git a/p2p/peer.go b/p2p/peer.go index 1d2b726e8..bad1c8c8b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -160,6 +160,11 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) } +// Inbound returns true if the peer is an inbound connection +func (p *Peer) Inbound() bool { + return p.rw.flags&inboundConn != 0 +} + func newPeer(conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ diff --git a/p2p/server.go b/p2p/server.go index d1d578401..922df55ba 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -139,6 +139,9 @@ type Config struct { // If EnableMsgEvents is set then the server will emit PeerEvents // whenever a message is sent to or received from a peer EnableMsgEvents bool + + // Logger is a custom logger to use with the p2p.Server. + Logger log.Logger } // Server manages all peer connections. @@ -172,6 +175,7 @@ type Server struct { delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed + log log.Logger } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) { return errors.New("server already running") } srv.running = true - log.Info("Starting P2P networking") + srv.log = srv.Config.Logger + if srv.log == nil { + srv.log = log.New() + } + srv.log.Info("Starting P2P networking") // static fields if srv.PrivateKey == nil { @@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) { } } if srv.NoDial && srv.ListenAddr == "" { - log.Warn("P2P server will be useless, neither dialing nor listening") + srv.log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) @@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] - log.Trace("New dial task", "task", t) + srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } @@ -517,13 +525,13 @@ running: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - log.Debug("Adding static node", "node", n) + srv.log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - log.Debug("Removing static node", "node", n) + srv.log.Debug("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) @@ -536,7 +544,7 @@ running: // A task got done. Tell dialstate about it so it // can update its state and remove it from the active // tasks list. - log.Trace("Dial task done", "task", t) + srv.log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: @@ -565,7 +573,7 @@ running: p.events = &srv.peerFeed } name := truncateName(c.name) - log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p go srv.runPeer(p) } @@ -585,7 +593,7 @@ running: } } - log.Trace("P2P networking is spinning down") + srv.log.Trace("P2P networking is spinning down") // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { @@ -639,7 +647,7 @@ type tempError interface { // inbound connections. func (srv *Server) listenLoop() { defer srv.loopWG.Done() - log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) + srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. @@ -664,10 +672,10 @@ func (srv *Server) listenLoop() { for { fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { - log.Debug("Temporary read error", "err", err) + srv.log.Debug("Temporary read error", "err", err) continue } else if err != nil { - log.Debug("Read error", "err", err) + srv.log.Debug("Read error", "err", err) return } break @@ -676,7 +684,7 @@ func (srv *Server) listenLoop() { // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { - log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) + srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue @@ -684,7 +692,7 @@ func (srv *Server) listenLoop() { } fd = newMeteredConn(fd, true) - log.Trace("Accepted connection", "addr", fd.RemoteAddr()) + srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection // has been established. @@ -698,55 +706,65 @@ func (srv *Server) listenLoop() { // SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error { + self := srv.Self() + if self == nil { + return errors.New("shutdown") + } + c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} + err := srv.setupConn(c, flags, dialDest) + if err != nil { + c.close(err) + srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) + } + return err +} + +func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running srv.lock.Unlock() - c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} if !running { - c.close(errServerStopped) - return + return errServerStopped } // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { - log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) - c.close(err) - return + srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) + return err } - clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) + clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { - c.close(DiscUnexpectedIdentity) clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) - return + return DiscUnexpectedIdentity } - if err := srv.checkpoint(c, srv.posthandshake); err != nil { + err = srv.checkpoint(c, srv.posthandshake) + if err != nil { clog.Trace("Rejected peer before protocol handshake", "err", err) - c.close(err) - return + return err } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed proto handshake", "err", err) - c.close(err) - return + return err } if phs.ID != c.id { clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) - c.close(DiscUnexpectedIdentity) - return + return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name - if err := srv.checkpoint(c, srv.addpeer); err != nil { + err = srv.checkpoint(c, srv.addpeer) + if err != nil { clog.Trace("Rejected peer", "err", err) - c.close(err) - return + return err } // If the checks completed successfully, runPeer has now been // launched by run. + clog.Trace("connection set up", "inbound", dialDest == nil) + return nil } func truncateName(s string) string { diff --git a/p2p/server_test.go b/p2p/server_test.go index 11dd83e5d..10c36528e 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) { quit: make(chan struct{}), ntab: fakeTable{}, running: true, + log: log.New(), } srv.loopWG.Add(1) go func() { @@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) { } var ( - srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true} + srv = &Server{ + quit: make(chan struct{}), + ntab: fakeTable{}, + running: true, + log: log.New(), + } done = make(chan *testTask) start, end = 0, 0 ) @@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) { Protocols: []Protocol{discard}, }, newTransport: func(fd net.Conn) transport { return test.tt }, + log: log.New(), } if !test.dontstart { if err := srv.Start(); err != nil { diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go index 022314b3d..8ef5629fb 100644 --- a/p2p/simulations/adapters/docker.go +++ b/p2p/simulations/adapters/docker.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true + conf.Stack.Logger = log.New("node.id", config.ID.String()) node := &DockerNode{ ExecNode: ExecNode{ diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index bdb92cc1d..a566fb27d 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -359,6 +359,7 @@ func execP2PNode() { log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) // use explicit IP address in ListenAddr so that Enode URL is usable externalIP := func() string { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index c97188def..48d7c1730 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { Dialer: s, EnableMsgEvents: true, }, - NoUSB: true, + NoUSB: true, + Logger: log.New("node.id", id.String()), }) if err != nil { return nil, err diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index ed6cfc504..5b4b47fe2 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -83,6 +83,9 @@ type NodeConfig struct { // stack to encrypt communications PrivateKey *ecdsa.PrivateKey + // Enable peer events for Msgs + EnableMsgEvents bool + // Name is a human friendly name for the node like "node01" Name string @@ -91,6 +94,9 @@ type NodeConfig struct { // contained in SimAdapter.services, for other nodes it should be // services registered by calling the RegisterService function) Services []string + + // function to sanction or prevent suggesting a peer + Reachable func(id discover.NodeID) bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 06890ffcf..fd8777673 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -30,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) +var dialBanTimeout = 200 * time.Millisecond + // NetworkConfig defines configuration options for starting a Network type NetworkConfig struct { ID string `json:"id"` @@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) conf.PrivateKey = c.PrivateKey } id := conf.ID + if conf.Reachable == nil { + conf.Reachable = func(otherID discover.NodeID) bool { + _, err := self.InitConn(conf.ID, otherID) + return err == nil + } + } // assign a name to the node if not set if conf.Name == "" { @@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error { // method on the "one" node so that it connects to the "other" node func (self *Network) Connect(oneID, otherID discover.NodeID) error { log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) - conn, err := self.GetOrCreateConn(oneID, otherID) + conn, err := self.InitConn(oneID, otherID) if err != nil { return err } - if conn.Up { - return fmt.Errorf("%v and %v already connected", oneID, otherID) - } - if err := conn.nodesUp(); err != nil { - return err - } client, err := conn.one.Client() if err != nil { return err @@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error { // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node func (self *Network) DidDisconnect(one, other discover.NodeID) error { - conn, err := self.GetOrCreateConn(one, other) - if err != nil { + conn := self.GetConn(one, other) + if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } if !conn.Up { return fmt.Errorf("%v and %v already disconnected", one, other) } conn.Up = false + conn.initiated = time.Now().Add(-dialBanTimeout) self.events.Send(NewEvent(conn)) return nil } @@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node { } // GetNodes returns the existing nodes -func (self *Network) GetNodes() []*Node { +func (self *Network) GetNodes() (nodes []*Node) { self.lock.Lock() defer self.lock.Unlock() - return self.Nodes + for _, node := range self.Nodes { + nodes = append(nodes, node) + } + return nodes } // GetConn returns the connection which exists between "one" and "other" @@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { self.lock.Lock() defer self.lock.Unlock() + return self.getOrCreateConn(oneID, otherID) +} + +func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { if conn := self.getConn(oneID, otherID); conn != nil { return conn, nil } @@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { return self.Conns[i] } +// InitConn(one, other) retrieves the connectiton model for the connection between +// peers one and other, or creates a new one if it does not exist +// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i) +// it checks if the connection is already up, and if the nodes are running +// NOTE: +// it also checks whether there has been recent attempt to connect the peers +// this is cheating as the simulation is used as an oracle and know about +// remote peers attempt to connect to a node which will then not initiate the connection +func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { + self.lock.Lock() + defer self.lock.Unlock() + if oneID == otherID { + return nil, fmt.Errorf("refusing to connect to self %v", oneID) + } + conn, err := self.getOrCreateConn(oneID, otherID) + if err != nil { + return nil, err + } + if time.Now().Sub(conn.initiated) < dialBanTimeout { + return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) + } + if conn.Up { + return nil, fmt.Errorf("%v and %v already connected", oneID, otherID) + } + err = conn.nodesUp() + if err != nil { + return nil, fmt.Errorf("nodes not up: %v", err) + } + conn.initiated = time.Now() + return conn, nil +} + // Shutdown stops all nodes in the network and closes the quit channel func (self *Network) Shutdown() { for _, node := range self.Nodes { @@ -516,6 +559,8 @@ type Conn struct { // Up tracks whether or not the connection is active Up bool `json:"up"` + // Registers when the connection was grabbed to dial + initiated time.Time one *Node other *Node -- cgit v1.2.3 From 1d06e41f04d75c31334c455063e9ec7b4136bf23 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Mon, 4 Dec 2017 11:07:10 +0100 Subject: p2p, swarm/network/kademlia: use IsZero to check for zero time (#15603) --- p2p/dial.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p') diff --git a/p2p/dial.go b/p2p/dial.go index 8ca3dc5a1..f5ff2c211 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -157,7 +157,7 @@ func (s *dialstate) removeStatic(n *discover.Node) { } func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { - if s.start == (time.Time{}) { + if s.start.IsZero() { s.start = now } -- cgit v1.2.3 From 3da1bf8ca18f54d9bd2c8c110854dc071ee3898b Mon Sep 17 00:00:00 2001 From: Zach Date: Tue, 12 Dec 2017 18:05:47 +0000 Subject: all: use gometalinter.v2, fix new gosimple issues (#15650) --- p2p/discv5/net.go | 2 +- p2p/discv5/ntp.go | 8 ++++---- p2p/discv5/ticket.go | 12 ++++++------ p2p/simulations/network.go | 7 +++---- 4 files changed, 14 insertions(+), 15 deletions(-) (limited to 'p2p') diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index a39cfcc64..2fbb60824 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -684,7 +684,7 @@ func (net *Network) refresh(done chan<- struct{}) { seeds = net.nursery } if len(seeds) == 0 { - log.Trace(fmt.Sprint("no seed nodes found")) + log.Trace("no seed nodes found") close(done) return } diff --git a/p2p/discv5/ntp.go b/p2p/discv5/ntp.go index f78d5dc43..4fb5f657a 100644 --- a/p2p/discv5/ntp.go +++ b/p2p/discv5/ntp.go @@ -54,10 +54,10 @@ func checkClockDrift() { howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings") separator := strings.Repeat("-", len(warning)) - log.Warn(fmt.Sprint(separator)) - log.Warn(fmt.Sprint(warning)) - log.Warn(fmt.Sprint(howtofix)) - log.Warn(fmt.Sprint(separator)) + log.Warn(separator) + log.Warn(warning) + log.Warn(howtofix) + log.Warn(separator) } else { log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift)) } diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 48dd114f0..193cef4be 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -398,12 +398,12 @@ func (s *ticketStore) nextRegisterableTicket() (t *ticketRef, wait time.Duration //s.removeExcessTickets(topic) if len(tickets.buckets) != 0 { empty = false - if list := tickets.buckets[bucket]; list != nil { - for _, ref := range list { - //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now))) - if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() { - nextTicket = ref - } + + list := tickets.buckets[bucket] + for _, ref := range list { + //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now))) + if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() { + nextTicket = ref } } } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index fd8777673..f3dda2e44 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -403,9 +403,8 @@ func (self *Network) getNodeByName(name string) *Node { func (self *Network) GetNodes() (nodes []*Node) { self.lock.Lock() defer self.lock.Unlock() - for _, node := range self.Nodes { - nodes = append(nodes, node) - } + + nodes = append(nodes, self.Nodes...) return nodes } @@ -477,7 +476,7 @@ func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { if err != nil { return nil, err } - if time.Now().Sub(conn.initiated) < dialBanTimeout { + if time.Since(conn.initiated) < dialBanTimeout { return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) } if conn.Up { -- cgit v1.2.3 From fd777bb2104f15cb7c2f7eede7069ad436e29b57 Mon Sep 17 00:00:00 2001 From: holisticode Date: Tue, 12 Dec 2017 13:10:41 -0500 Subject: p2p/simulations: add mocker functionality (#15207) This commit adds mocker functionality to p2p/simulations. A mocker allows to starting/stopping of nodes via the HTTP API. --- p2p/simulations/adapters/state.go | 35 +++++++ p2p/simulations/http.go | 64 ++++++++++++- p2p/simulations/mocker.go | 192 ++++++++++++++++++++++++++++++++++++++ p2p/simulations/mocker_test.go | 171 +++++++++++++++++++++++++++++++++ p2p/simulations/network.go | 20 ++++ 5 files changed, 480 insertions(+), 2 deletions(-) create mode 100644 p2p/simulations/adapters/state.go create mode 100644 p2p/simulations/mocker.go create mode 100644 p2p/simulations/mocker_test.go (limited to 'p2p') diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go new file mode 100644 index 000000000..8b1dfef90 --- /dev/null +++ b/p2p/simulations/adapters/state.go @@ -0,0 +1,35 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . +package adapters + +type SimStateStore struct { + m map[string][]byte +} + +func (self *SimStateStore) Load(s string) ([]byte, error) { + return self.m[s], nil +} + +func (self *SimStateStore) Save(s string, data []byte) error { + self.m[s] = data + return nil +} + +func NewSimStateStore() *SimStateStore { + return &SimStateStore{ + make(map[string][]byte), + } +} diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 3fa8b9292..97dd742e8 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -27,6 +27,7 @@ import ( "net/http" "strconv" "strings" + "sync" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" @@ -263,8 +264,10 @@ func (c *Client) Send(method, path string, in, out interface{}) error { // Server is an HTTP server providing an API to manage a simulation network type Server struct { - router *httprouter.Router - network *Network + router *httprouter.Router + network *Network + mockerStop chan struct{} // when set, stops the current mocker + mockerMtx sync.Mutex // synchronises access to the mockerStop field } // NewServer returns a new simulation API server @@ -278,6 +281,10 @@ func NewServer(network *Network) *Server { s.GET("/", s.GetNetwork) s.POST("/start", s.StartNetwork) s.POST("/stop", s.StopNetwork) + s.POST("/mocker/start", s.StartMocker) + s.POST("/mocker/stop", s.StopMocker) + s.GET("/mocker", s.GetMockers) + s.POST("/reset", s.ResetNetwork) s.GET("/events", s.StreamNetworkEvents) s.GET("/snapshot", s.CreateSnapshot) s.POST("/snapshot", s.LoadSnapshot) @@ -318,6 +325,59 @@ func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } +// StartMocker starts the mocker node simulation +func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { + s.mockerMtx.Lock() + defer s.mockerMtx.Unlock() + if s.mockerStop != nil { + http.Error(w, "mocker already running", http.StatusInternalServerError) + return + } + mockerType := req.FormValue("mocker-type") + mockerFn := LookupMocker(mockerType) + if mockerFn == nil { + http.Error(w, fmt.Sprintf("unknown mocker type %q", mockerType), http.StatusBadRequest) + return + } + nodeCount, err := strconv.Atoi(req.FormValue("node-count")) + if err != nil { + http.Error(w, "invalid node-count provided", http.StatusBadRequest) + return + } + s.mockerStop = make(chan struct{}) + go mockerFn(s.network, s.mockerStop, nodeCount) + + w.WriteHeader(http.StatusOK) +} + +// StopMocker stops the mocker node simulation +func (s *Server) StopMocker(w http.ResponseWriter, req *http.Request) { + s.mockerMtx.Lock() + defer s.mockerMtx.Unlock() + if s.mockerStop == nil { + http.Error(w, "stop channel not initialized", http.StatusInternalServerError) + return + } + close(s.mockerStop) + s.mockerStop = nil + + w.WriteHeader(http.StatusOK) +} + +// GetMockerList returns a list of available mockers +func (s *Server) GetMockers(w http.ResponseWriter, req *http.Request) { + + list := GetMockerList() + s.JSON(w, http.StatusOK, list) +} + +// ResetNetwork resets all properties of a network to its initial (empty) state +func (s *Server) ResetNetwork(w http.ResponseWriter, req *http.Request) { + s.network.Reset() + + w.WriteHeader(http.StatusOK) +} + // StreamNetworkEvents streams network events as a server-sent-events stream func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { events := make(chan *Event) diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go new file mode 100644 index 000000000..c38e28855 --- /dev/null +++ b/p2p/simulations/mocker.go @@ -0,0 +1,192 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package simulations simulates p2p networks. +// A mocker simulates starting and stopping real nodes in a network. +package simulations + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +//a map of mocker names to its function +var mockerList = map[string]func(net *Network, quit chan struct{}, nodeCount int){ + "startStop": startStop, + "probabilistic": probabilistic, + "boot": boot, +} + +//Lookup a mocker by its name, returns the mockerFn +func LookupMocker(mockerType string) func(net *Network, quit chan struct{}, nodeCount int) { + return mockerList[mockerType] +} + +//Get a list of mockers (keys of the map) +//Useful for frontend to build available mocker selection +func GetMockerList() []string { + list := make([]string, 0, len(mockerList)) + for k := range mockerList { + list = append(list, k) + } + return list +} + +//The boot mockerFn only connects the node in a ring and doesn't do anything else +func boot(net *Network, quit chan struct{}, nodeCount int) { + _, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } +} + +//The startStop mockerFn stops and starts nodes in a defined period (ticker) +func startStop(net *Network, quit chan struct{}, nodeCount int) { + nodes, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + for { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-tick.C: + id := nodes[rand.Intn(len(nodes))] + log.Info("stopping node", "id", id) + if err := net.Stop(id); err != nil { + log.Error("error stopping node", "id", id, "err", err) + return + } + + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-time.After(3 * time.Second): + } + + log.Debug("starting node", "id", id) + if err := net.Start(id); err != nil { + log.Error("error starting node", "id", id, "err", err) + return + } + } + } +} + +//The probabilistic mocker func has a more probabilistic pattern +//(the implementation could probably be improved): +//nodes are connected in a ring, then a varying number of random nodes is selected, +//mocker then stops and starts them in random intervals, and continues the loop +func probabilistic(net *Network, quit chan struct{}, nodeCount int) { + nodes, err := connectNodesInRing(net, nodeCount) + if err != nil { + panic("Could not startup node network for mocker") + } + for { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + default: + } + var lowid, highid int + var wg sync.WaitGroup + randWait := time.Duration(rand.Intn(5000)+1000) * time.Millisecond + rand1 := rand.Intn(nodeCount - 1) + rand2 := rand.Intn(nodeCount - 1) + if rand1 < rand2 { + lowid = rand1 + highid = rand2 + } else if rand1 > rand2 { + highid = rand1 + lowid = rand2 + } else { + if rand1 == 0 { + rand2 = 9 + } else if rand1 == 9 { + rand1 = 0 + } + lowid = rand1 + highid = rand2 + } + var steps = highid - lowid + wg.Add(steps) + for i := lowid; i < highid; i++ { + select { + case <-quit: + log.Info("Terminating simulation loop") + return + case <-time.After(randWait): + } + log.Debug(fmt.Sprintf("node %v shutting down", nodes[i])) + err := net.Stop(nodes[i]) + if err != nil { + log.Error(fmt.Sprintf("Error stopping node %s", nodes[i])) + wg.Done() + continue + } + go func(id discover.NodeID) { + time.Sleep(randWait) + err := net.Start(id) + if err != nil { + log.Error(fmt.Sprintf("Error starting node %s", id)) + } + wg.Done() + }(nodes[i]) + } + wg.Wait() + } + +} + +//connect nodeCount number of nodes in a ring +func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) { + ids := make([]discover.NodeID, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := net.NewNode() + if err != nil { + log.Error("Error creating a node! %s", err) + return nil, err + } + ids[i] = node.ID() + } + + for _, id := range ids { + if err := net.Start(id); err != nil { + log.Error("Error starting a node! %s", err) + return nil, err + } + log.Debug(fmt.Sprintf("node %v starting up", id)) + } + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := net.Connect(id, peerID); err != nil { + log.Error("Error connecting a node to a peer! %s", err) + return nil, err + } + } + + return ids, nil +} diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go new file mode 100644 index 000000000..6b866fb74 --- /dev/null +++ b/p2p/simulations/mocker_test.go @@ -0,0 +1,171 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package simulations simulates p2p networks. +// A mokcer simulates starting and stopping real nodes in a network. +package simulations + +import ( + "encoding/json" + "net/http" + "net/url" + "strconv" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +func TestMocker(t *testing.T) { + //start the simulation HTTP server + _, s := testHTTPServer(t) + defer s.Close() + + //create a client + client := NewClient(s.URL) + + //start the network + err := client.StartNetwork() + if err != nil { + t.Fatalf("Could not start test network: %s", err) + } + //stop the network to terminate + defer func() { + err = client.StopNetwork() + if err != nil { + t.Fatalf("Could not stop test network: %s", err) + } + }() + + //get the list of available mocker types + resp, err := http.Get(s.URL + "/mocker") + if err != nil { + t.Fatalf("Could not get mocker list: %s", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received, expected 200, got %d", resp.StatusCode) + } + + //check the list is at least 1 in size + var mockerlist []string + err = json.NewDecoder(resp.Body).Decode(&mockerlist) + if err != nil { + t.Fatalf("Error decoding JSON mockerlist: %s", err) + } + + if len(mockerlist) < 1 { + t.Fatalf("No mockers available") + } + + nodeCount := 10 + var wg sync.WaitGroup + + events := make(chan *Event, 10) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + defer sub.Unsubscribe() + //wait until all nodes are started and connected + //store every node up event in a map (value is irrelevant, mimic Set datatype) + nodemap := make(map[discover.NodeID]bool) + wg.Add(1) + nodesComplete := false + connCount := 0 + go func() { + for { + select { + case event := <-events: + //if the event is a node Up event only + if event.Node != nil && event.Node.Up == true { + //add the correspondent node ID to the map + nodemap[event.Node.Config.ID] = true + //this means all nodes got a nodeUp event, so we can continue the test + if len(nodemap) == nodeCount { + nodesComplete = true + //wait for 3s as the mocker will need time to connect the nodes + //time.Sleep( 3 *time.Second) + } + } else if event.Conn != nil && nodesComplete { + connCount += 1 + if connCount == (nodeCount-1)*2 { + wg.Done() + return + } + } + case <-time.After(30 * time.Second): + wg.Done() + t.Fatalf("Timeout waiting for nodes being started up!") + } + } + }() + + //take the last element of the mockerlist as the default mocker-type to ensure one is enabled + mockertype := mockerlist[len(mockerlist)-1] + //still, use hardcoded "probabilistic" one if available ;) + for _, m := range mockerlist { + if m == "probabilistic" { + mockertype = m + break + } + } + //start the mocker with nodeCount number of nodes + resp, err = http.PostForm(s.URL+"/mocker/start", url.Values{"mocker-type": {mockertype}, "node-count": {strconv.Itoa(nodeCount)}}) + if err != nil { + t.Fatalf("Could not start mocker: %s", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received for starting mocker, expected 200, got %d", resp.StatusCode) + } + + wg.Wait() + + //check there are nodeCount number of nodes in the network + nodes_info, err := client.GetNodes() + if err != nil { + t.Fatalf("Could not get nodes list: %s", err) + } + + if len(nodes_info) != nodeCount { + t.Fatalf("Expected %d number of nodes, got: %d", nodeCount, len(nodes_info)) + } + + //stop the mocker + resp, err = http.Post(s.URL+"/mocker/stop", "", nil) + if err != nil { + t.Fatalf("Could not stop mocker: %s", err) + } + if resp.StatusCode != 200 { + t.Fatalf("Invalid Status Code received for stopping mocker, expected 200, got %d", resp.StatusCode) + } + + //reset the network + _, err = http.Post(s.URL+"/reset", "", nil) + if err != nil { + t.Fatalf("Could not reset network: %s", err) + } + + //now the number of nodes in the network should be zero + nodes_info, err = client.GetNodes() + if err != nil { + t.Fatalf("Could not get nodes list: %s", err) + } + + if len(nodes_info) != 0 { + t.Fatalf("Expected empty list of nodes, got: %d", len(nodes_info)) + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index f3dda2e44..caf428ece 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -501,6 +501,20 @@ func (self *Network) Shutdown() { close(self.quitc) } +//Reset resets all network properties: +//emtpies the nodes and the connection list +func (self *Network) Reset() { + self.lock.Lock() + defer self.lock.Unlock() + + //re-initialize the maps + self.connMap = make(map[string]int) + self.nodeMap = make(map[discover.NodeID]int) + + self.Nodes = nil + self.Conns = nil +} + // Node is a wrapper around adapters.Node which is used to track the status // of a node in the network type Node struct { @@ -664,6 +678,12 @@ func (self *Network) Load(snap *Snapshot) error { } } for _, conn := range snap.Conns { + + if !self.GetNode(conn.One).Up || !self.GetNode(conn.Other).Up { + //in this case, at least one of the nodes of a connection is not up, + //so it would result in the snapshot `Load` to fail + continue + } if err := self.Connect(conn.One, conn.Other); err != nil { return err } -- cgit v1.2.3 From 3654aeaa4f87452ac5bc801a18808189595e2ef8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 13 Dec 2017 03:15:27 +0100 Subject: p2p/simulations: fix gosimple nit (#15661) --- p2p/simulations/mocker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p') diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go index 6b866fb74..de8ec0b33 100644 --- a/p2p/simulations/mocker_test.go +++ b/p2p/simulations/mocker_test.go @@ -91,7 +91,7 @@ func TestMocker(t *testing.T) { select { case event := <-events: //if the event is a node Up event only - if event.Node != nil && event.Node.Up == true { + if event.Node != nil && event.Node.Up { //add the correspondent node ID to the map nodemap[event.Node.Config.ID] = true //this means all nodes got a nodeUp event, so we can continue the test -- cgit v1.2.3 From afa3c72c40eee45dfbf3cbc40505b78cb2c3c6b2 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Mon, 18 Dec 2017 04:03:48 +0100 Subject: p2p/discover: fix leaked goroutine in data expiration --- p2p/discover/database.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'p2p') diff --git a/p2p/discover/database.go b/p2p/discover/database.go index 7206a63c6..b136609f2 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -226,14 +226,14 @@ func (db *nodeDB) ensureExpirer() { // expirer should be started in a go routine, and is responsible for looping ad // infinitum and dropping stale data from the database. func (db *nodeDB) expirer() { - tick := time.Tick(nodeDBCleanupCycle) + tick := time.NewTicker(nodeDBCleanupCycle) + defer tick.Stop() for { select { - case <-tick: + case <-tick.C: if err := db.expireNodes(); err != nil { log.Error("Failed to expire nodedb items", "err", err) } - case <-db.quit: return } -- cgit v1.2.3