aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/metrics.go
blob: 8bd902286852f4c31d1b3a7c26681a6c623f8853 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the meters and timers used by the networking layer.

package p2p

import (
    "fmt"
    "net"
    "sync"
    "sync/atomic"
    "time"

    "github.com/dexon-foundation/dexon/p2p/enode"

    "github.com/dexon-foundation/dexon/event"
    "github.com/dexon-foundation/dexon/log"
    "github.com/dexon-foundation/dexon/metrics"
)

// Names under which the global p2p meters are registered, and the cap on the
// number of individually metered peers.
const (
    MetricsInboundConnects  = "p2p/InboundConnects"  // Name of the registered inbound connects meter
    MetricsInboundTraffic   = "p2p/InboundTraffic"   // Name of the registered inbound traffic meter; with a trailing "/" also the prefix of PeerIngressRegistry
    MetricsOutboundConnects = "p2p/OutboundConnects" // Name of the registered outbound connects meter
    MetricsOutboundTraffic  = "p2p/OutboundTraffic"  // Name of the registered outbound traffic meter; with a trailing "/" also the prefix of PeerEgressRegistry

    // MeteredPeerLimit caps the number of peers that get their own traffic
    // meters: handshakeDone stops registering new peers once the incremented
    // metered peer count reaches this value.
    MeteredPeerLimit = 1024
)

// Package-level meters, per-peer registries and the bookkeeping shared by all
// metered connections.
var (
    ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil)  // Counts the inbound connections wrapped by newMeteredConn
    ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil)   // Cumulative bytes read over all metered connections
    egressConnectMeter  = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Counts the outbound connections wrapped by newMeteredConn
    egressTrafficMeter  = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil)  // Cumulative bytes written over all metered connections

    PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/")  // Registry of the per-peer ingress meters, keyed by "<ip>/<node id>"
    PeerEgressRegistry  = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry of the per-peer egress meters, keyed by "<ip>/<node id>"

    meteredPeerFeed  event.Feed // Event feed on which MeteredPeerEvents are sent
    meteredPeerCount int32      // Number of peers currently registered in the per-peer registries; accessed atomically
)

// MeteredPeerEventType is the type of peer events emitted by a metered connection.
// Its values are PeerConnected, PeerDisconnected and PeerHandshakeFailed.
type MeteredPeerEventType int

const (
    // PeerConnected is the type of event emitted when a peer successfully
    // made the handshake. It is sent by meteredConn.handshakeDone.
    PeerConnected MeteredPeerEventType = iota

    // PeerDisconnected is the type of event emitted when a peer disconnects.
    // It is sent by meteredConn.Close for a connection whose handshake had
    // completed.
    PeerDisconnected

    // PeerHandshakeFailed is the type of event emitted when a peer fails to
    // make the handshake or disconnects before the handshake. It is sent by
    // meteredConn.Close when no node ID has been recorded for the connection.
    PeerHandshakeFailed
)

// MeteredPeerEvent is an event emitted when peers connect or disconnect.
// Which fields are populated depends on Type: see the per-field comments.
type MeteredPeerEvent struct {
    Type    MeteredPeerEventType // Type of peer event
    IP      net.IP               // IP address of the peer
    ID      enode.ID             // NodeID of the peer; left zero on PeerHandshakeFailed
    Elapsed time.Duration        // Time since the connection was wrapped; set on PeerConnected and PeerHandshakeFailed only
    Ingress uint64               // Bytes read from the peer; set only on PeerDisconnected of a traffic-metered peer
    Egress  uint64               // Bytes written to the peer; set only on PeerDisconnected of a traffic-metered peer
}

// SubscribeMeteredPeerEvent subscribes ch to the feed of peer life-cycle events
// and returns the resulting subscription. The subscription itself is always
// created; events only arrive when metrics collection is enabled, since only
// metered connections send them.
func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
    subscription := meteredPeerFeed.Subscribe(ch)
    return subscription
}

// meteredConn is a wrapper around a net.Conn that meters both the
// inbound and outbound network traffic. It overrides Read, Write and Close of
// the embedded connection; every other net.Conn method is inherited as is.
type meteredConn struct {
    net.Conn // Network connection to wrap with metering

    connected time.Time // Time at which the connection was wrapped by newMeteredConn
    ip        net.IP    // IP address of the peer, fixed at construction
    id        enode.ID  // NodeID of the peer; zero until handshakeDone records it

    // trafficMetered denotes if the peer is registered in the traffic registries.
    // It is set by handshakeDone: true if the metered peer count was still below
    // MeteredPeerLimit when the handshake completed, false otherwise. The two
    // meters below are only assigned when it is true.
    trafficMetered bool
    ingressMeter   metrics.Meter // Meter for the read bytes of the peer
    egressMeter    metrics.Meter // Meter for the written bytes of the peer

    lock sync.RWMutex // Guards id, trafficMetered, ingressMeter and egressMeter
}

// newMeteredConn wraps conn into a metered connection and bumps the ingress or
// egress connection meter, depending on the ingress flag. The per-peer traffic
// meters and the metered peer count are not touched here; they are set up
// later by handshakeDone.
//
// The original conn is returned unwrapped if the metrics system is disabled, or
// if the peer IP is missing (nil or empty) or unspecified (0.0.0.0 or ::).
func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
    // Short circuit if metrics are disabled
    if !metrics.Enabled {
        return conn
    }
    // IsUnspecified reports false for a nil or empty IP, so the length has to
    // be checked explicitly. Without it such a peer would be wrapped and later
    // registered under a "<nil>/<node id>" key.
    if len(ip) == 0 || ip.IsUnspecified() {
        log.Warn("Peer IP is unspecified")
        return conn
    }
    // Bump the connection counters and wrap the connection
    if ingress {
        ingressConnectMeter.Mark(1)
    } else {
        egressConnectMeter.Mark(1)
    }
    return &meteredConn{
        Conn:      conn,
        ip:        ip,
        connected: time.Now(),
    }
}

