1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package protocols
import (
"time"
"github.com/ethereum/go-ethereum/metrics"
)
// define some metrics
var (
// All metrics are cumulative
// total amount of units credited
mBalanceCredit metrics.Counter
// total amount of units debited
mBalanceDebit metrics.Counter
// total amount of bytes credited
mBytesCredit metrics.Counter
// total amount of bytes debited
mBytesDebit metrics.Counter
// total amount of credited messages
mMsgCredit metrics.Counter
// total amount of debited messages
mMsgDebit metrics.Counter
// how many times local node had to drop remote peers
mPeerDrops metrics.Counter
// how many times local node overdrafted and dropped
mSelfDrops metrics.Counter
MetricsRegistry metrics.Registry
)
// Prices defines how prices are being passed on to the accounting instance
type Prices interface {
// Return the Price for a message
Price(interface{}) *Price
}
type Payer bool
const (
Sender = Payer(true)
Receiver = Payer(false)
)
// Price represents the costs of a message
type Price struct {
Value uint64
PerByte bool // True if the price is per byte or for unit
Payer Payer
}
// For gives back the price for a message
// A protocol provides the message price in absolute value
// This method then returns the correct signed amount,
// depending on who pays, which is identified by the `payer` argument:
// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
// Thus: If Sending and sender pays, amount positive, otherwise negative
// If Receiving, and receiver pays, amount positive, otherwise negative
func (p *Price) For(payer Payer, size uint32) int64 {
price := p.Value
if p.PerByte {
price *= uint64(size)
}
if p.Payer == payer {
return 0 - int64(price)
}
return int64(price)
}
// Balance is the actual accounting instance
// Balance defines the operations needed for accounting
// Implementations internally maintain the balance for every peer
type Balance interface {
// Adds amount to the local balance with remote node `peer`;
// positive amount = credit local node
// negative amount = debit local node
Add(amount int64, peer *Peer) error
}
// Accounting implements the Hook interface
// It interfaces to the balances through the Balance interface,
// while interfacing with protocols and its prices through the Prices interface
type Accounting struct {
Balance // interface to accounting logic
Prices // interface to prices logic
}
func NewAccounting(balance Balance, po Prices) *Accounting {
ah := &Accounting{
Prices: po,
Balance: balance,
}
return ah
}
// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
}
// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
// this message doesn't need accounting
if price == nil {
return nil
}
// evaluate the price for sending messages
costToLocalNode := price.For(Sender, size)
// do the accounting
err := ah.Add(costToLocalNode, peer)
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}
// Receive takes a peer, a size and a msg and
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
// this message doesn't need accounting
if price == nil {
return nil
}
// evaluate the price for receiving messages
costToLocalNode := price.For(Receiver, size)
// do the accounting
err := ah.Add(costToLocalNode, peer)
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}
// record some metrics
// this is not an error handling. `err` is returned by both `Send` and `Receive`
// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
// if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
if price > 0 {
mBalanceCredit.Inc(price)
mBytesCredit.Inc(int64(size))
mMsgCredit.Inc(1)
if err != nil {
// increase the number of times a remote node has been dropped due to "overdraft"
mPeerDrops.Inc(1)
}
} else {
mBalanceDebit.Inc(price)
mBytesDebit.Inc(int64(size))
mMsgDebit.Inc(1)
if err != nil {
// increase the number of times the local node has done an "overdraft" in respect to other nodes
mSelfDrops.Inc(1)
}
}
}
|