aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go139
1 files changed, 92 insertions, 47 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
index 5f4889792..80168b206 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -1,37 +1,37 @@
package blockpool
import (
- "bytes"
"math/big"
"math/rand"
"sort"
"sync"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
- "github.com/ethereum/go-ethereum/common"
)
+// the blockpool's model of a peer
type peer struct {
lock sync.RWMutex
// last known blockchain status
td *big.Int
- currentBlockHash []byte
+ currentBlockHash common.Hash
currentBlock *types.Block
- parentHash []byte
+ parentHash common.Hash
headSection *section
id string
// peer callbacks
- requestBlockHashes func([]byte) error
- requestBlocks func([][]byte) error
+ requestBlockHashes func(common.Hash) error
+ requestBlocks func([]common.Hash) error
peerError func(*errs.Error)
errors *errs.Errors
- sections [][]byte
+ sections []common.Hash
// channels to push new head block and head section for peer a
currentBlockC chan *types.Block
@@ -48,6 +48,8 @@ type peer struct {
blocksRequestTimer <-chan time.Time
suicideC <-chan time.Time
+ addToBlacklist func(id string)
+
idle bool
}
@@ -56,20 +58,21 @@ type peer struct {
type peers struct {
lock sync.RWMutex
- bp *BlockPool
- errors *errs.Errors
- peers map[string]*peer
- best *peer
- status *status
+ bp *BlockPool
+ errors *errs.Errors
+ peers map[string]*peer
+ best *peer
+ status *status
+ blacklist map[string]time.Time
}
// peer constructor
func (self *peers) newPeer(
td *big.Int,
- currentBlockHash []byte,
+ currentBlockHash common.Hash,
id string,
- requestBlockHashes func([]byte) error,
- requestBlocks func([][]byte) error,
+ requestBlockHashes func(common.Hash) error,
+ requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (p *peer) {
@@ -85,29 +88,51 @@ func (self *peers) newPeer(
headSectionC: make(chan *section),
bp: self.bp,
idle: true,
+ addToBlacklist: self.addToBlacklist,
}
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
-// dispatches an error to a peer if still connected
+// dispatches an error to a peer if still connected, adds it to the blacklist
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
- defer self.lock.RUnlock()
peer, ok := self.peers[id]
+ self.lock.RUnlock()
if ok {
peer.addError(code, format, params)
}
- // blacklisting comes here
+ self.addToBlacklist(id)
+}
+
+// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
+func (self *peers) addToBlacklist(id string) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blacklist[id] = time.Now()
+}
+
+// suspended checks if peer is still suspended
+func (self *peers) suspended(id string) (s bool) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if suspendedAt, ok := self.blacklist[id]; ok {
+ if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
+ // no longer suspended, delete entry
+ delete(self.blacklist, id)
+ }
+ }
+ return
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
+ self.addToBlacklist(self.id)
}
-func (self *peer) setChainInfo(td *big.Int, c []byte) {
+func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
@@ -115,7 +140,7 @@ func (self *peer) setChainInfo(td *big.Int, c []byte) {
self.currentBlockHash = c
self.currentBlock = nil
- self.parentHash = nil
+ self.parentHash = common.Hash{}
self.headSection = nil
}
@@ -139,8 +164,8 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
}()
}
-func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
- // distribute block request among known peers
+// distribute block request among known peers
+func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
self.lock.RLock()
defer self.lock.RUnlock()
peerCount := len(self.peers)
@@ -175,24 +200,33 @@ func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
}
// addPeer implements the logic for blockpool.AddPeer
-// returns true iff peer is promoted as best peer in the pool
+// returns 2 bool values
+// 1. true iff peer is promoted as best peer in the pool
+// 2. true iff peer is still suspended
func (self *peers) addPeer(
td *big.Int,
- currentBlockHash []byte,
+ currentBlockHash common.Hash,
id string,
- requestBlockHashes func([]byte) error,
- requestBlocks func([][]byte) error,
+ requestBlockHashes func(common.Hash) error,
+ requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
-) (best bool) {
+) (best bool, suspended bool) {
- var previousBlockHash []byte
+ var previousBlockHash common.Hash
+ if self.suspended(id) {
+ suspended = true
+ return
+ }
self.lock.Lock()
p, found := self.peers[id]
if found {
- if !bytes.Equal(p.currentBlockHash, currentBlockHash) {
+ // when called on an already connected peer, it means a newBlockMsg is received
+ // peer head info is updated
+ if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash)
+
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
@@ -210,18 +244,18 @@ func (self *peers) addPeer(
}
self.lock.Unlock()
- // check peer current head
+ // check if peer's current head block is known
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
- return false
+ return false, false
}
if self.best == p {
// new block update for active current best peer -> request hashes
plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
- if previousBlockHash != nil {
+ if (previousBlockHash != common.Hash{}) {
if entry := self.bp.get(previousBlockHash); entry != nil {
p.headSectionC <- nil
self.bp.activateChain(entry.section, p, nil)
@@ -230,7 +264,8 @@ func (self *peers) addPeer(
}
best = true
} else {
- currentTD := common.Big0
+ // baseline is our own TD
+ currentTD := self.bp.getTD()
if self.best != nil {
currentTD = self.best.td
}
@@ -238,7 +273,7 @@ func (self *peers) addPeer(
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> promoted best peer", id)
+ plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
self.bp.switchPeer(self.best, p)
self.best = p
best = true
@@ -258,13 +293,13 @@ func (self *peers) removePeer(id string) {
}
delete(self.peers, id)
- plog.Debugf("addPeer: remove peer <%v>", id)
+ plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td)
// if current best peer is removed, need to find a better one
if self.best == p {
var newp *peer
- // FIXME: own TD
- max := common.Big0
+ // only peers that are ahead of us are considered
+ max := self.bp.getTD()
// peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers {
if pp.td.Cmp(max) > 0 {
@@ -276,7 +311,7 @@ func (self *peers) removePeer(id string) {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td)
+ plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
} else {
plog.Warnln("addPeer: no suitable peers found")
}
@@ -289,6 +324,7 @@ func (self *peers) removePeer(id string) {
func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain
+ // by closing the old peer's switchC channel
if oldp != nil {
plog.DebugDetailf("<%s> quit peer processes", oldp.id)
close(oldp.switchC)
@@ -318,15 +354,15 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
var connected = make(map[string]*section)
- var sections [][]byte
+ var sections []common.Hash
for _, hash := range newp.sections {
plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
// if section not connected (ie, top of a contiguous sequence of sections)
- if connected[string(hash)] == nil {
+ if connected[hash.Str()] == nil {
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
if entry := self.get(hash); entry != nil {
self.activateChain(entry.section, newp, connected)
- connected[string(hash)] = entry.section
+ connected[hash.Str()] = entry.section
sections = append(sections, hash)
}
}
@@ -341,11 +377,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
// newp activating section process changes the quit channel for this reason
if oldp != nil {
plog.DebugDetailf("<%s> quit section processes", oldp.id)
- //
close(oldp.idleC)
}
}
+// getPeer looks up peer by id, returns peer and a bool value
+// that is true iff peer is current best peer
func (self *peers) getPeer(id string) (p *peer, best bool) {
self.lock.RLock()
defer self.lock.RUnlock()
@@ -356,6 +393,8 @@ func (self *peers) getPeer(id string) (p *peer, best bool) {
return
}
+// head section process
+
func (self *peer) handleSection(sec *section) {
self.lock.Lock()
defer self.lock.Unlock()
@@ -396,7 +435,7 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) {
plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
} else {
plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
- self.requestBlocks([][]byte{self.currentBlockHash})
+ self.requestBlocks([]common.Hash{self.currentBlockHash})
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
return
}
@@ -427,9 +466,14 @@ func (self *peer) getBlockHashes() {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.badPeers[self.id]++
} else {
- headKey := string(self.parentHash)
+ if self.currentBlock.Td != nil {
+ if self.td.Cmp(self.currentBlock.Td) != 0 {
+ self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash)
+ }
+ }
+ headKey := self.parentHash.Str()
height := self.bp.status.chain[headKey] + 1
- self.bp.status.chain[string(self.currentBlockHash)] = height
+ self.bp.status.chain[self.currentBlockHash.Str()] = height
if height > self.bp.status.values.LongestChain {
self.bp.status.values.LongestChain = height
}
@@ -446,6 +490,7 @@ func (self *peer) getBlockHashes() {
block: self.currentBlock,
hashBy: self.id,
blockBy: self.id,
+ td: self.td,
}
self.bp.newSection([]*node{n}).activate(self)
} else {
@@ -486,7 +531,7 @@ func (self *peer) run() {
LOOP:
for {
select {
- // to minitor section process behaviou
+ // to minitor section process behaviour
case <-ping.C:
plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
@@ -539,7 +584,7 @@ LOOP:
// quit
case <-quit:
- self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, self.currentBlockHash))
+ self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash)))
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++