diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/discover/table.go | 2 | ||||
-rw-r--r-- | p2p/discv5/net.go | 9 | ||||
-rw-r--r-- | p2p/protocols/accounting.go | 43 | ||||
-rw-r--r-- | p2p/protocols/accounting_simulation_test.go | 10 | ||||
-rw-r--r-- | p2p/protocols/protocol.go | 2 | ||||
-rw-r--r-- | p2p/protocols/reporter.go | 147 | ||||
-rw-r--r-- | p2p/protocols/reporter_test.go | 77 | ||||
-rw-r--r-- | p2p/server.go | 9 | ||||
-rw-r--r-- | p2p/simulations/network.go | 75 |
9 files changed, 352 insertions, 22 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index afd4c9a27..9f7f1d41b 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -434,7 +434,7 @@ func (tab *Table) loadSeedNodes() { for i := range seeds { seed := seeds[i] age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }} - log.Debug("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) + log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) tab.add(seed) } } diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index a6cabf080..cdeb28dd5 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -567,12 +567,11 @@ loop: net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node, topic Topic) []byte { if n.state != nil && n.state.canQuery { return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration - } else { - if n.state == unknown { - net.ping(n, n.addr()) - } - return nil } + if n.state == unknown { + net.ping(n, n.addr()) + } + return nil }) case <-statsDump.C: diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 06a1a5845..770406a27 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -16,29 +16,32 @@ package protocols -import "github.com/ethereum/go-ethereum/metrics" +import ( + "time" + + "github.com/ethereum/go-ethereum/metrics" +) //define some metrics var ( - //NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions //All metrics are cumulative //total amount of units credited - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", nil) + mBalanceCredit metrics.Counter //total amount of units debited - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", nil) + mBalanceDebit metrics.Counter //total amount of bytes credited - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", nil) + mBytesCredit metrics.Counter //total amount of bytes debited - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", nil) + mBytesDebit metrics.Counter //total amount of credited messages - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", nil) + mMsgCredit metrics.Counter //total amount of debited messages - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", nil) + mMsgDebit metrics.Counter //how many times local node had to drop remote peers - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", nil) + mPeerDrops metrics.Counter //how many times local node overdrafted and dropped - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", nil) + mSelfDrops metrics.Counter ) //Prices defines how prices are being passed on to the accounting instance @@ -105,6 +108,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting { return ah } +//SetupAccountingMetrics creates a separate registry for p2p accounting metrics; +//this registry should be independent of any other metrics as it persists at different endpoints. +//It also instantiates the given metrics and starts the persisting go-routine which +//at the passed interval writes the metrics to a LevelDB +func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { + //create an empty registry + registry := metrics.NewRegistry() + //instantiate the metrics + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) + //create the DB and start persisting + return NewAccountingMetrics(registry, reportInterval, path) +} + //Implement Hook.Send // Send takes a peer, a size and a msg and // - calculates the cost for the local node sending a msg of size to peer using the Prices interface diff --git a/p2p/protocols/accounting_simulation_test.go b/p2p/protocols/accounting_simulation_test.go index 65b737abe..e90a1d81d 100644 --- a/p2p/protocols/accounting_simulation_test.go +++ b/p2p/protocols/accounting_simulation_test.go @@ -20,7 +20,10 @@ import ( "context" "flag" "fmt" + "io/ioutil" "math/rand" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -66,6 +69,13 @@ func init() { func TestAccountingSimulation(t *testing.T) { //setup the balances objects for every node bal := newBalances(*nodes) + //setup the metrics system or tests will fail trying to write metrics + dir, err := ioutil.TempDir("", "account-sim") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db")) //define the node.Service for this test services := adapters.Services{ "accounting": func(ctx *adapters.ServiceContext) (node.Service, error) { diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 7dddd852f..b16720dd3 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -381,7 +381,7 @@ func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) // * arguments // * context // * the local handshake to be sent to the remote peer -// * funcion to be called on the remote handshake (can be nil) +// * function to be called on the remote handshake (can be nil) // * expects a remote handshake back of the same type // * the dialing peer needs to send the handshake first and then waits for remote // * the listening peer waits for the remote handshake and then sends it diff --git a/p2p/protocols/reporter.go b/p2p/protocols/reporter.go new file mode 100644 index 000000000..215d4fe31 --- /dev/null +++ b/p2p/protocols/reporter.go @@ -0,0 +1,147 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "encoding/binary" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + + "github.com/syndtr/goleveldb/leveldb" +) + +//AccountMetrics abstracts away the metrics DB and +//the reporter to persist metrics +type AccountingMetrics struct { + reporter *reporter +} + +//Close will be called when the node is being shutdown +//for a graceful cleanup +func (am *AccountingMetrics) Close() { + close(am.reporter.quit) + am.reporter.db.Close() +} + +//reporter is an internal structure used to write p2p accounting related +//metrics to a LevelDB. It will periodically write the accrued metrics to the DB. +type reporter struct { + reg metrics.Registry //the registry for these metrics (independent of other metrics) + interval time.Duration //duration at which the reporter will persist metrics + db *leveldb.DB //the actual DB + quit chan struct{} //quit the reporter loop +} + +//NewMetricsDB creates a new LevelDB instance used to persist metrics defined +//inside p2p/protocols/accounting.go +func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *AccountingMetrics { + var val = make([]byte, 8) + var err error + + //Create the LevelDB + db, err := leveldb.OpenFile(path, nil) + if err != nil { + log.Error(err.Error()) + return nil + } + + //Check for all defined metrics that there is a value in the DB + //If there is, assign it to the metric. This means that the node + //has been running before and that metrics have been persisted. + metricsMap := map[string]metrics.Counter{ + "account.balance.credit": mBalanceCredit, + "account.balance.debit": mBalanceDebit, + "account.bytes.credit": mBytesCredit, + "account.bytes.debit": mBytesDebit, + "account.msg.credit": mMsgCredit, + "account.msg.debit": mMsgDebit, + "account.peerdrops": mPeerDrops, + "account.selfdrops": mSelfDrops, + } + //iterate the map and get the values + for key, metric := range metricsMap { + val, err = db.Get([]byte(key), nil) + //until the first time a value is being written, + //this will return an error. + //it could be beneficial though to log errors later, + //but that would require a different logic + if err == nil { + metric.Inc(int64(binary.BigEndian.Uint64(val))) + } + } + + //create the reporter + rep := &reporter{ + reg: r, + interval: d, + db: db, + quit: make(chan struct{}), + } + + //run the go routine + go rep.run() + + m := &AccountingMetrics{ + reporter: rep, + } + + return m +} + +//run is the goroutine which periodically sends the metrics to the configured LevelDB +func (r *reporter) run() { + intervalTicker := time.NewTicker(r.interval) + + for { + select { + case <-intervalTicker.C: + //at each tick send the metrics + if err := r.save(); err != nil { + log.Error("unable to send metrics to LevelDB", "err", err) + //If there is an error in writing, exit the routine; we assume here that the error is + //severe and don't attempt to write again. + //Also, this should prevent leaking when the node is stopped + return + } + case <-r.quit: + //graceful shutdown + return + } + } +} + +//send the metrics to the DB +func (r *reporter) save() error { + //create a LevelDB Batch + batch := leveldb.Batch{} + //for each metric in the registry (which is independent)... + r.reg.Each(func(name string, i interface{}) { + metric, ok := i.(metrics.Counter) + if ok { + //assuming every metric here to be a Counter (separate registry) + //...create a snapshot... + ms := metric.Snapshot() + byteVal := make([]byte, 8) + binary.BigEndian.PutUint64(byteVal, uint64(ms.Count())) + //...and save the value to the DB + batch.Put([]byte(name), byteVal) + } + }) + return r.db.Write(&batch, nil) +} diff --git a/p2p/protocols/reporter_test.go b/p2p/protocols/reporter_test.go new file mode 100644 index 000000000..b9f06e674 --- /dev/null +++ b/p2p/protocols/reporter_test.go @@ -0,0 +1,77 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +//TestReporter tests that the metrics being collected for p2p accounting +//are being persisted and available after restart of a node. +//It simulates restarting by just recreating the DB as if the node had restarted. +func TestReporter(t *testing.T) { + //create a test directory + dir, err := ioutil.TempDir("", "reporter-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + //setup the metrics + log.Debug("Setting up metrics first time") + reportInterval := 5 * time.Millisecond + metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + log.Debug("Done.") + + //do some metrics + mBalanceCredit.Inc(12) + mBytesCredit.Inc(34) + mMsgDebit.Inc(9) + + //give the reporter time to write the metrics to DB + time.Sleep(20 * time.Millisecond) + + //set the metrics to nil - this effectively simulates the node having shut down... + mBalanceCredit = nil + mBytesCredit = nil + mMsgDebit = nil + //close the DB also, or we can't create a new one + metrics.Close() + + //setup the metrics again + log.Debug("Setting up metrics second time") + metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db")) + defer metrics.Close() + log.Debug("Done.") + + //now check the metrics, they should have the same value as before "shutdown" + if mBalanceCredit.Count() != 12 { + t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count()) + } + if mBytesCredit.Count() != 34 { + t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count()) + } + if mMsgDebit.Count() != 9 { + t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count()) + } +} diff --git a/p2p/server.go b/p2p/server.go index 667860863..566f01ffc 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -22,7 +22,6 @@ import ( "crypto/ecdsa" "encoding/hex" "errors" - "fmt" "net" "sort" "sync" @@ -391,7 +390,7 @@ type sharedUDPConn struct { func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { packet, ok := <-s.unhandled if !ok { - return 0, nil, fmt.Errorf("Connection was closed") + return 0, nil, errors.New("Connection was closed") } l := len(packet.Data) if l > len(b) { @@ -425,7 +424,7 @@ func (srv *Server) Start() (err error) { // static fields if srv.PrivateKey == nil { - return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") + return errors.New("Server.PrivateKey must be set to a non-nil key") } if srv.newTransport == nil { srv.newTransport = newRLPX @@ -903,7 +902,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro if dialDest != nil { dialPubkey = new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { - return fmt.Errorf("dial destination doesn't have a secp256k1 public key") + return errors.New("dial destination doesn't have a secp256k1 public key") } } // Run the encryption handshake. @@ -937,7 +936,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro return err } if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { - clog.Trace("Wrong devp2p handshake identity", "phsid", fmt.Sprintf("%x", phs.ID)) + clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 92ccfde81..ab9f582c5 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "sync" "time" @@ -705,8 +706,11 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn return snap, nil } +var snapshotLoadTimeout = 120 * time.Second + // Load loads a network snapshot func (net *Network) Load(snap *Snapshot) error { + // Start nodes. for _, n := range snap.Nodes { if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { return err @@ -718,6 +722,69 @@ func (net *Network) Load(snap *Snapshot) error { return err } } + + // Prepare connection events counter. + allConnected := make(chan struct{}) // closed when all connections are established + done := make(chan struct{}) // ensures that the event loop goroutine is terminated + defer close(done) + + // Subscribe to event channel. + // It needs to be done outside of the event loop goroutine (created below) + // to ensure that the event channel is blocking before connect calls are made. + events := make(chan *Event) + sub := net.Events().Subscribe(events) + defer sub.Unsubscribe() + + go func() { + // Expected number of connections. + total := len(snap.Conns) + // Set of all established connections from the snapshot, not other connections. + // Key array element 0 is the connection One field value, and element 1 connection Other field. + connections := make(map[[2]enode.ID]struct{}, total) + + for { + select { + case e := <-events: + // Ignore control events as they do not represent + // connect or disconnect (Up) state change. + if e.Control { + continue + } + // Detect only connection events. + if e.Type != EventTypeConn { + continue + } + connection := [2]enode.ID{e.Conn.One, e.Conn.Other} + // Nodes are still not connected or have been disconnected. + if !e.Conn.Up { + // Delete the connection from the set of established connections. + // This will prevent false positive in case disconnections happen. + delete(connections, connection) + log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other) + continue + } + // Check that the connection is from the snapshot. + for _, conn := range snap.Conns { + if conn.One == e.Conn.One && conn.Other == e.Conn.Other { + // Add the connection to the set of established connections. + connections[connection] = struct{}{} + if len(connections) == total { + // Signal that all nodes are connected. + close(allConnected) + return + } + + break + } + } + case <-done: + // Load function returned, terminate this goroutine. + return + } + } + }() + + // Start connecting. for _, conn := range snap.Conns { if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { @@ -729,6 +796,14 @@ func (net *Network) Load(snap *Snapshot) error { return err } } + + select { + // Wait until all connections from the snapshot are established. + case <-allConnected: + // Make sure that we do not wait forever. + case <-time.After(snapshotLoadTimeout): + return errors.New("snapshot connections not established") + } return nil } |