diff options
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r-- | blockpool/peers.go | 639 |
1 files changed, 0 insertions, 639 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go deleted file mode 100644 index eb2ec6a1f..000000000 --- a/blockpool/peers.go +++ /dev/null @@ -1,639 +0,0 @@ -package blockpool - -import ( - "math/big" - "math/rand" - "sort" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/errs" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" -) - -// the blockpool's model of a peer -type peer struct { - lock sync.RWMutex - - // last known blockchain status - td *big.Int - tdAdvertised bool - currentBlockHash common.Hash - currentBlock *types.Block - parentHash common.Hash - headSection *section - - id string - - // peer callbacks - requestBlockHashes func(common.Hash) error - requestBlocks func([]common.Hash) error - peerError func(*errs.Error) - errors *errs.Errors - - sections []common.Hash - - // channels to push new head block and head section for peer a - currentBlockC chan *types.Block - headSectionC chan *section - - // channels to signal peer switch and peer quit to section processes - idleC chan bool - switchC chan bool - - bp *BlockPool - - // timers for head section process - blockHashesRequestTimer <-chan time.Time - blocksRequestTimer <-chan time.Time - headInfoTimer <-chan time.Time - bestIdleTimer <-chan time.Time - - addToBlacklist func(id string) - - idle bool -} - -// peers is the component keeping a record of peers in a hashmap -// -type peers struct { - lock sync.RWMutex - bllock sync.Mutex - - bp *BlockPool - errors *errs.Errors - peers map[string]*peer - best *peer - status *status - blacklist map[string]time.Time -} - -// peer constructor -func (self *peers) newPeer( - td *big.Int, - currentBlockHash common.Hash, - id string, - requestBlockHashes func(common.Hash) error, - requestBlocks func([]common.Hash) error, - peerError func(*errs.Error), -) (p *peer) { - - p = &peer{ - errors: self.errors, - td: td, - currentBlockHash: currentBlockHash, - id: id, - requestBlockHashes: requestBlockHashes, - requestBlocks: requestBlocks, - peerError: peerError, - currentBlockC: make(chan *types.Block), - headSectionC: make(chan *section), - switchC: make(chan bool), - bp: self.bp, - idle: true, - addToBlacklist: self.addToBlacklist, - } - close(p.switchC) //! hack :(((( - // at creation the peer is recorded in the peer pool - self.peers[id] = p - return -} - -// dispatches an error to a peer if still connected, adds it to the blacklist -func (self *peers) peerError(id string, code int, format string, params ...interface{}) { - self.lock.RLock() - peer, ok := self.peers[id] - self.lock.RUnlock() - if ok { - peer.addError(code, format, params...) - } else { - self.addToBlacklist(id) - } -} - -// record time of offence in blacklist to implement suspension for PeerSuspensionInterval -func (self *peers) addToBlacklist(id string) { - self.bllock.Lock() - defer self.bllock.Unlock() - self.blacklist[id] = time.Now() -} - -// suspended checks if peer is still suspended, caller should hold peers.lock -func (self *peers) suspended(id string) (s bool) { - self.bllock.Lock() - defer self.bllock.Unlock() - if suspendedAt, ok := self.blacklist[id]; ok { - if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s { - // no longer suspended, delete entry - delete(self.blacklist, id) - } - } - return -} - -func (self *peer) addError(code int, format string, params ...interface{}) { - err := self.errors.New(code, format, params...) - self.peerError(err) - if err.Fatal() { - self.addToBlacklist(self.id) - } else { - go self.bp.peers.removePeer(self.id, false) - } -} - -// caller must hold peer lock -func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { - self.lock.Lock() - defer self.lock.Unlock() - if self.currentBlockHash != currentBlockHash { - previousBlockHash := self.currentBlockHash - glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash)) - - self.td = td - self.currentBlockHash = currentBlockHash - self.currentBlock = nil - self.parentHash = common.Hash{} - self.headSection = nil - } - self.tdAdvertised = true -} - -func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) { - hash := block.Hash() - // this happens when block came in a newblock message but - // also if sent in a blockmsg (for instance, if we requested, only if we - // dont apply on blockrequests the restriction of flood control) - currentBlockHash = self.currentBlockHash - if currentBlockHash == hash { - if self.currentBlock == nil { - // signal to head section process - glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) - td = self.td - } else { - glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) - } - } - return -} - -// this will use the TD given by the first peer to update peer td, this helps second best peer selection -func (self *peer) setChainInfoFromNode(n *node) { - // in case best peer is lost - block := n.block - hash := block.Hash() - if n.td != nil && n.td.Cmp(self.td) > 0 { - glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) - self.td = n.td - self.currentBlockHash = block.Hash() - self.parentHash = block.ParentHash() - self.currentBlock = block - self.headSection = nil - } -} - -// distribute block request among known peers -func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { - self.lock.RLock() - - defer self.lock.RUnlock() - peerCount := len(self.peers) - // on first attempt use the best peer - if attempts == 0 && self.best != nil { - glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id) - self.best.requestBlocks(hashes) - return - } - repetitions := self.bp.Config.BlocksRequestRepetition - if repetitions > peerCount { - repetitions = peerCount - } - i := 0 - indexes := rand.Perm(peerCount)[0:repetitions] - sort.Ints(indexes) - - glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) - for _, peer := range self.peers { - if i == indexes[0] { - glog.V(logger.Detail).Infof("request length: %v", len(hashes)) - glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id) - peer.requestBlocks(hashes) - indexes = indexes[1:] - if len(indexes) == 0 { - break - } - } - i++ - } - self.bp.putHashSlice(hashes) -} - -// addPeer implements the logic for blockpool.AddPeer -// returns 2 bool values -// 1. true iff peer is promoted as best peer in the pool -// 2. true iff peer is still suspended -func (self *peers) addPeer( - td *big.Int, - currentBlockHash common.Hash, - id string, - requestBlockHashes func(common.Hash) error, - requestBlocks func([]common.Hash) error, - peerError func(*errs.Error), -) (best bool, suspended bool) { - - self.lock.Lock() - defer self.lock.Unlock() - var previousBlockHash common.Hash - if self.suspended(id) { - suspended = true - return - } - p, found := self.peers[id] - if found { - // when called on an already connected peer, it means a newBlockMsg is received - // peer head info is updated - p.setChainInfo(td, currentBlockHash) - self.status.lock.Lock() - self.status.values.NewBlocks++ - self.status.lock.Unlock() - } else { - p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError) - - self.status.lock.Lock() - - self.status.peers[id]++ - self.status.values.NewBlocks++ - self.status.lock.Unlock() - - glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash)) - } - - // check if peer's current head block is known - if self.bp.hasBlock(currentBlockHash) { - // peer not ahead - glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) - return false, false - } - - if self.best == p { - // new block update for active current best peer -> request hashes - glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) - - if (previousBlockHash != common.Hash{}) { - glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash)) - p.headSectionC <- nil - if entry := self.bp.get(previousBlockHash); entry != nil { - glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) - self.bp.activateChain(entry.section, p, p.switchC, nil) - p.sections = append(p.sections, previousBlockHash) - } - } - best = true - } else { - // baseline is our own TD - currentTD := self.bp.getTD() - bestpeer := self.best - if bestpeer != nil { - bestpeer.lock.RLock() - defer bestpeer.lock.RUnlock() - currentTD = self.best.td - } - if td.Cmp(currentTD) > 0 { - self.status.lock.Lock() - self.status.bestPeers[p.id]++ - self.status.lock.Unlock() - glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) - // fmt.Printf("best peer %v - \n", bestpeer, id) - self.bp.switchPeer(bestpeer, p) - self.best = p - best = true - } - } - - return -} - -// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects -func (self *peers) removePeer(id string, del bool) { - self.lock.Lock() - defer self.lock.Unlock() - - p, found := self.peers[id] - if !found { - return - } - p.lock.Lock() - defer p.lock.Unlock() - - if del { - delete(self.peers, id) - glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td) - } - // if current best peer is removed, need to find a better one - if self.best == p { - var newp *peer - // only peers that are ahead of us are considered - max := self.bp.getTD() - // peer with the highest self-acclaimed TD is chosen - for _, pp := range self.peers { - // demoted peer's td should be 0 - if pp.id == id { - pp.td = common.Big0 - pp.currentBlockHash = common.Hash{} - continue - } - pp.lock.RLock() - if pp.td.Cmp(max) > 0 { - max = pp.td - newp = pp - } - pp.lock.RUnlock() - } - if newp != nil { - self.status.lock.Lock() - self.status.bestPeers[p.id]++ - self.status.lock.Unlock() - glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) - } else { - glog.V(logger.Warn).Infof("addPeer: no suitable peers found") - } - self.best = newp - // fmt.Printf("remove peer %v - %v\n", p.id, newp) - self.bp.switchPeer(p, newp) - } -} - -// switchPeer launches section processes -func (self *BlockPool) switchPeer(oldp, newp *peer) { - - // first quit AddBlockHashes, requestHeadSection and activateChain - // by closing the old peer's switchC channel - if oldp != nil { - glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id) - // fmt.Printf("close %v - %v\n", oldp.id, newp) - close(oldp.switchC) - } - if newp != nil { - // if new best peer has no head section yet, create it and run it - // otherwise head section is an element of peer.sections - newp.idleC = make(chan bool) - newp.switchC = make(chan bool) - if newp.headSection == nil { - glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash)) - - if newp.idle { - self.wg.Add(1) - newp.idle = false - self.syncing() - } - - go func() { - newp.run() - if !newp.idle { - self.wg.Done() - newp.idle = true - } - }() - - } - - var connected = make(map[common.Hash]*section) - var sections []common.Hash - for _, hash := range newp.sections { - glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash)) - // if section not connected (ie, top of a contiguous sequence of sections) - if connected[hash] == nil { - // if not deleted, then reread from pool (it can be orphaned top half of a split section) - if entry := self.get(hash); entry != nil { - self.activateChain(entry.section, newp, newp.switchC, connected) - connected[hash] = entry.section - sections = append(sections, hash) - } - } - } - glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) - // need to lock now that newp is exposed to section processesr - newp.lock.Lock() - newp.sections = sections - newp.lock.Unlock() - } - // finally deactivate section process for sections where newp didnt activate - // newp activating section process changes the quit channel for this reason - if oldp != nil { - glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id) - close(oldp.idleC) - } -} - -// getPeer looks up peer by id, returns peer and a bool value -// that is true iff peer is current best peer -func (self *peers) getPeer(id string) (p *peer, best bool) { - self.lock.RLock() - defer self.lock.RUnlock() - if self.best != nil && self.best.id == id { - return self.best, true - } - p = self.peers[id] - return -} - -// head section process - -func (self *peer) handleSection(sec *section) { - self.lock.Lock() - defer self.lock.Unlock() - glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec)) - - self.headSection = sec - self.blockHashesRequestTimer = nil - - if sec == nil { - if self.idle { - self.idle = false - self.bp.wg.Add(1) - self.bp.syncing() - } - - self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) - self.bestIdleTimer = nil - - glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash)) - } else { - if !self.idle { - self.idle = true - self.bp.wg.Done() - } - - self.headInfoTimer = nil - self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout) - glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec)) - } -} - -func (self *peer) getCurrentBlock(currentBlock *types.Block) { - // called by update or after AddBlock signals that head block of current peer is received - self.lock.Lock() - defer self.lock.Unlock() - if currentBlock == nil { - if entry := self.bp.get(self.currentBlockHash); entry != nil { - entry.node.lock.Lock() - currentBlock = entry.node.block - entry.node.lock.Unlock() - } - if currentBlock != nil { - glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) - } else { - glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) - self.requestBlocks([]common.Hash{self.currentBlockHash}) - self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) - return - } - } else { - glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash())) - } - - self.currentBlock = currentBlock - self.parentHash = currentBlock.ParentHash() - glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash)) - self.blockHashesRequestTimer = time.After(0) - self.blocksRequestTimer = nil -} - -func (self *peer) getBlockHashes() bool { - self.lock.Lock() - defer self.lock.Unlock() - //if connecting parent is found - if self.bp.hasBlock(self.parentHash) { - glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) - err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock})) - - self.bp.status.lock.Lock() - self.bp.status.values.BlocksInChain++ - self.bp.status.values.BlocksInPool-- - if err != nil { - self.addError(ErrInvalidBlock, "%v", err) - self.bp.status.badPeers[self.id]++ - } else { - // XXX added currentBlock check (?) - if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() { - glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) - if self.td.Cmp(self.currentBlock.Td) != 0 { - self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td) - self.bp.status.badPeers[self.id]++ - } - } - - headKey := self.parentHash - height := self.bp.status.chain[headKey] + 1 - self.bp.status.chain[self.currentBlockHash] = height - if height > self.bp.status.values.LongestChain { - self.bp.status.values.LongestChain = height - } - delete(self.bp.status.chain, headKey) - } - self.bp.status.lock.Unlock() - } else { - if parent := self.bp.get(self.parentHash); parent != nil { - if self.bp.get(self.currentBlockHash) == nil { - glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash)) - self.bp.nodeCacheLock.Lock() - n, ok := self.bp.nodeCache[self.currentBlockHash] - if !ok { - panic("not found in nodeCache") - } - self.bp.nodeCacheLock.Unlock() - self.bp.newSection([]*node{n}).activate(self) - } else { - glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) - self.bp.activateChain(parent.section, self, self.switchC, nil) - } - } else { - glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) - self.requestBlockHashes(self.currentBlockHash) - self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval) - return false - } - } - self.blockHashesRequestTimer = nil - if !self.idle { - self.idle = true - self.headInfoTimer = nil - self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout) - self.bp.wg.Done() - } - return true -} - -// main loop for head section process -func (self *peer) run() { - - self.blocksRequestTimer = time.After(0) - self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) - self.bestIdleTimer = nil - - var ping = time.NewTicker(5 * time.Second) - -LOOP: - for { - select { - // to minitor section process behaviour - case <-ping.C: - glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) - - // signal from AddBlockHashes that head section for current best peer is created - // if sec == nil, it signals that chain info has updated (new block message) - case sec := <-self.headSectionC: - self.handleSection(sec) - - // periodic check for block hashes or parent block/section - case <-self.blockHashesRequestTimer: - self.getBlockHashes() - - // signal from AddBlock that head block of current best peer has been received - case currentBlock := <-self.currentBlockC: - self.getCurrentBlock(currentBlock) - - // keep requesting until found or timed out - case <-self.blocksRequestTimer: - self.getCurrentBlock(nil) - - // quitting on timeout - case <-self.headInfoTimer: - self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block (td: %v, head: %s)", self.td, hex(self.currentBlockHash))) - - self.bp.status.lock.Lock() - self.bp.status.badPeers[self.id]++ - self.bp.status.lock.Unlock() - // there is no persistence here, so GC will just take care of cleaning up - - // signal for peer switch, quit - case <-self.switchC: - var complete = "incomplete " - if self.idle { - complete = "complete" - } - glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete) - break LOOP - - // global quit for blockpool - case <-self.bp.quit: - break LOOP - - // best - case <-self.bestIdleTimer: - self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash))) - - self.bp.status.lock.Lock() - self.bp.status.badPeers[self.id]++ - self.bp.status.lock.Unlock() - glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) - } - } - - if !self.idle { - self.idle = true - self.bp.wg.Done() - } -} |