aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/depo.go15
-rw-r--r--swarm/network/hive.go10
-rw-r--r--swarm/network/kademlia/kademlia.go28
-rw-r--r--swarm/network/protocol.go24
4 files changed, 76 insertions, 1 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
index 17540d2f9..5ffbf8be1 100644
--- a/swarm/network/depo.go
+++ b/swarm/network/depo.go
@@ -23,9 +23,19 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/storage"
)
+//metrics variables
+var (
+ syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil)
+ syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil)
+ syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil)
+ syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil)
+ syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil)
+)
+
// Handler for storage/retrieval related protocol requests
// implements the StorageHandler interface used by the bzz protocol
type Depo struct {
@@ -107,6 +117,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
// not found in memory cache, ie., a genuine store request
// create chunk
+ syncReceiveCount.Inc(1)
chunk = storage.NewChunk(req.Key, nil)
case chunk.SData == nil:
@@ -116,6 +127,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
default:
// data is found, store request ignored
// this should update access count?
+ syncReceiveIgnore.Inc(1)
log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
islocal = true
//return
@@ -172,11 +184,14 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
SData: chunk.SData,
requestTimeout: req.timeout, //
}
+ syncSendCount.Inc(1)
p.syncer.addRequest(sreq, DeliverReq)
} else {
+ syncSendRefused.Inc(1)
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
}
} else {
+ syncSendNotFound.Inc(1)
log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
}
}
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
index 2504a4610..8404ffcc2 100644
--- a/swarm/network/hive.go
+++ b/swarm/network/hive.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/swarm/network/kademlia"
@@ -39,6 +40,12 @@ import (
// connections and disconnections are reported and relayed
// to keep the nodetable uptodate
+var (
+ peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil)
+ addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil)
+ removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil)
+)
+
type Hive struct {
listenAddr func() string
callInterval uint64
@@ -192,6 +199,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee
func (self *Hive) keepAlive() {
alarm := time.NewTicker(time.Duration(self.callInterval)).C
for {
+ peersNumGauge.Update(int64(self.kad.Count()))
select {
case <-alarm:
if self.kad.DBCount() > 0 {
@@ -223,6 +231,7 @@ func (self *Hive) Stop() error {
// called at the end of a successful protocol handshake
func (self *Hive) addPeer(p *peer) error {
+ addPeerCounter.Inc(1)
defer func() {
select {
case self.more <- true:
@@ -247,6 +256,7 @@ func (self *Hive) addPeer(p *peer) error {
// called after peer disconnected
func (self *Hive) removePeer(p *peer) {
+ removePeerCounter.Inc(1)
log.Debug(fmt.Sprintf("bee %v removed", p))
self.kad.Off(p, saveSync)
select {
diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go
index 0abc42a19..b5999b52d 100644
--- a/swarm/network/kademlia/kademlia.go
+++ b/swarm/network/kademlia/kademlia.go
@@ -24,6 +24,16 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+)
+
+//metrics variables
+//For metrics, we want to count how many times peers are added/removed
+//at a certain index. Thus we do that with an array of counters with
+//entry for each index
+var (
+ bucketAddIndexCount []metrics.Counter
+ bucketRmIndexCount []metrics.Counter
)
const (
@@ -88,12 +98,14 @@ type Node interface {
// params is KadParams configuration
func New(addr Address, params *KadParams) *Kademlia {
buckets := make([][]Node, params.MaxProx+1)
- return &Kademlia{
+ kad := &Kademlia{
addr: addr,
KadParams: params,
buckets: buckets,
db: newKadDb(addr, params),
}
+ kad.initMetricsVariables()
+ return kad
}
// accessor for KAD base address
@@ -138,6 +150,7 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error
// TODO: give priority to peers with active traffic
if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
self.buckets[index] = append(bucket, node)
+ bucketAddIndexCount[index].Inc(1)
log.Debug(fmt.Sprintf("add node %v to table", node))
self.setProxLimit(index, true)
record.node = node
@@ -178,6 +191,7 @@ func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
defer self.lock.Unlock()
index := self.proximityBin(node.Addr())
+ bucketRmIndexCount[index].Inc(1)
bucket := self.buckets[index]
for i := 0; i < len(bucket); i++ {
if node.Addr() == bucket[i].Addr() {
@@ -426,3 +440,15 @@ func (self *Kademlia) String() string {
rows = append(rows, "=========================================================================")
return strings.Join(rows, "\n")
}
+
+//We have to build up the array of counters for each index
+func (self *Kademlia) initMetricsVariables() {
+ //create the arrays
+ bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1)
+ bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1)
+ //at each index create a metrics counter
+ for i := 0; i < (self.KadParams.MaxProx + 1); i++ {
+ bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil)
+ bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil)
+ }
+}
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
index a418c1dbb..1cbe00a97 100644
--- a/swarm/network/protocol.go
+++ b/swarm/network/protocol.go
@@ -39,12 +39,26 @@ import (
"github.com/ethereum/go-ethereum/contracts/chequebook"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
"github.com/ethereum/go-ethereum/swarm/services/swap/swap"
"github.com/ethereum/go-ethereum/swarm/storage"
)
+//metrics variables
+var (
+ storeRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil)
+ retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil)
+ peersMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil)
+ syncRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil)
+ unsyncedKeysMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil)
+ deliverRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil)
+ paymentMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil)
+ invalidMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil)
+ handleStatusMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil)
+)
+
const (
Version = 0
ProtocolLength = uint64(8)
@@ -206,6 +220,7 @@ func (self *bzz) handle() error {
case storeRequestMsg:
// store requests are dispatched to netStore
+ storeRequestMsgCounter.Inc(1)
var req storeRequestMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<- %v: %v", msg, err)
@@ -221,6 +236,7 @@ func (self *bzz) handle() error {
case retrieveRequestMsg:
// retrieve Requests are dispatched to netStore
+ retrieveRequestMsgCounter.Inc(1)
var req retrieveRequestMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<- %v: %v", msg, err)
@@ -241,6 +257,7 @@ func (self *bzz) handle() error {
case peersMsg:
// response to lookups and immediate response to retrieve requests
// dispatches new peer data to the hive that adds them to KADDB
+ peersMsgCounter.Inc(1)
var req peersMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<- %v: %v", msg, err)
@@ -250,6 +267,7 @@ func (self *bzz) handle() error {
self.hive.HandlePeersMsg(&req, &peer{bzz: self})
case syncRequestMsg:
+ syncRequestMsgCounter.Inc(1)
var req syncRequestMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<- %v: %v", msg, err)
@@ -260,6 +278,7 @@ func (self *bzz) handle() error {
case unsyncedKeysMsg:
// coming from parent node offering
+ unsyncedKeysMsgCounter.Inc(1)
var req unsyncedKeysMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<- %v: %v", msg, err)
@@ -274,6 +293,7 @@ func (self *bzz) handle() error {
case deliveryRequestMsg:
// response to syncKeysMsg hashes filtered not existing in db
// also relays the last synced state to the source
+ deliverRequestMsgCounter.Inc(1)
var req deliveryRequestMsgData
if err := msg.Decode(&req); err != nil {
return fmt.Errorf("<-msg %v: %v", msg, err)
@@ -287,6 +307,7 @@ func (self *bzz) handle() error {
case paymentMsg:
// swap protocol message for payment, Units paid for, Cheque paid with
+ paymentMsgCounter.Inc(1)
if self.swapEnabled {
var req paymentMsgData
if err := msg.Decode(&req); err != nil {
@@ -298,6 +319,7 @@ func (self *bzz) handle() error {
default:
// no other message is allowed
+ invalidMsgCounter.Inc(1)
return fmt.Errorf("invalid message code: %v", msg.Code)
}
return nil
@@ -332,6 +354,8 @@ func (self *bzz) handleStatus() (err error) {
return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg)
}
+ handleStatusMsgCounter.Inc(1)
+
if msg.Size > ProtocolMaxMsgSize {
return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
}