aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
authorKurkó Mihály <kurkomisi@users.noreply.github.com>2018-10-16 06:40:51 +0800
committerFelix Lange <fjl@users.noreply.github.com>2018-10-16 06:40:51 +0800
commit16e4d0e0055f7fce620ff6881a1393d955c06cee (patch)
treec612d19c404de92d63b09bf395a18b11b74cb389 /p2p
parent331fa6d3075cb89fc295b36d9039620ec1d741ad (diff)
downloadgo-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar.gz
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar.bz2
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar.lz
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar.xz
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.tar.zst
go-tangerine-16e4d0e0055f7fce620ff6881a1393d955c06cee.zip
p2p: meter peer traffic, emit metered peer events (#17695)
This change extends the peer metrics collection: - traces the life-cycle of the peers - meters the peer traffic separately for every peer - creates event feed for the peer events - emits the peer events
Diffstat (limited to 'p2p')
-rw-r--r--p2p/dial.go2
-rw-r--r--p2p/metrics.go194
-rw-r--r--p2p/server.go9
3 files changed, 187 insertions, 18 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index d228514fc..075a0f936 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -350,7 +350,7 @@ func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
if err != nil {
return &dialError{err}
}
- mfd := newMeteredConn(fd, false)
+ mfd := newMeteredConn(fd, false, dest.IP())
return srv.SetupConn(mfd, t.flags, dest)
}
diff --git a/p2p/metrics.go b/p2p/metrics.go
index 2d52fd1fd..6a7c0bad3 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -19,53 +19,215 @@
package p2p
import (
+ "fmt"
"net"
+ "sync"
+ "sync/atomic"
+ "time"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
+const (
+ MetricsInboundConnects = "p2p/InboundConnects" // Name for the registered inbound connects meter
+ MetricsInboundTraffic = "p2p/InboundTraffic" // Name for the registered inbound traffic meter
+ MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter
+ MetricsOutboundTraffic = "p2p/OutboundTraffic" // Name for the registered outbound traffic meter
+
+ MeteredPeerLimit = 1024 // This amount of peers are individually metered
+)
+
var (
- ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil)
- ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
- egressConnectMeter = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil)
- egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
+ ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections
+ ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
+ egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
+ egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
+
+ PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
+ PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
+
+ meteredPeerFeed event.Feed // Event feed for peer metrics
+ meteredPeerCount int32 // Actually stored peer connection count
)
+// MeteredPeerEventType is the type of peer events emitted by a metered connection.
+type MeteredPeerEventType int
+
+const (
+ // PeerConnected is the type of event emitted when a peer successfully
+ // made the handshake.
+ PeerConnected MeteredPeerEventType = iota
+
+ // PeerDisconnected is the type of event emitted when a peer disconnects.
+ PeerDisconnected
+
+ // PeerHandshakeFailed is the type of event emitted when a peer fails to
+ // make the handshake or disconnects before the handshake.
+ PeerHandshakeFailed
+)
+
+// MeteredPeerEvent is an event emitted when peers connect or disconnect.
+type MeteredPeerEvent struct {
+ Type MeteredPeerEventType // Type of peer event
+ IP net.IP // IP address of the peer
+ ID string // NodeID of the peer
+ Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
+ Ingress uint64 // Ingress count at the moment of the event
+ Egress uint64 // Egress count at the moment of the event
+}
+
+// SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
+// if metrics collection is enabled.
+func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
+ return meteredPeerFeed.Subscribe(ch)
+}
+
// meteredConn is a wrapper around a net.Conn that meters both the
// inbound and outbound network traffic.
type meteredConn struct {
net.Conn // Network connection to wrap with metering
+
+ connected time.Time // Connection time of the peer
+ ip net.IP // IP address of the peer
+ id string // NodeID of the peer
+
+ // trafficMetered denotes if the peer is registered in the traffic registries.
+ // Its value is true if the metered peer count doesn't reach the limit in the
+ // moment of the peer's connection.
+ trafficMetered bool
+ ingressMeter metrics.Meter // Meter for the read bytes of the peer
+ egressMeter metrics.Meter // Meter for the written bytes of the peer
+
+ lock sync.RWMutex // Lock protecting the metered connection's internals
}
-// newMeteredConn creates a new metered connection, also bumping the ingress or
-// egress connection meter. If the metrics system is disabled, this function
-// returns the original object.
-func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
+// newMeteredConn creates a new metered connection, bumps the ingress or egress
+// connection meter and also increases the metered peer count. If the metrics
+// system is disabled or the IP address is unspecified, this function returns
+// the original object.
+func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
// Short circuit if metrics are disabled
if !metrics.Enabled {
return conn
}
- // Otherwise bump the connection counters and wrap the connection
+ if ip.IsUnspecified() {
+ log.Warn("Peer IP is unspecified")
+ return conn
+ }
+ // Bump the connection counters and wrap the connection
if ingress {
ingressConnectMeter.Mark(1)
} else {
egressConnectMeter.Mark(1)
}
- return &meteredConn{Conn: conn}
+ return &meteredConn{
+ Conn: conn,
+ ip: ip,
+ connected: time.Now(),
+ }
}
-// Read delegates a network read to the underlying connection, bumping the ingress
-// traffic meter along the way.
+// Read delegates a network read to the underlying connection, bumping the common
+// and the peer ingress traffic meters along the way.
func (c *meteredConn) Read(b []byte) (n int, err error) {
n, err = c.Conn.Read(b)
ingressTrafficMeter.Mark(int64(n))
- return
+ c.lock.RLock()
+ if c.trafficMetered {
+ c.ingressMeter.Mark(int64(n))
+ }
+ c.lock.RUnlock()
+ return n, err
}
-// Write delegates a network write to the underlying connection, bumping the
-// egress traffic meter along the way.
+// Write delegates a network write to the underlying connection, bumping the common
+// and the peer egress traffic meters along the way.
func (c *meteredConn) Write(b []byte) (n int, err error) {
n, err = c.Conn.Write(b)
egressTrafficMeter.Mark(int64(n))
- return
+ c.lock.RLock()
+ if c.trafficMetered {
+ c.egressMeter.Mark(int64(n))
+ }
+ c.lock.RUnlock()
+ return n, err
+}
+
+// handshakeDone is called when a peer handshake is done. Registers the peer to
+// the ingress and the egress traffic registries using the peer's IP and node ID,
+// also emits connect event.
+func (c *meteredConn) handshakeDone(nodeID enode.ID) {
+ id := nodeID.String()
+ if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
+ // Don't register the peer in the traffic registries.
+ atomic.AddInt32(&meteredPeerCount, -1)
+ c.lock.Lock()
+ c.id, c.trafficMetered = id, false
+ c.lock.Unlock()
+ log.Warn("Metered peer count reached the limit")
+ } else {
+ key := fmt.Sprintf("%s/%s", c.ip, id)
+ c.lock.Lock()
+ c.id, c.trafficMetered = id, true
+ c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry)
+ c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry)
+ c.lock.Unlock()
+ }
+ meteredPeerFeed.Send(MeteredPeerEvent{
+ Type: PeerConnected,
+ IP: c.ip,
+ ID: id,
+ Elapsed: time.Since(c.connected),
+ })
+}
+
+// Close delegates a close operation to the underlying connection, unregisters
+// the peer from the traffic registries and emits close event.
+func (c *meteredConn) Close() error {
+ err := c.Conn.Close()
+ c.lock.RLock()
+ if c.id == "" {
+ // If the peer disconnects before the handshake.
+ c.lock.RUnlock()
+ meteredPeerFeed.Send(MeteredPeerEvent{
+ Type: PeerHandshakeFailed,
+ IP: c.ip,
+ Elapsed: time.Since(c.connected),
+ })
+ return err
+ }
+ id := c.id
+ if !c.trafficMetered {
+ // If the peer isn't registered in the traffic registries.
+ c.lock.RUnlock()
+ meteredPeerFeed.Send(MeteredPeerEvent{
+ Type: PeerDisconnected,
+ IP: c.ip,
+ ID: id,
+ })
+ return err
+ }
+ ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
+ c.lock.RUnlock()
+
+ // Decrement the metered peer count
+ atomic.AddInt32(&meteredPeerCount, -1)
+
+ // Unregister the peer from the traffic registries
+ key := fmt.Sprintf("%s/%s", c.ip, id)
+ PeerIngressRegistry.Unregister(key)
+ PeerEgressRegistry.Unregister(key)
+
+ meteredPeerFeed.Send(MeteredPeerEvent{
+ Type: PeerDisconnected,
+ IP: c.ip,
+ ID: id,
+ Ingress: ingress,
+ Egress: egress,
+ })
+ return err
}
diff --git a/p2p/server.go b/p2p/server.go
index 6482c0401..38a881f7b 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -864,7 +864,11 @@ func (srv *Server) listenLoop() {
}
}
- fd = newMeteredConn(fd, true)
+ var ip net.IP
+ if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
+ ip = tcp.IP
+ }
+ fd = newMeteredConn(fd, true, ip)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
go func() {
srv.SetupConn(fd, inboundConn, nil)
@@ -917,6 +921,9 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
} else {
c.node = nodeFromConn(remotePubkey, c.fd)
}
+ if conn, ok := c.fd.(*meteredConn); ok {
+ conn.handshakeDone(c.node.ID())
+ }
clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
err = srv.checkpoint(c, srv.posthandshake)
if err != nil {