diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-08-25 18:57:49 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-08-25 22:48:47 +0800 |
commit | 17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d (patch) | |
tree | 946f0e2478d8002194c09705e638c74b9d3a8ec5 /eth/fetcher | |
parent | 47a7fe5d22fe2a6be783f6576070814fe951eaaf (diff) | |
download | dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.gz dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.bz2 dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.lz dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.xz dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.zst dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.zip |
eth: update metrics collection to handle eth/62 algos
Diffstat (limited to 'eth/fetcher')
-rw-r--r-- | eth/fetcher/fetcher.go | 35 | ||||
-rw-r--r-- | eth/fetcher/metrics.go | 26 |
2 files changed, 46 insertions, 15 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index f54256788..b8ec1fc55 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -347,18 +347,19 @@ func (f *Fetcher) loop() { case notification := <-f.notify: // A block was announced, make sure the peer isn't DOSing us - announceMeter.Mark(1) + propAnnounceInMeter.Mark(1) count := f.announces[notification.origin] + 1 if count > hashLimit { glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit) + propAnnounceDOSMeter.Mark(1) break } // If we have a valid block number, check that it's potentially useful if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist) - discardMeter.Mark(1) + propAnnounceDropMeter.Mark(1) break } } @@ -377,7 +378,7 @@ func (f *Fetcher) loop() { case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps - broadcastMeter.Mark(1) + propBroadcastInMeter.Mark(1) f.enqueue(op.origin, op.block) case hash := <-f.done: @@ -425,10 +426,12 @@ func (f *Fetcher) loop() { } if fetchBlocks != nil { // Use old eth/61 protocol to retrieve whole blocks + blockFetchMeter.Mark(int64(len(hashes))) fetchBlocks(hashes) } else { // Use new eth/62 protocol to retrieve headers first for _, hash := range hashes { + headerFetchMeter.Mark(1) fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals } } @@ -467,6 +470,7 @@ func (f *Fetcher) loop() { if f.completingHook != nil { f.completingHook(hashes) } + bodyFetchMeter.Mark(int64(len(hashes))) go f.completing[hashes[0]].fetchBodies(hashes) } // Schedule the next fetch if blocks are still pending @@ -480,6 +484,7 @@ func (f *Fetcher) loop() { case <-f.quit: return } + blockFilterInMeter.Mark(int64(len(blocks))) explicit, download := []*types.Block{}, []*types.Block{} for _, block := range blocks { @@ -498,6 +503,7 @@ func (f *Fetcher) loop() { } } + blockFilterOutMeter.Mark(int64(len(download))) select { case filter <- download: case <-f.quit: @@ -520,6 +526,8 @@ func (f *Fetcher) loop() { case <-f.quit: return } + headerFilterInMeter.Mark(int64(len(task.headers))) + // Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} @@ -544,7 +552,10 @@ func (f *Fetcher) loop() { if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4]) - complete = append(complete, types.NewBlockWithHeader(header)) + block := types.NewBlockWithHeader(header) + block.ReceivedAt = task.time + + complete = append(complete, block) f.completing[hash] = announce continue } @@ -559,6 +570,7 @@ func (f *Fetcher) loop() { unknown = append(unknown, header) } } + headerFilterOutMeter.Mark(int64(len(unknown))) select { case filter <- &headerFilterTask{headers: unknown, time: task.time}: case <-f.quit: @@ -590,6 +602,7 @@ func (f *Fetcher) loop() { case <-f.quit: return } + bodyFilterInMeter.Mark(int64(len(task.transactions))) blocks := []*types.Block{} for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { @@ -606,7 +619,10 @@ func (f *Fetcher) loop() { matched = true if f.getBlock(hash) == nil { - blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])) + block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) + block.ReceivedAt = task.time + + blocks = append(blocks, block) } else { f.forgetHash(hash) } @@ -621,6 +637,7 @@ func (f *Fetcher) loop() { } } + bodyFilterOutMeter.Mark(int64(len(task.transactions))) select { case filter <- task: case <-f.quit: @@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { count := f.queues[peer] + 1 if count > blockLimit { glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit) + propBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } // Discard any past or too distant blocks if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) - discardMeter.Mark(1) + propBroadcastDropMeter.Mark(1) f.forgetHash(hash) return } @@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) { switch err := f.validateBlock(block, parent); err { case nil: // All ok, quickly propagate to our peers - broadcastTimer.UpdateSince(block.ReceivedAt) + propBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) case core.BlockFutureErr: - futureMeter.Mark(1) // Weird future block, don't fail, but neither propagate default: @@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) { return } // If import succeeded, broadcast the block - announceTimer.UpdateSince(block.ReceivedAt) + propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false) // Invoke the testing hook if needed diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index 76cc49226..b82d3ca01 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -23,10 +23,24 @@ import ( ) var ( - announceMeter = metrics.NewMeter("eth/sync/RemoteAnnounces") - announceTimer = metrics.NewTimer("eth/sync/LocalAnnounces") - broadcastMeter = metrics.NewMeter("eth/sync/RemoteBroadcasts") - broadcastTimer = metrics.NewTimer("eth/sync/LocalBroadcasts") - discardMeter = metrics.NewMeter("eth/sync/DiscardedBlocks") - futureMeter = metrics.NewMeter("eth/sync/FutureBlocks") + propAnnounceInMeter = metrics.NewMeter("eth/fetcher/prop/announces/in") + propAnnounceOutTimer = metrics.NewTimer("eth/fetcher/prop/announces/out") + propAnnounceDropMeter = metrics.NewMeter("eth/fetcher/prop/announces/drop") + propAnnounceDOSMeter = metrics.NewMeter("eth/fetcher/prop/announces/dos") + + propBroadcastInMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/in") + propBroadcastOutTimer = metrics.NewTimer("eth/fetcher/prop/broadcasts/out") + propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop") + propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos") + + blockFetchMeter = metrics.NewMeter("eth/fetcher/fetch/blocks") + headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers") + bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies") + + blockFilterInMeter = metrics.NewMeter("eth/fetcher/filter/blocks/in") + blockFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/blocks/out") + headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in") + headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out") + bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in") + bodyFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/bodies/out") ) |