aboutsummaryrefslogtreecommitdiffstats
path: root/eth/fetcher/fetcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/fetcher/fetcher.go')
-rw-r--r--eth/fetcher/fetcher.go35
1 files changed, 26 insertions, 9 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index f54256788..b8ec1fc55 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -347,18 +347,19 @@ func (f *Fetcher) loop() {
case notification := <-f.notify:
// A block was announced, make sure the peer isn't DOSing us
- announceMeter.Mark(1)
+ propAnnounceInMeter.Mark(1)
count := f.announces[notification.origin] + 1
if count > hashLimit {
glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
+ propAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
- discardMeter.Mark(1)
+ propAnnounceDropMeter.Mark(1)
break
}
}
@@ -377,7 +378,7 @@ func (f *Fetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
- broadcastMeter.Mark(1)
+ propBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
case hash := <-f.done:
@@ -425,10 +426,12 @@ func (f *Fetcher) loop() {
}
if fetchBlocks != nil {
// Use old eth/61 protocol to retrieve whole blocks
+ blockFetchMeter.Mark(int64(len(hashes)))
fetchBlocks(hashes)
} else {
// Use new eth/62 protocol to retrieve headers first
for _, hash := range hashes {
+ headerFetchMeter.Mark(1)
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
}
}
@@ -467,6 +470,7 @@ func (f *Fetcher) loop() {
if f.completingHook != nil {
f.completingHook(hashes)
}
+ bodyFetchMeter.Mark(int64(len(hashes)))
go f.completing[hashes[0]].fetchBodies(hashes)
}
// Schedule the next fetch if blocks are still pending
@@ -480,6 +484,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ blockFilterInMeter.Mark(int64(len(blocks)))
explicit, download := []*types.Block{}, []*types.Block{}
for _, block := range blocks {
@@ -498,6 +503,7 @@ func (f *Fetcher) loop() {
}
}
+ blockFilterOutMeter.Mark(int64(len(download)))
select {
case filter <- download:
case <-f.quit:
@@ -520,6 +526,8 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ headerFilterInMeter.Mark(int64(len(task.headers)))
+
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
@@ -544,7 +552,10 @@ func (f *Fetcher) loop() {
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
- complete = append(complete, types.NewBlockWithHeader(header))
+ block := types.NewBlockWithHeader(header)
+ block.ReceivedAt = task.time
+
+ complete = append(complete, block)
f.completing[hash] = announce
continue
}
@@ -559,6 +570,7 @@ func (f *Fetcher) loop() {
unknown = append(unknown, header)
}
}
+ headerFilterOutMeter.Mark(int64(len(unknown)))
select {
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
case <-f.quit:
@@ -590,6 +602,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ bodyFilterInMeter.Mark(int64(len(task.transactions)))
blocks := []*types.Block{}
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
@@ -606,7 +619,10 @@ func (f *Fetcher) loop() {
matched = true
if f.getBlock(hash) == nil {
- blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]))
+ block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
+ block.ReceivedAt = task.time
+
+ blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
@@ -621,6 +637,7 @@ func (f *Fetcher) loop() {
}
}
+ bodyFilterOutMeter.Mark(int64(len(task.transactions)))
select {
case filter <- task:
case <-f.quit:
@@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
count := f.queues[peer] + 1
if count > blockLimit {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
+ propBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
- discardMeter.Mark(1)
+ propBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
@@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
switch err := f.validateBlock(block, parent); err {
case nil:
// All ok, quickly propagate to our peers
- broadcastTimer.UpdateSince(block.ReceivedAt)
+ propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
case core.BlockFutureErr:
- futureMeter.Mark(1)
// Weird future block, don't fail, but neither propagate
default:
@@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
return
}
// If import succeeded, broadcast the block
- announceTimer.UpdateSince(block.ReceivedAt)
+ propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
// Invoke the testing hook if needed