aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go115
1 files changed, 65 insertions, 50 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 9f1478a41..48b4e8be3 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -25,6 +25,8 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
@@ -162,12 +164,18 @@ type Server struct {
removestatic chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
- delpeer chan *Peer
+ delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
}
type peerOpFunc func(map[discover.NodeID]*Peer)
+type peerDrop struct {
+ *Peer
+ err error
+ requested bool // true if signaled by the peer
+}
+
type connFlag int
const (
@@ -204,9 +212,9 @@ type transport interface {
}
func (c *conn) String() string {
- s := c.flags.String() + " conn"
+ s := c.flags.String()
if (c.id != discover.NodeID{}) {
- s += fmt.Sprintf(" %x", c.id[:8])
+ s += " " + c.id.String()
}
s += " " + c.fd.RemoteAddr().String()
return s
@@ -215,16 +223,16 @@ func (c *conn) String() string {
func (f connFlag) String() string {
s := ""
if f&trustedConn != 0 {
- s += " trusted"
+ s += "-trusted"
}
if f&dynDialedConn != 0 {
- s += " dyn dial"
+ s += "-dyndial"
}
if f&staticDialedConn != 0 {
- s += " static dial"
+ s += "-staticdial"
}
if f&inboundConn != 0 {
- s += " inbound"
+ s += "-inbound"
}
if s != "" {
s = s[1:]
@@ -288,26 +296,30 @@ func (srv *Server) Self() *discover.Node {
srv.lock.Lock()
defer srv.lock.Unlock()
- // If the server's not running, return an empty node
if !srv.running {
return &discover.Node{IP: net.ParseIP("0.0.0.0")}
}
- // If the node is running but discovery is off, manually assemble the node infos
- if srv.ntab == nil {
- // Inbound connections disabled, use zero address
- if srv.listener == nil {
+ return srv.makeSelf(srv.listener, srv.ntab)
+}
+
+func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *discover.Node {
+ // If the server's not running, return an empty node.
+ // If the node is running but discovery is off, manually assemble the node infos.
+ if ntab == nil {
+ // Inbound connections disabled, use zero address.
+ if listener == nil {
return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
}
// Otherwise inject the listener address too
- addr := srv.listener.Addr().(*net.TCPAddr)
+ addr := listener.Addr().(*net.TCPAddr)
return &discover.Node{
ID: discover.PubkeyID(&srv.PrivateKey.PublicKey),
IP: addr.IP,
TCP: uint16(addr.Port),
}
}
- // Otherwise return the live node infos
- return srv.ntab.Self()
+ // Otherwise return the discovery node.
+ return ntab.Self()
}
// Stop terminates the server and all active peer connections.
@@ -336,7 +348,7 @@ func (srv *Server) Start() (err error) {
return errors.New("server already running")
}
srv.running = true
- log.Info(fmt.Sprint("Starting Server"))
+ log.Info("Starting P2P networking")
// static fields
if srv.PrivateKey == nil {
@@ -350,7 +362,7 @@ func (srv *Server) Start() (err error) {
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
- srv.delpeer = make(chan *Peer)
+ srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
@@ -398,7 +410,7 @@ func (srv *Server) Start() (err error) {
}
}
if srv.NoDial && srv.ListenAddr == "" {
- log.Warn(fmt.Sprint("I will be kind-of useless, neither dialing nor listening."))
+ log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
@@ -466,7 +478,7 @@ func (srv *Server) run(dialstate dialer) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
- log.Trace(fmt.Sprint("new task:", t))
+ log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
@@ -489,19 +501,18 @@ running:
select {
case <-srv.quit:
// The server was stopped. Run the cleanup logic.
- log.Trace(fmt.Sprint("<-quit: spinning down"))
break running
case n := <-srv.addstatic:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
- log.Trace(fmt.Sprint("<-addstatic:", n))
+ log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
- log.Trace(fmt.Sprint("<-removestatic:", n))
+ log.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
@@ -514,7 +525,7 @@ running:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
- log.Trace(fmt.Sprint("<-taskdone:", t))
+ log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
@@ -524,19 +535,17 @@ running:
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
- log.Trace(fmt.Sprint("<-posthandshake:", c))
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
c.cont <- srv.encHandshakeChecks(peers, c)
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
- log.Trace(fmt.Sprint("<-addpeer:", c))
err := srv.protoHandshakeChecks(peers, c)
- if err != nil {
- log.Trace(fmt.Sprintf("Not adding %v as peer: %v", c, err))
- } else {
+ if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
+ name := truncateName(c.name)
+ log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
go srv.runPeer(p)
}
@@ -544,13 +553,16 @@ running:
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
c.cont <- err
- case p := <-srv.delpeer:
+ case pd := <-srv.delpeer:
// A peer disconnected.
- log.Trace(fmt.Sprint("<-delpeer:", p))
- delete(peers, p.ID())
+ d := common.PrettyDuration(mclock.Now() - pd.created)
+ pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
+ delete(peers, pd.ID())
}
}
+ log.Trace("P2P networking is spinning down")
+
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
srv.ntab.Close()
@@ -565,10 +577,9 @@ running:
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
// is closed.
- log.Trace(fmt.Sprintf("ignoring %d pending tasks at spindown", len(runningTasks)))
for len(peers) > 0 {
p := <-srv.delpeer
- log.Trace(fmt.Sprint("<-delpeer (spindown):", p))
+ p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks))
delete(peers, p.ID())
}
}
@@ -604,7 +615,7 @@ type tempError interface {
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
- log.Info(fmt.Sprint("Listening on", srv.listener.Addr()))
+ log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
// This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake.
@@ -629,10 +640,10 @@ func (srv *Server) listenLoop() {
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
- log.Debug(fmt.Sprintf("Temporary read error: %v", err))
+ log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
- log.Debug(fmt.Sprintf("Read error: %v", err))
+ log.Debug("Read error", "err", err)
return
}
break
@@ -641,7 +652,7 @@ func (srv *Server) listenLoop() {
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
- log.Debug(fmt.Sprintf("Rejected conn %v because it is not whitelisted in NetRestrict", fd.RemoteAddr()))
+ log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
@@ -649,7 +660,7 @@ func (srv *Server) listenLoop() {
}
fd = newMeteredConn(fd, true)
- log.Debug(fmt.Sprintf("Accepted conn %v", fd.RemoteAddr()))
+ log.Trace("Accepted connection", "addr", fd.RemoteAddr())
// Spawn the handler. It will give the slot back when the connection
// has been established.
@@ -676,36 +687,37 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
- log.Debug(fmt.Sprintf("%v faild enc handshake: %v", c, err))
+ log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
c.close(err)
return
}
+ clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
c.close(DiscUnexpectedIdentity)
- log.Debug(fmt.Sprintf("%v dialed identity mismatch, want %x", c, dialDest.ID[:8]))
+ clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
return
}
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
- log.Debug(fmt.Sprintf("%v failed checkpoint posthandshake: %v", c, err))
+ clog.Trace("Rejected peer before protocol handshake", "err", err)
c.close(err)
return
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
- log.Debug(fmt.Sprintf("%v failed proto handshake: %v", c, err))
+ clog.Trace("Failed proto handshake", "err", err)
c.close(err)
return
}
if phs.ID != c.id {
- log.Debug(fmt.Sprintf("%v wrong proto handshake identity: %x", c, phs.ID[:8]))
+ clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
c.close(DiscUnexpectedIdentity)
return
}
c.caps, c.name = phs.Caps, phs.Name
if err := srv.checkpoint(c, srv.addpeer); err != nil {
- log.Debug(fmt.Sprintf("%v failed checkpoint addpeer: %v", c, err))
+ clog.Trace("Rejected peer", "err", err)
c.close(err)
return
}
@@ -713,6 +725,13 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod
// launched by run.
}
+func truncateName(s string) string {
+ if len(s) > 20 {
+ return s[:20] + "..."
+ }
+ return s
+}
+
// checkpoint sends the conn to run, which performs the
// post-handshake checks for the stage (posthandshake, addpeer).
func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
@@ -733,17 +752,13 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
// it waits until the Peer logic returns and removes
// the peer.
func (srv *Server) runPeer(p *Peer) {
- log.Debug(fmt.Sprintf("Added %v", p))
-
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
- discreason := p.run()
+ remoteRequested, err := p.run()
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
- srv.delpeer <- p
-
- log.Debug(fmt.Sprintf("Removed %v (%v)", p, discreason))
+ srv.delpeer <- peerDrop{p, err, remoteRequested}
}
// NodeInfo represents a short summary of the information known about the host.