diff options
Diffstat (limited to 'p2p/protocols')
-rw-r--r-- | p2p/protocols/accounting.go | 43 | ||||
-rw-r--r-- | p2p/protocols/accounting_simulation_test.go | 10 | ||||
-rw-r--r-- | p2p/protocols/reporter.go | 147 | ||||
-rw-r--r-- | p2p/protocols/reporter_test.go | 77 |
4 files changed, 267 insertions, 10 deletions
diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 06a1a5845..770406a27 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -16,29 +16,32 @@ package protocols -import "github.com/ethereum/go-ethereum/metrics" +import ( + "time" + + "github.com/ethereum/go-ethereum/metrics" +) //define some metrics var ( - //NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions //All metrics are cumulative //total amount of units credited - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", nil) + mBalanceCredit metrics.Counter //total amount of units debited - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", nil) + mBalanceDebit metrics.Counter //total amount of bytes credited - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", nil) + mBytesCredit metrics.Counter //total amount of bytes debited - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", nil) + mBytesDebit metrics.Counter //total amount of credited messages - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", nil) + mMsgCredit metrics.Counter //total amount of debited messages - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", nil) + mMsgDebit metrics.Counter //how many times local node had to drop remote peers - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", nil) + mPeerDrops metrics.Counter //how many times local node overdrafted and dropped - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", nil) + mSelfDrops metrics.Counter ) //Prices defines how prices are being passed on to the accounting instance @@ -105,6 +108,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting { return ah } +//SetupAccountingMetrics creates a separate registry for p2p accounting metrics; +//this registry should be independent of any other metrics as it persists at different endpoints. +//It also instantiates the given metrics and starts the persisting go-routine which +//at the passed interval writes the metrics to a LevelDB +func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { + //create an empty registry + registry := metrics.NewRegistry() + //instantiate the metrics + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) + //create the DB and start persisting + return NewAccountingMetrics(registry, reportInterval, path) +} + //Implement Hook.Send // Send takes a peer, a size and a msg and // - calculates the cost for the local node sending a msg of size to peer using the Prices interface diff --git a/p2p/protocols/accounting_simulation_test.go b/p2p/protocols/accounting_simulation_test.go index 65b737abe..e90a1d81d 100644 --- a/p2p/protocols/accounting_simulation_test.go +++ b/p2p/protocols/accounting_simulation_test.go @@ -20,7 +20,10 @@ import ( "context" "flag" "fmt" + "io/ioutil" "math/rand" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -66,6 +69,13 @@ func init() { func TestAccountingSimulation(t *testing.T) { //setup the balances objects for every node bal := newBalances(*nodes) + //setup the metrics system or tests will fail trying to write metrics + dir, err := ioutil.TempDir("", "account-sim") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db")) //define the node.Service for this test services := adapters.Services{ "accounting": func(ctx *adapters.ServiceContext) (node.Service, error) { diff --git a/p2p/protocols/reporter.go b/p2p/protocols/reporter.go new file mode 100644 index 000000000..215d4fe31 --- /dev/null +++ b/p2p/protocols/reporter.go @@ -0,0 +1,147 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "encoding/binary" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + + "github.com/syndtr/goleveldb/leveldb" +) + +//AccountMetrics abstracts away the metrics DB and +//the reporter to persist metrics +type AccountingMetrics struct { + reporter *reporter +} + +//Close will be called when the node is being shutdown +//for a graceful cleanup +func (am *AccountingMetrics) Close() { + close(am.reporter.quit) + am.reporter.db.Close() +} + +//reporter is an internal structure used to write p2p accounting related +//metrics to a LevelDB. It will periodically write the accrued metrics to the DB. +type reporter struct { + reg metrics.Registry //the registry for these metrics (independent of other metrics) + interval time.Duration //duration at which the reporter will persist metrics + db *leveldb.DB //the actual DB + quit chan struct{} //quit the reporter loop +} + +//NewMetricsDB creates a new LevelDB instance used to persist metrics defined +//inside p2p/protocols/accounting.go +func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *AccountingMetrics { + var val = make([]byte, 8) + var err error + + //Create the LevelDB + db, err := leveldb.OpenFile(path, nil) + if err != nil { + log.Error(err.Error()) + return nil + } + + //Check for all defined metrics that there is a value in the DB + //If there is, assign it to the metric. This means that the node + //has been running before and that metrics have been persisted. + metricsMap := map[string]metrics.Counter{ + "account.balance.credit": mBalanceCredit, + "account.balance.debit": mBalanceDebit, + "account.bytes.credit": mBytesCredit, + "account.bytes.debit": mBytesDebit, + "account.msg.credit": mMsgCredit, + "account.msg.debit": mMsgDebit, + "account.peerdrops": mPeerDrops, + "account.selfdrops": mSelfDrops, + } + //iterate the map and get the values + for key, metric := range metricsMap { + val, err = db.Get([]byte(key), nil) + //until the first time a value is being written, + //this will return an error. + //it could be beneficial though to log errors later, + //but that would require a different logic + if err == nil { + metric.Inc(int64(binary.BigEndian.Uint64(val))) + } + } + + //create the reporter + rep := &reporter{ + reg: r, + interval: d, + db: db, + quit: make(chan struct{}), + } + + //run the go routine + go rep.run() + + m := &AccountingMetrics{ + reporter: rep, + } + + return m +} + +//run is the goroutine which periodically sends the metrics to the configured LevelDB +func (r *reporter) run() { + intervalTicker := time.NewTicker(r.interval) + + for { + select { + case <-intervalTicker.C: + //at each tick send the metrics + if err := r.save(); err != nil { + log.Error("unable to send metrics to LevelDB", "err", err) + //If there is an error in writing, exit the routine; we assume here that the error is + //severe and don't attempt to write again. + //Also, this should prevent leaking when the node is stopped + return + } + case <-r.quit: + //graceful shutdown + return + } + } +} + +//send the metrics to the DB +func (r *reporter) save() error { + //create a LevelDB Batch + batch := leveldb.Batch{} + //for each metric in the registry (which is independent)... + r.reg.Each(func(name string, i interface{}) { + metric, ok := i.(metrics.Counter) + if ok { + //assuming every metric here to be a Counter (separate registry) + //...create a snapshot... + ms := metric.Snapshot() + byteVal := make([]byte, 8) + binary.BigEndian.PutUint64(byteVal, uint64(ms.Count())) + //...and save the value to the DB + batch.Put([]byte(name), byteVal) + } + }) + return r.db.Write(&batch, nil) +} diff --git a/p2p/protocols/reporter_test.go b/p2p/protocols/reporter_test.go new file mode 100644 index 000000000..b9f06e674 --- /dev/null +++ b/p2p/protocols/reporter_test.go @@ -0,0 +1,77 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +//TestReporter tests that the metrics being collected for p2p accounting +//are being persisted and available after restart of a node. +//It simulates restarting by just recreating the DB as if the node had restarted. +func TestReporter(t *testing.T) { + //create a test directory + dir, err := ioutil.TempDir("", "reporter-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + //setup the metrics + log.Debug("Setting up metrics first time") + reportInterval := 5 * time.Millisecond + metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + log.Debug("Done.") + + //do some metrics + mBalanceCredit.Inc(12) + mBytesCredit.Inc(34) + mMsgDebit.Inc(9) + + //give the reporter time to write the metrics to DB + time.Sleep(20 * time.Millisecond) + + //set the metrics to nil - this effectively simulates the node having shut down... + mBalanceCredit = nil + mBytesCredit = nil + mMsgDebit = nil + //close the DB also, or we can't create a new one + metrics.Close() + + //setup the metrics again + log.Debug("Setting up metrics second time") + metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + defer metrics.Close() + log.Debug("Done.") + + //now check the metrics, they should have the same value as before "shutdown" + if mBalanceCredit.Count() != 12 { + t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count()) + } + if mBytesCredit.Count() != 34 { + t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count()) + } + if mMsgDebit.Count() != 9 { + t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count()) + } +} |