diff options
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r-- | blockpool/peers.go | 139 |
1 files changed, 92 insertions, 47 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go index 5f4889792..80168b206 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -1,37 +1,37 @@ package blockpool import ( - "bytes" "math/big" "math/rand" "sort" "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" - "github.com/ethereum/go-ethereum/common" ) +// the blockpool's model of a peer type peer struct { lock sync.RWMutex // last known blockchain status td *big.Int - currentBlockHash []byte + currentBlockHash common.Hash currentBlock *types.Block - parentHash []byte + parentHash common.Hash headSection *section id string // peer callbacks - requestBlockHashes func([]byte) error - requestBlocks func([][]byte) error + requestBlockHashes func(common.Hash) error + requestBlocks func([]common.Hash) error peerError func(*errs.Error) errors *errs.Errors - sections [][]byte + sections []common.Hash // channels to push new head block and head section for peer a currentBlockC chan *types.Block @@ -48,6 +48,8 @@ type peer struct { blocksRequestTimer <-chan time.Time suicideC <-chan time.Time + addToBlacklist func(id string) + idle bool } @@ -56,20 +58,21 @@ type peer struct { type peers struct { lock sync.RWMutex - bp *BlockPool - errors *errs.Errors - peers map[string]*peer - best *peer - status *status + bp *BlockPool + errors *errs.Errors + peers map[string]*peer + best *peer + status *status + blacklist map[string]time.Time } // peer constructor func (self *peers) newPeer( td *big.Int, - currentBlockHash []byte, + currentBlockHash common.Hash, id string, - requestBlockHashes func([]byte) error, - requestBlocks func([][]byte) error, + requestBlockHashes func(common.Hash) error, + requestBlocks func([]common.Hash) error, peerError func(*errs.Error), ) (p *peer) { @@ -85,29 +88,51 @@ func (self *peers) newPeer( headSectionC: make(chan *section), bp: self.bp, idle: true, + addToBlacklist: self.addToBlacklist, } // at creation the peer is recorded in the peer pool self.peers[id] = p return } -// dispatches an error to a peer if still connected +// dispatches an error to a peer if still connected, adds it to the blacklist func (self *peers) peerError(id string, code int, format string, params ...interface{}) { self.lock.RLock() - defer self.lock.RUnlock() peer, ok := self.peers[id] + self.lock.RUnlock() if ok { peer.addError(code, format, params) } - // blacklisting comes here + self.addToBlacklist(id) +} + +// record time of offence in blacklist to implement suspension for PeerSuspensionInterval +func (self *peers) addToBlacklist(id string) { + self.lock.Lock() + defer self.lock.Unlock() + self.blacklist[id] = time.Now() +} + +// suspended checks if peer is still suspended +func (self *peers) suspended(id string) (s bool) { + self.lock.Lock() + defer self.lock.Unlock() + if suspendedAt, ok := self.blacklist[id]; ok { + if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s { + // no longer suspended, delete entry + delete(self.blacklist, id) + } + } + return } func (self *peer) addError(code int, format string, params ...interface{}) { err := self.errors.New(code, format, params...) self.peerError(err) + self.addToBlacklist(self.id) } -func (self *peer) setChainInfo(td *big.Int, c []byte) { +func (self *peer) setChainInfo(td *big.Int, c common.Hash) { self.lock.Lock() defer self.lock.Unlock() @@ -115,7 +140,7 @@ func (self *peer) setChainInfo(td *big.Int, c []byte) { self.currentBlockHash = c self.currentBlock = nil - self.parentHash = nil + self.parentHash = common.Hash{} self.headSection = nil } @@ -139,8 +164,8 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) { }() } -func (self *peers) requestBlocks(attempts int, hashes [][]byte) { - // distribute block request among known peers +// distribute block request among known peers +func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { self.lock.RLock() defer self.lock.RUnlock() peerCount := len(self.peers) @@ -175,24 +200,33 @@ func (self *peers) requestBlocks(attempts int, hashes [][]byte) { } // addPeer implements the logic for blockpool.AddPeer -// returns true iff peer is promoted as best peer in the pool +// returns 2 bool values +// 1. true iff peer is promoted as best peer in the pool +// 2. true iff peer is still suspended func (self *peers) addPeer( td *big.Int, - currentBlockHash []byte, + currentBlockHash common.Hash, id string, - requestBlockHashes func([]byte) error, - requestBlocks func([][]byte) error, + requestBlockHashes func(common.Hash) error, + requestBlocks func([]common.Hash) error, peerError func(*errs.Error), -) (best bool) { +) (best bool, suspended bool) { - var previousBlockHash []byte + var previousBlockHash common.Hash + if self.suspended(id) { + suspended = true + return + } self.lock.Lock() p, found := self.peers[id] if found { - if !bytes.Equal(p.currentBlockHash, currentBlockHash) { + // when called on an already connected peer, it means a newBlockMsg is received + // peer head info is updated + if p.currentBlockHash != currentBlockHash { previousBlockHash = p.currentBlockHash plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash)) p.setChainInfo(td, currentBlockHash) + self.status.lock.Lock() self.status.values.NewBlocks++ self.status.lock.Unlock() @@ -210,18 +244,18 @@ func (self *peers) addPeer( } self.lock.Unlock() - // check peer current head + // check if peer's current head block is known if self.bp.hasBlock(currentBlockHash) { // peer not ahead plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) - return false + return false, false } if self.best == p { // new block update for active current best peer -> request hashes plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) - if previousBlockHash != nil { + if (previousBlockHash != common.Hash{}) { if entry := self.bp.get(previousBlockHash); entry != nil { p.headSectionC <- nil self.bp.activateChain(entry.section, p, nil) @@ -230,7 +264,8 @@ func (self *peers) addPeer( } best = true } else { - currentTD := common.Big0 + // baseline is our own TD + currentTD := self.bp.getTD() if self.best != nil { currentTD = self.best.td } @@ -238,7 +273,7 @@ func (self *peers) addPeer( self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> promoted best peer", id) + plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) self.bp.switchPeer(self.best, p) self.best = p best = true @@ -258,13 +293,13 @@ func (self *peers) removePeer(id string) { } delete(self.peers, id) - plog.Debugf("addPeer: remove peer <%v>", id) + plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td) // if current best peer is removed, need to find a better one if self.best == p { var newp *peer - // FIXME: own TD - max := common.Big0 + // only peers that are ahead of us are considered + max := self.bp.getTD() // peer with the highest self-acclaimed TD is chosen for _, pp := range self.peers { if pp.td.Cmp(max) > 0 { @@ -276,7 +311,7 @@ func (self *peers) removePeer(id string) { self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td) + plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) } else { plog.Warnln("addPeer: no suitable peers found") } @@ -289,6 +324,7 @@ func (self *peers) removePeer(id string) { func (self *BlockPool) switchPeer(oldp, newp *peer) { // first quit AddBlockHashes, requestHeadSection and activateChain + // by closing the old peer's switchC channel if oldp != nil { plog.DebugDetailf("<%s> quit peer processes", oldp.id) close(oldp.switchC) @@ -318,15 +354,15 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } var connected = make(map[string]*section) - var sections [][]byte + var sections []common.Hash for _, hash := range newp.sections { plog.DebugDetailf("activate chain starting from section [%s]", hex(hash)) // if section not connected (ie, top of a contiguous sequence of sections) - if connected[string(hash)] == nil { + if connected[hash.Str()] == nil { // if not deleted, then reread from pool (it can be orphaned top half of a split section) if entry := self.get(hash); entry != nil { self.activateChain(entry.section, newp, connected) - connected[string(hash)] = entry.section + connected[hash.Str()] = entry.section sections = append(sections, hash) } } @@ -341,11 +377,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // newp activating section process changes the quit channel for this reason if oldp != nil { plog.DebugDetailf("<%s> quit section processes", oldp.id) - // close(oldp.idleC) } } +// getPeer looks up peer by id, returns peer and a bool value +// that is true iff peer is current best peer func (self *peers) getPeer(id string) (p *peer, best bool) { self.lock.RLock() defer self.lock.RUnlock() @@ -356,6 +393,8 @@ func (self *peers) getPeer(id string) (p *peer, best bool) { return } +// head section process + func (self *peer) handleSection(sec *section) { self.lock.Lock() defer self.lock.Unlock() @@ -396,7 +435,7 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) { plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) } else { plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) - self.requestBlocks([][]byte{self.currentBlockHash}) + self.requestBlocks([]common.Hash{self.currentBlockHash}) self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) return } @@ -427,9 +466,14 @@ func (self *peer) getBlockHashes() { self.addError(ErrInvalidBlock, "%v", err) self.bp.status.badPeers[self.id]++ } else { - headKey := string(self.parentHash) + if self.currentBlock.Td != nil { + if self.td.Cmp(self.currentBlock.Td) != 0 { + self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) + } + } + headKey := self.parentHash.Str() height := self.bp.status.chain[headKey] + 1 - self.bp.status.chain[string(self.currentBlockHash)] = height + self.bp.status.chain[self.currentBlockHash.Str()] = height if height > self.bp.status.values.LongestChain { self.bp.status.values.LongestChain = height } @@ -446,6 +490,7 @@ func (self *peer) getBlockHashes() { block: self.currentBlock, hashBy: self.id, blockBy: self.id, + td: self.td, } self.bp.newSection([]*node{n}).activate(self) } else { @@ -486,7 +531,7 @@ func (self *peer) run() { LOOP: for { select { - // to minitor section process behaviou + // to minitor section process behaviour case <-ping.C: plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) @@ -539,7 +584,7 @@ LOOP: // quit case <-quit: - self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, self.currentBlockHash)) + self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash))) self.bp.status.lock.Lock() self.bp.status.badPeers[self.id]++ |