aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go90
1 files changed, 68 insertions, 22 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
index d94d6ac46..80168b206 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/errs"
)
+// the blockpool's model of a peer
type peer struct {
lock sync.RWMutex
@@ -47,6 +48,8 @@ type peer struct {
blocksRequestTimer <-chan time.Time
suicideC <-chan time.Time
+ addToBlacklist func(id string)
+
idle bool
}
@@ -55,11 +58,12 @@ type peer struct {
type peers struct {
lock sync.RWMutex
- bp *BlockPool
- errors *errs.Errors
- peers map[string]*peer
- best *peer
- status *status
+ bp *BlockPool
+ errors *errs.Errors
+ peers map[string]*peer
+ best *peer
+ status *status
+ blacklist map[string]time.Time
}
// peer constructor
@@ -84,26 +88,48 @@ func (self *peers) newPeer(
headSectionC: make(chan *section),
bp: self.bp,
idle: true,
+ addToBlacklist: self.addToBlacklist,
}
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
-// dispatches an error to a peer if still connected
+// dispatches an error to a peer if still connected, adds it to the blacklist
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
- defer self.lock.RUnlock()
peer, ok := self.peers[id]
+ self.lock.RUnlock()
if ok {
peer.addError(code, format, params)
}
- // blacklisting comes here
+ self.addToBlacklist(id)
+}
+
+// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
+func (self *peers) addToBlacklist(id string) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blacklist[id] = time.Now()
+}
+
+// suspended checks if peer is still suspended
+func (self *peers) suspended(id string) (s bool) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if suspendedAt, ok := self.blacklist[id]; ok {
+ if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
+ // no longer suspended, delete entry
+ delete(self.blacklist, id)
+ }
+ }
+ return
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
+ self.addToBlacklist(self.id)
}
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
@@ -138,8 +164,8 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
}()
}
+// distribute block request among known peers
func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
- // distribute block request among known peers
self.lock.RLock()
defer self.lock.RUnlock()
peerCount := len(self.peers)
@@ -174,7 +200,9 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
}
// addPeer implements the logic for blockpool.AddPeer
-// returns true iff peer is promoted as best peer in the pool
+// returns 2 bool values
+// 1. true iff peer is promoted as best peer in the pool
+// 2. true iff peer is still suspended
func (self *peers) addPeer(
td *big.Int,
currentBlockHash common.Hash,
@@ -182,16 +210,23 @@ func (self *peers) addPeer(
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
-) (best bool) {
+) (best bool, suspended bool) {
var previousBlockHash common.Hash
+ if self.suspended(id) {
+ suspended = true
+ return
+ }
self.lock.Lock()
p, found := self.peers[id]
if found {
+ // when called on an already connected peer, it means a newBlockMsg is received
+ // peer head info is updated
if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash)
+
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
@@ -209,11 +244,11 @@ func (self *peers) addPeer(
}
self.lock.Unlock()
- // check peer current head
+ // check if peer's current head block is known
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
- return false
+ return false, false
}
if self.best == p {
@@ -229,7 +264,8 @@ func (self *peers) addPeer(
}
best = true
} else {
- currentTD := common.Big0
+ // baseline is our own TD
+ currentTD := self.bp.getTD()
if self.best != nil {
currentTD = self.best.td
}
@@ -237,7 +273,7 @@ func (self *peers) addPeer(
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> promoted best peer", id)
+ plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
self.bp.switchPeer(self.best, p)
self.best = p
best = true
@@ -257,13 +293,13 @@ func (self *peers) removePeer(id string) {
}
delete(self.peers, id)
- plog.Debugf("addPeer: remove peer <%v>", id)
+ plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td)
// if current best peer is removed, need to find a better one
if self.best == p {
var newp *peer
- // FIXME: own TD
- max := common.Big0
+ // only peers that are ahead of us are considered
+ max := self.bp.getTD()
// peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers {
if pp.td.Cmp(max) > 0 {
@@ -275,7 +311,7 @@ func (self *peers) removePeer(id string) {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td)
+ plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
} else {
plog.Warnln("addPeer: no suitable peers found")
}
@@ -288,6 +324,7 @@ func (self *peers) removePeer(id string) {
func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain
+ // by closing the old peer's switchC channel
if oldp != nil {
plog.DebugDetailf("<%s> quit peer processes", oldp.id)
close(oldp.switchC)
@@ -340,11 +377,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
// newp activating section process changes the quit channel for this reason
if oldp != nil {
plog.DebugDetailf("<%s> quit section processes", oldp.id)
- //
close(oldp.idleC)
}
}
+// getPeer looks up peer by id, returns peer and a bool value
+// that is true iff peer is current best peer
func (self *peers) getPeer(id string) (p *peer, best bool) {
self.lock.RLock()
defer self.lock.RUnlock()
@@ -355,6 +393,8 @@ func (self *peers) getPeer(id string) (p *peer, best bool) {
return
}
+// head section process
+
func (self *peer) handleSection(sec *section) {
self.lock.Lock()
defer self.lock.Unlock()
@@ -426,6 +466,11 @@ func (self *peer) getBlockHashes() {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.badPeers[self.id]++
} else {
+ if self.currentBlock.Td != nil {
+ if self.td.Cmp(self.currentBlock.Td) != 0 {
+ self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash)
+ }
+ }
headKey := self.parentHash.Str()
height := self.bp.status.chain[headKey] + 1
self.bp.status.chain[self.currentBlockHash.Str()] = height
@@ -445,6 +490,7 @@ func (self *peer) getBlockHashes() {
block: self.currentBlock,
hashBy: self.id,
blockBy: self.id,
+ td: self.td,
}
self.bp.newSection([]*node{n}).activate(self)
} else {
@@ -485,7 +531,7 @@ func (self *peer) run() {
LOOP:
for {
select {
- // to minitor section process behaviou
+ // to minitor section process behaviour
case <-ping.C:
plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
@@ -538,7 +584,7 @@ LOOP:
// quit
case <-quit:
- self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, self.currentBlockHash))
+ self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash)))
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++