diff options
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r-- | blockpool/peers.go | 52 |
1 files changed, 26 insertions, 26 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go index 3f514c9e9..019399038 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -57,7 +57,8 @@ type peer struct { // peers is the component keeping a record of peers in a hashmap // type peers struct { - lock sync.RWMutex + lock sync.RWMutex + bllock sync.Mutex bp *BlockPool errors *errs.Errors @@ -109,15 +110,15 @@ func (self *peers) peerError(id string, code int, format string, params ...inter // record time of offence in blacklist to implement suspension for PeerSuspensionInterval func (self *peers) addToBlacklist(id string) { - self.lock.Lock() - defer self.lock.Unlock() + self.bllock.Lock() + defer self.bllock.Unlock() self.blacklist[id] = time.Now() } -// suspended checks if peer is still suspended +// suspended checks if peer is still suspended, caller should hold peers.lock func (self *peers) suspended(id string) (s bool) { - self.lock.Lock() - defer self.lock.Unlock() + self.bllock.Lock() + defer self.bllock.Unlock() if suspendedAt, ok := self.blacklist[id]; ok { if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s { // no longer suspended, delete entry @@ -142,9 +143,8 @@ func (self *peer) setChainInfo(td *big.Int, c common.Hash) { self.headSection = nil } +// caller must hold peer lock func (self *peer) setChainInfoFromBlock(block *types.Block) { - self.lock.Lock() - defer self.lock.Unlock() // use the optional TD to update peer td, this helps second best peer selection // in case best peer is lost if block.Td != nil && block.Td.Cmp(self.td) > 0 { @@ -155,16 +155,12 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) { self.currentBlock = block self.headSection = nil } - self.bp.wg.Add(1) - go func() { - self.currentBlockC <- block - self.bp.wg.Done() - }() } // distribute block request among known peers func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { self.lock.RLock() + defer self.lock.RUnlock() peerCount := len(self.peers) // on first attempt use the best peer @@ -210,13 +206,14 @@ func (self *peers) addPeer( peerError func(*errs.Error), ) (best bool, suspended bool) { + self.lock.Lock() + defer self.lock.Unlock() + var previousBlockHash common.Hash if self.suspended(id) { suspended = true return } - self.lock.Lock() - defer self.lock.Unlock() p, found := self.peers[id] if found { // when called on an already connected peer, it means a newBlockMsg is received @@ -260,7 +257,7 @@ func (self *peers) addPeer( p.headSectionC <- nil if entry := self.bp.get(previousBlockHash); entry != nil { plog.DebugDetailf("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) - self.bp.activateChain(entry.section, p, nil) + self.bp.activateChain(entry.section, p, p.switchC, nil) p.sections = append(p.sections, previousBlockHash) } } @@ -270,8 +267,8 @@ func (self *peers) addPeer( currentTD := self.bp.getTD() bestpeer := self.best if bestpeer != nil { - bestpeer.lock.Lock() - defer bestpeer.lock.Unlock() + bestpeer.lock.RLock() + defer bestpeer.lock.RUnlock() currentTD = self.best.td } if td.Cmp(currentTD) > 0 { @@ -367,14 +364,14 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { if connected[hash] == nil { // if not deleted, then reread from pool (it can be orphaned top half of a split section) if entry := self.get(hash); entry != nil { - self.activateChain(entry.section, newp, connected) + self.activateChain(entry.section, newp, newp.switchC, connected) connected[hash] = entry.section sections = append(sections, hash) } } } plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) - // need to lock now that newp is exposed to section processes + // need to lock now that newp is exposed to section processesr newp.lock.Lock() newp.sections = sections newp.lock.Unlock() @@ -462,6 +459,8 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) { } func (self *peer) getBlockHashes() bool { + self.lock.Lock() + defer self.lock.Unlock() //if connecting parent is found if self.bp.hasBlock(self.parentHash) { plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) @@ -475,10 +474,11 @@ func (self *peer) getBlockHashes() bool { self.bp.status.badPeers[self.id]++ } else { // XXX added currentBlock check (?) - if self.currentBlock != nil && self.currentBlock.Td != nil { + if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() { + plog.DebugDetailf("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) if self.td.Cmp(self.currentBlock.Td) != 0 { - //self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) - //self.bp.status.badPeers[self.id]++ + self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) + self.bp.status.badPeers[self.id]++ } } headKey := self.parentHash @@ -504,7 +504,7 @@ func (self *peer) getBlockHashes() bool { self.bp.newSection([]*node{n}).activate(self) } else { plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) - self.bp.activateChain(parent.section, self, nil) + self.bp.activateChain(parent.section, self, self.switchC, nil) } } else { plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) @@ -528,6 +528,7 @@ func (self *peer) run() { self.lock.RLock() switchC := self.switchC + plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash)) self.lock.RUnlock() self.blockHashesRequestTimer = nil @@ -570,7 +571,6 @@ LOOP: self.bp.status.badPeers[self.id]++ self.bp.status.lock.Unlock() // there is no persistence here, so GC will just take care of cleaning up - break LOOP // signal for peer switch, quit case <-switchC: @@ -593,9 +593,9 @@ LOOP: self.bp.status.badPeers[self.id]++ self.bp.status.lock.Unlock() plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) - break LOOP } } + if !self.idle { self.idle = true self.bp.wg.Done() |