// Read forwards the read to the wrapped connection and records the number of
// bytes received on the global ingress traffic meter and, if the peer is
// registered in the traffic registries, on the peer's own ingress meter.
// The byte count and error of the underlying read are returned unchanged.
func (c *meteredConn) Read(b []byte) (n int, err error) {
    n, err = c.Conn.Read(b)
    received := int64(n)
    ingressTrafficMeter.Mark(received)

    // handshakeDone assigns the per-peer meter under the write lock, so it may
    // only be touched while holding the read lock.
    c.lock.RLock()
    defer c.lock.RUnlock()

    if c.trafficMetered {
        c.ingressMeter.Mark(received)
    }
    return n, err
}

// Write forwards the write to the wrapped connection and records the number of
// bytes sent on the global egress traffic meter and, if the peer is registered
// in the traffic registries, on the peer's own egress meter. The byte count
// and error of the underlying write are returned unchanged.
func (c *meteredConn) Write(b []byte) (n int, err error) {
    n, err = c.Conn.Write(b)
    sent := int64(n)
    egressTrafficMeter.Mark(sent)

    // handshakeDone assigns the per-peer meter under the write lock, so it may
    // only be touched while holding the read lock.
    c.lock.RLock()
    defer c.lock.RUnlock()

    if c.trafficMetered {
        c.egressMeter.Mark(sent)
    }
    return n, err
}

// handshakeDone is called when a peer handshake is done. It records the peer's
// node ID and, as long as the metered peer count stays below MeteredPeerLimit,
// registers per-peer ingress and egress meters under the key "<ip>/<node id>".
// Once the limit is reached, the peer is left out of the traffic registries and
// a warning is logged. A PeerConnected event is emitted in both cases.
//
// NOTE(review): the slot is claimed first and the resulting count is compared
// with >=, so at most MeteredPeerLimit-1 peers are ever metered at the same
// time. Confirm that this is intended.
func (c *meteredConn) handshakeDone(id enode.ID) {
    // Optimistically claim a slot; it is handed back below if the limit is hit.
    metered := atomic.AddInt32(&meteredPeerCount, 1) < MeteredPeerLimit
    if metered {
        name := fmt.Sprintf("%s/%s", c.ip, id.String())
        c.lock.Lock()
        c.id, c.trafficMetered = id, true
        c.ingressMeter = metrics.NewRegisteredMeter(name, PeerIngressRegistry)
        c.egressMeter = metrics.NewRegisteredMeter(name, PeerEgressRegistry)
        c.lock.Unlock()
    } else {
        // Release the slot again, the peer stays out of the traffic registries.
        atomic.AddInt32(&meteredPeerCount, -1)
        c.lock.Lock()
        c.id, c.trafficMetered = id, false
        c.lock.Unlock()
        log.Warn("Metered peer count reached the limit")
    }
    connectedEvent := MeteredPeerEvent{
        Type:    PeerConnected,
        IP:      c.ip,
        ID:      id,
        Elapsed: time.Since(c.connected),
    }
    meteredPeerFeed.Send(connectedEvent)
}

// Close closes the underlying connection and emits the matching peer event.
// The error of the underlying Close is returned in every case.
//
//   - No node ID recorded (handshake never completed): a PeerHandshakeFailed
//     event carrying the elapsed time is sent.
//   - Handshake completed but the peer isn't in the traffic registries: a bare
//     PeerDisconnected event is sent.
//   - Otherwise the metered peer count is decremented, the peer's meters are
//     unregistered and a PeerDisconnected event with the final ingress and
//     egress counts is sent.
func (c *meteredConn) Close() error {
    closeErr := c.Conn.Close()

    // Take a consistent snapshot of the guarded state; the meters are only
    // read if the peer has actually been registered.
    var ingress, egress uint64
    c.lock.RLock()
    id := c.id
    handshaken := id != (enode.ID{})
    metered := handshaken && c.trafficMetered
    if metered {
        ingress, egress = uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
    }
    c.lock.RUnlock()

    switch {
    case !handshaken:
        // The peer disconnected before the handshake.
        meteredPeerFeed.Send(MeteredPeerEvent{
            Type:    PeerHandshakeFailed,
            IP:      c.ip,
            Elapsed: time.Since(c.connected),
        })

    case !metered:
        // The peer was never registered in the traffic registries.
        meteredPeerFeed.Send(MeteredPeerEvent{
            Type: PeerDisconnected,
            IP:   c.ip,
            ID:   id,
        })

    default:
        // Free the peer's slot and drop its meters from both registries.
        atomic.AddInt32(&meteredPeerCount, -1)

        name := fmt.Sprintf("%s/%s", c.ip, id)
        PeerIngressRegistry.Unregister(name)
        PeerEgressRegistry.Unregister(name)

        meteredPeerFeed.Send(MeteredPeerEvent{
            Type:    PeerDisconnected,
            IP:      c.ip,
            ID:      id,
            Ingress: ingress,
            Egress:  egress,
        })
    }
    return closeErr
}