diff options
-rw-r--r-- | p2p/protocols/accounting.go | 43 | ||||
-rw-r--r-- | p2p/protocols/accounting_simulation_test.go | 10 | ||||
-rw-r--r-- | p2p/protocols/reporter.go | 147 | ||||
-rw-r--r-- | p2p/protocols/reporter_test.go | 77 | ||||
-rw-r--r-- | swarm/swap/swap.go | 5 | ||||
-rw-r--r-- | swarm/swarm.go | 53 |
6 files changed, 305 insertions, 30 deletions
diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 06a1a5845..770406a27 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -16,29 +16,32 @@ package protocols -import "github.com/ethereum/go-ethereum/metrics" +import ( + "time" + + "github.com/ethereum/go-ethereum/metrics" +) //define some metrics var ( - //NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions //All metrics are cumulative //total amount of units credited - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", nil) + mBalanceCredit metrics.Counter //total amount of units debited - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", nil) + mBalanceDebit metrics.Counter //total amount of bytes credited - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", nil) + mBytesCredit metrics.Counter //total amount of bytes debited - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", nil) + mBytesDebit metrics.Counter //total amount of credited messages - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", nil) + mMsgCredit metrics.Counter //total amount of debited messages - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", nil) + mMsgDebit metrics.Counter //how many times local node had to drop remote peers - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", nil) + mPeerDrops metrics.Counter //how many times local node overdrafted and dropped - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", nil) + mSelfDrops metrics.Counter ) //Prices defines how prices are being passed on to the accounting instance @@ -105,6 +108,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting { return ah } +//SetupAccountingMetrics creates a separate registry for p2p accounting metrics; +//this registry should be independent of any other metrics as it persists at different endpoints. +//It also instantiates the given metrics and starts the persisting go-routine which +//at the passed interval writes the metrics to a LevelDB +func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { + //create an empty registry + registry := metrics.NewRegistry() + //instantiate the metrics + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) + //create the DB and start persisting + return NewAccountingMetrics(registry, reportInterval, path) +} + //Implement Hook.Send // Send takes a peer, a size and a msg and // - calculates the cost for the local node sending a msg of size to peer using the Prices interface diff --git a/p2p/protocols/accounting_simulation_test.go b/p2p/protocols/accounting_simulation_test.go index 65b737abe..e90a1d81d 100644 --- a/p2p/protocols/accounting_simulation_test.go +++ b/p2p/protocols/accounting_simulation_test.go @@ -20,7 +20,10 @@ import ( "context" "flag" "fmt" + "io/ioutil" "math/rand" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -66,6 +69,13 @@ func init() { func TestAccountingSimulation(t *testing.T) { //setup the balances objects for every node bal := newBalances(*nodes) + //setup the metrics system or tests will fail trying to write metrics + dir, err := ioutil.TempDir("", "account-sim") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db")) //define the node.Service for this test services := adapters.Services{ "accounting": func(ctx *adapters.ServiceContext) (node.Service, error) { diff --git a/p2p/protocols/reporter.go b/p2p/protocols/reporter.go new file mode 100644 index 000000000..215d4fe31 --- /dev/null +++ b/p2p/protocols/reporter.go @@ -0,0 +1,147 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "encoding/binary" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + + "github.com/syndtr/goleveldb/leveldb" +) + +//AccountMetrics abstracts away the metrics DB and +//the reporter to persist metrics +type AccountingMetrics struct { + reporter *reporter +} + +//Close will be called when the node is being shutdown +//for a graceful cleanup +func (am *AccountingMetrics) Close() { + close(am.reporter.quit) + am.reporter.db.Close() +} + +//reporter is an internal structure used to write p2p accounting related +//metrics to a LevelDB. It will periodically write the accrued metrics to the DB. +type reporter struct { + reg metrics.Registry //the registry for these metrics (independent of other metrics) + interval time.Duration //duration at which the reporter will persist metrics + db *leveldb.DB //the actual DB + quit chan struct{} //quit the reporter loop +} + +//NewMetricsDB creates a new LevelDB instance used to persist metrics defined +//inside p2p/protocols/accounting.go +func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *AccountingMetrics { + var val = make([]byte, 8) + var err error + + //Create the LevelDB + db, err := leveldb.OpenFile(path, nil) + if err != nil { + log.Error(err.Error()) + return nil + } + + //Check for all defined metrics that there is a value in the DB + //If there is, assign it to the metric. This means that the node + //has been running before and that metrics have been persisted. + metricsMap := map[string]metrics.Counter{ + "account.balance.credit": mBalanceCredit, + "account.balance.debit": mBalanceDebit, + "account.bytes.credit": mBytesCredit, + "account.bytes.debit": mBytesDebit, + "account.msg.credit": mMsgCredit, + "account.msg.debit": mMsgDebit, + "account.peerdrops": mPeerDrops, + "account.selfdrops": mSelfDrops, + } + //iterate the map and get the values + for key, metric := range metricsMap { + val, err = db.Get([]byte(key), nil) + //until the first time a value is being written, + //this will return an error. + //it could be beneficial though to log errors later, + //but that would require a different logic + if err == nil { + metric.Inc(int64(binary.BigEndian.Uint64(val))) + } + } + + //create the reporter + rep := &reporter{ + reg: r, + interval: d, + db: db, + quit: make(chan struct{}), + } + + //run the go routine + go rep.run() + + m := &AccountingMetrics{ + reporter: rep, + } + + return m +} + +//run is the goroutine which periodically sends the metrics to the configured LevelDB +func (r *reporter) run() { + intervalTicker := time.NewTicker(r.interval) + + for { + select { + case <-intervalTicker.C: + //at each tick send the metrics + if err := r.save(); err != nil { + log.Error("unable to send metrics to LevelDB", "err", err) + //If there is an error in writing, exit the routine; we assume here that the error is + //severe and don't attempt to write again. + //Also, this should prevent leaking when the node is stopped + return + } + case <-r.quit: + //graceful shutdown + return + } + } +} + +//send the metrics to the DB +func (r *reporter) save() error { + //create a LevelDB Batch + batch := leveldb.Batch{} + //for each metric in the registry (which is independent)... + r.reg.Each(func(name string, i interface{}) { + metric, ok := i.(metrics.Counter) + if ok { + //assuming every metric here to be a Counter (separate registry) + //...create a snapshot... + ms := metric.Snapshot() + byteVal := make([]byte, 8) + binary.BigEndian.PutUint64(byteVal, uint64(ms.Count())) + //...and save the value to the DB + batch.Put([]byte(name), byteVal) + } + }) + return r.db.Write(&batch, nil) +} diff --git a/p2p/protocols/reporter_test.go b/p2p/protocols/reporter_test.go new file mode 100644 index 000000000..b9f06e674 --- /dev/null +++ b/p2p/protocols/reporter_test.go @@ -0,0 +1,77 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +//TestReporter tests that the metrics being collected for p2p accounting +//are being persisted and available after restart of a node. +//It simulates restarting by just recreating the DB as if the node had restarted. +func TestReporter(t *testing.T) { + //create a test directory + dir, err := ioutil.TempDir("", "reporter-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + //setup the metrics + log.Debug("Setting up metrics first time") + reportInterval := 5 * time.Millisecond + metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + log.Debug("Done.") + + //do some metrics + mBalanceCredit.Inc(12) + mBytesCredit.Inc(34) + mMsgDebit.Inc(9) + + //give the reporter time to write the metrics to DB + time.Sleep(20 * time.Millisecond) + + //set the metrics to nil - this effectively simulates the node having shut down... + mBalanceCredit = nil + mBytesCredit = nil + mMsgDebit = nil + //close the DB also, or we can't create a new one + metrics.Close() + + //setup the metrics again + log.Debug("Setting up metrics second time") + metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + defer metrics.Close() + log.Debug("Done.") + + //now check the metrics, they should have the same value as before "shutdown" + if mBalanceCredit.Count() != 12 { + t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count()) + } + if mBytesCredit.Count() != 34 { + t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count()) + } + if mMsgDebit.Count() != 9 { + t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count()) + } +} diff --git a/swarm/swap/swap.go b/swarm/swap/swap.go index 137eb141d..5d636dc20 100644 --- a/swarm/swap/swap.go +++ b/swarm/swap/swap.go @@ -91,3 +91,8 @@ func (s *Swap) loadState(peer *protocols.Peer) (err error) { } return } + +//Clean up Swap +func (swap *Swap) Close() { + swap.stateStore.Close() +} diff --git a/swarm/swarm.go b/swarm/swarm.go index dc3756d3a..a4ff94051 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -66,20 +66,22 @@ var ( // the swarm stack type Swarm struct { - config *api.Config // swarm configuration - api *api.API // high level api layer (fs/manifest) - dns api.Resolver // DNS registrar - fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support - streamer *stream.Registry - bzz *network.Bzz // the logistic manager - backend chequebook.Backend // simple blockchain Backend - privateKey *ecdsa.PrivateKey - corsString string - swapEnabled bool - netStore *storage.NetStore - sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit - ps *pss.Pss - swap *swap.Swap + config *api.Config // swarm configuration + api *api.API // high level api layer (fs/manifest) + dns api.Resolver // DNS registrar + fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support + streamer *stream.Registry + bzz *network.Bzz // the logistic manager + backend chequebook.Backend // simple blockchain Backend + privateKey *ecdsa.PrivateKey + corsString string + swapEnabled bool + netStore *storage.NetStore + sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit + ps *pss.Pss + swap *swap.Swap + stateStore *state.DBStore + accountingMetrics *protocols.AccountingMetrics tracerClose io.Closer } @@ -134,7 +136,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e LightNode: config.LightNodeEnabled, } - stateStore, err := state.NewDBStore(filepath.Join(config.Path, "state-store.db")) + self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db")) if err != nil { return } @@ -179,6 +181,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e return nil, err } self.swap = swap.New(balancesStore) + self.accountingMetrics = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db")) } var nodeID enode.ID @@ -203,7 +206,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, } - self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions, self.swap) + self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams) @@ -226,7 +229,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e log.Debug("Setup local storage") - self.bzz = network.NewBzz(bzzconfig, to, stateStore, self.streamer.GetSpec(), self.streamer.Run) + self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run) // Pss = postal service over swarm (devp2p over bzz) self.ps, err = pss.NewPss(to, config.Pss) @@ -446,14 +449,24 @@ func (self *Swarm) Stop() error { ch.Stop() ch.Save() } - + if self.swap != nil { + self.swap.Close() + } + if self.accountingMetrics != nil { + self.accountingMetrics.Close() + } if self.netStore != nil { self.netStore.Close() } self.sfs.Stop() stopCounter.Inc(1) self.streamer.Stop() - return self.bzz.Stop() + + err := self.bzz.Stop() + if self.stateStore != nil { + self.stateStore.Close() + } + return err } // implements the node.Service interface |