diff options
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 115 |
1 files changed, 65 insertions, 50 deletions
diff --git a/p2p/server.go b/p2p/server.go index 9f1478a41..48b4e8be3 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -162,12 +164,18 @@ type Server struct { removestatic chan *discover.Node posthandshake chan *conn addpeer chan *conn - delpeer chan *Peer + delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop } type peerOpFunc func(map[discover.NodeID]*Peer) +type peerDrop struct { + *Peer + err error + requested bool // true if signaled by the peer +} + type connFlag int const ( @@ -204,9 +212,9 @@ type transport interface { } func (c *conn) String() string { - s := c.flags.String() + " conn" + s := c.flags.String() if (c.id != discover.NodeID{}) { - s += fmt.Sprintf(" %x", c.id[:8]) + s += " " + c.id.String() } s += " " + c.fd.RemoteAddr().String() return s @@ -215,16 +223,16 @@ func (c *conn) String() string { func (f connFlag) String() string { s := "" if f&trustedConn != 0 { - s += " trusted" + s += "-trusted" } if f&dynDialedConn != 0 { - s += " dyn dial" + s += "-dyndial" } if f&staticDialedConn != 0 { - s += " static dial" + s += "-staticdial" } if f&inboundConn != 0 { - s += " inbound" + s += "-inbound" } if s != "" { s = s[1:] @@ -288,26 +296,30 @@ func (srv *Server) Self() *discover.Node { srv.lock.Lock() defer srv.lock.Unlock() - // If the server's not running, return an empty node if !srv.running { return &discover.Node{IP: net.ParseIP("0.0.0.0")} } - // If the node is running but discovery is off, manually assemble the node infos - if srv.ntab == nil { - // Inbound connections disabled, use zero address - if srv.listener == nil { + return srv.makeSelf(srv.listener, srv.ntab) +} + +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *discover.Node { + // If the server's not running, return an empty node. + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + // Inbound connections disabled, use zero address. + if listener == nil { return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)} } // Otherwise inject the listener address too - addr := srv.listener.Addr().(*net.TCPAddr) + addr := listener.Addr().(*net.TCPAddr) return &discover.Node{ ID: discover.PubkeyID(&srv.PrivateKey.PublicKey), IP: addr.IP, TCP: uint16(addr.Port), } } - // Otherwise return the live node infos - return srv.ntab.Self() + // Otherwise return the discovery node. + return ntab.Self() } // Stop terminates the server and all active peer connections. @@ -336,7 +348,7 @@ func (srv *Server) Start() (err error) { return errors.New("server already running") } srv.running = true - log.Info(fmt.Sprint("Starting Server")) + log.Info("Starting P2P networking") // static fields if srv.PrivateKey == nil { @@ -350,7 +362,7 @@ func (srv *Server) Start() (err error) { } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) - srv.delpeer = make(chan *Peer) + srv.delpeer = make(chan peerDrop) srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *discover.Node) srv.removestatic = make(chan *discover.Node) @@ -398,7 +410,7 @@ func (srv *Server) Start() (err error) { } } if srv.NoDial && srv.ListenAddr == "" { - log.Warn(fmt.Sprint("I will be kind-of useless, neither dialing nor listening.")) + log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) @@ -466,7 +478,7 @@ func (srv *Server) run(dialstate dialer) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] - log.Trace(fmt.Sprint("new task:", t)) + log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } @@ -489,19 +501,18 @@ running: select { case <-srv.quit: // The server was stopped. Run the cleanup logic. - log.Trace(fmt.Sprint("<-quit: spinning down")) break running case n := <-srv.addstatic: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - log.Trace(fmt.Sprint("<-addstatic:", n)) + log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - log.Trace(fmt.Sprint("<-removestatic:", n)) + log.Debug("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) @@ -514,7 +525,7 @@ running: // A task got done. Tell dialstate about it so it // can update its state and remove it from the active // tasks list. - log.Trace(fmt.Sprint("<-taskdone:", t)) + log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: @@ -524,19 +535,17 @@ running: // Ensure that the trusted flag is set before checking against MaxPeers. c.flags |= trustedConn } - log.Trace(fmt.Sprint("<-posthandshake:", c)) // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. c.cont <- srv.encHandshakeChecks(peers, c) case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. - log.Trace(fmt.Sprint("<-addpeer:", c)) err := srv.protoHandshakeChecks(peers, c) - if err != nil { - log.Trace(fmt.Sprintf("Not adding %v as peer: %v", c, err)) - } else { + if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + name := truncateName(c.name) + log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p go srv.runPeer(p) } @@ -544,13 +553,16 @@ running: // dial tasks complete after the peer has been added or // discarded. Unblock the task last. c.cont <- err - case p := <-srv.delpeer: + case pd := <-srv.delpeer: // A peer disconnected. - log.Trace(fmt.Sprint("<-delpeer:", p)) - delete(peers, p.ID()) + d := common.PrettyDuration(mclock.Now() - pd.created) + pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) + delete(peers, pd.ID()) } } + log.Trace("P2P networking is spinning down") + // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { srv.ntab.Close() @@ -565,10 +577,9 @@ running: // Wait for peers to shut down. Pending connections and tasks are // not handled here and will terminate soon-ish because srv.quit // is closed. - log.Trace(fmt.Sprintf("ignoring %d pending tasks at spindown", len(runningTasks))) for len(peers) > 0 { p := <-srv.delpeer - log.Trace(fmt.Sprint("<-delpeer (spindown):", p)) + p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks)) delete(peers, p.ID()) } } @@ -604,7 +615,7 @@ type tempError interface { // inbound connections. func (srv *Server) listenLoop() { defer srv.loopWG.Done() - log.Info(fmt.Sprint("Listening on", srv.listener.Addr())) + log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. @@ -629,10 +640,10 @@ func (srv *Server) listenLoop() { for { fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { - log.Debug(fmt.Sprintf("Temporary read error: %v", err)) + log.Debug("Temporary read error", "err", err) continue } else if err != nil { - log.Debug(fmt.Sprintf("Read error: %v", err)) + log.Debug("Read error", "err", err) return } break @@ -641,7 +652,7 @@ func (srv *Server) listenLoop() { // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { - log.Debug(fmt.Sprintf("Rejected conn %v because it is not whitelisted in NetRestrict", fd.RemoteAddr())) + log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue @@ -649,7 +660,7 @@ func (srv *Server) listenLoop() { } fd = newMeteredConn(fd, true) - log.Debug(fmt.Sprintf("Accepted conn %v", fd.RemoteAddr())) + log.Trace("Accepted connection", "addr", fd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection // has been established. @@ -676,36 +687,37 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { - log.Debug(fmt.Sprintf("%v faild enc handshake: %v", c, err)) + log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) c.close(err) return } + clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { c.close(DiscUnexpectedIdentity) - log.Debug(fmt.Sprintf("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])) + clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) return } if err := srv.checkpoint(c, srv.posthandshake); err != nil { - log.Debug(fmt.Sprintf("%v failed checkpoint posthandshake: %v", c, err)) + clog.Trace("Rejected peer before protocol handshake", "err", err) c.close(err) return } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { - log.Debug(fmt.Sprintf("%v failed proto handshake: %v", c, err)) + clog.Trace("Failed proto handshake", "err", err) c.close(err) return } if phs.ID != c.id { - log.Debug(fmt.Sprintf("%v wrong proto handshake identity: %x", c, phs.ID[:8])) + clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) c.close(DiscUnexpectedIdentity) return } c.caps, c.name = phs.Caps, phs.Name if err := srv.checkpoint(c, srv.addpeer); err != nil { - log.Debug(fmt.Sprintf("%v failed checkpoint addpeer: %v", c, err)) + clog.Trace("Rejected peer", "err", err) c.close(err) return } @@ -713,6 +725,13 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod // launched by run. } +func truncateName(s string) string { + if len(s) > 20 { + return s[:20] + "..." + } + return s +} + // checkpoint sends the conn to run, which performs the // post-handshake checks for the stage (posthandshake, addpeer). func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { @@ -733,17 +752,13 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { // it waits until the Peer logic returns and removes // the peer. func (srv *Server) runPeer(p *Peer) { - log.Debug(fmt.Sprintf("Added %v", p)) - if srv.newPeerHook != nil { srv.newPeerHook(p) } - discreason := p.run() + remoteRequested, err := p.run() // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. - srv.delpeer <- p - - log.Debug(fmt.Sprintf("Removed %v (%v)", p, discreason)) + srv.delpeer <- peerDrop{p, err, remoteRequested} } // NodeInfo represents a short summary of the information known about the host. |