package blockpool import ( "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) /* section is the worker on each chain section in the block pool - remove the section if there are blocks missing after an absolute time - remove the section if there are maxIdleRounds of idle rounds of block requests with no response - periodically polls the chain section for missing blocks which are then requested from peers - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they removed (inserted in blockchain, invalid or expired) - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking - when turned back on it recursively calls itself on the root of the next chain section */ type section struct { lock sync.RWMutex parent *section // connecting section back in time towards blockchain child *section // connecting section forward in time top *node // the topmost node = head node = youngest node within the chain section bottom *node // the bottom node = root node = oldest node within the chain section nodes []*node peer *peer parentHash common.Hash blockHashes []common.Hash poolRootIndex int bp *BlockPool controlC chan *peer // to (de)register the current best peer poolRootC chan *peer // indicate connectedness to blockchain (well, known blocks) offC chan bool // closed if process terminated suicideC chan bool // initiate suicide on the section quitInitC chan bool // to signal end of initialisation forkC chan chan bool // freeze section process while splitting switchC chan bool // switching idleC chan bool // channel to indicate thai food processC chan *node // missingC chan *node // blocksRequestTimer <-chan time.Time blockHashesRequestTimer <-chan time.Time suicideTimer <-chan time.Time blocksRequests int blockHashesRequests int blocksRequestsComplete bool blockHashesRequestsComplete bool ready bool same bool initialised bool active bool step int idle int missing int lastMissing int depth int invalid bool poolRoot bool } // func (self *BlockPool) newSection(nodes []*node) *section { sec := §ion{ bottom: nodes[len(nodes)-1], top: nodes[0], nodes: nodes, poolRootIndex: len(nodes), bp: self, controlC: make(chan *peer), poolRootC: make(chan *peer), offC: make(chan bool), } for i, n := range nodes { entry := &entry{node: n, section: sec, index: &index{i}} self.set(n.hash, entry) } glog.V(logger.Detail).Infof("[%s] setup section process", sectionhex(sec)) go sec.run() return sec } func (self *section) addSectionToBlockChain(p *peer) { self.bp.wg.Add(1) go func() { self.lock.Lock() defer self.lock.Unlock() defer func() { self.bp.wg.Done() }() var nodes []*node var n *node var keys []common.Hash var blocks []*types.Block for self.poolRootIndex > 0 { n = self.nodes[self.poolRootIndex-1] n.lock.RLock() block := n.block n.lock.RUnlock() if block == nil { break } self.poolRootIndex-- keys = append(keys, n.hash) blocks = append(blocks, block) nodes = append(nodes, n) } if len(blocks) == 0 { return } self.bp.lock.Lock() for _, key := range keys { delete(self.bp.pool, key) } self.bp.lock.Unlock() glog.V(logger.Debug).Infof("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash())) err := self.bp.insertChain(blocks) if err != nil { self.invalid = true self.bp.peers.peerError(n.blockBy, ErrInvalidBlock, "%v", err) glog.V(logger.Error).Infof("invalid block %x", n.hash) glog.V(logger.Error).Infof("penalise peers %v (hash), %v (block)", n.hashBy, n.blockBy) // or invalid block and the entire chain needs to be removed self.removeChain() } else { // check tds self.bp.wg.Add(1) go func() { self.bp.checkTD(nodes...) self.bp.wg.Done() }() // if all blocks inserted in this section // then need to try to insert blocks in child section if self.poolRootIndex == 0 { // if there is a child section, then recursively call itself: // also if section process is not terminated, // then signal blockchain connectivity with poolRootC if child := self.bp.getChild(self); child != nil { select { case <-child.offC: glog.V(logger.Detail).Infof("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) case child.poolRootC <- p: glog.V(logger.Detail).Infof("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) } child.addSectionToBlockChain(p) } else { glog.V(logger.Detail).Infof("[%s] no child section in pool", sectionhex(self)) } glog.V(logger.Detail).Infof("[%s] section completely inserted to blockchain - remove", sectionhex(self)) // complete sections are removed. if called from within section process, // this must run in its own go routine to avoid deadlock self.remove() } } self.bp.status.lock.Lock() if err == nil { headKey := blocks[0].ParentHash() height := self.bp.status.chain[headKey] + len(blocks) self.bp.status.chain[blocks[len(blocks)-1].Hash()] = height if height > self.bp.status.values.LongestChain { self.bp.status.values.LongestChain = height } delete(self.bp.status.chain, headKey) } self.bp.status.values.BlocksInChain += len(blocks) self.bp.status.values.BlocksInPool -= len(blocks) if err != nil { self.bp.status.badPeers[n.blockBy]++ } self.bp.status.lock.Unlock() }() } func (self *section) run() { // absolute time after which sub-chain is killed if not complete (some blocks are missing) self.suicideC = make(chan bool) self.forkC = make(chan chan bool) self.suicideTimer = time.After(self.bp.Config.BlocksTimeout) // node channels for the section // container for missing block hashes var checking bool var ping = time.NewTicker(5 * time.Second) LOOP: for !self.blockHashesRequestsComplete || !self.blocksRequestsComplete { select { case <-ping.C: var name = "no peer" if self.peer != nil { name = self.peer.id } glog.V(logger.Detail).Infof("[%s] peer <%s> active: %v", sectionhex(self), name, self.active) // global quit from blockpool case <-self.bp.quit: break LOOP // pause for peer switching case <-self.switchC: self.switchC = nil case p := <-self.poolRootC: // signal on pool root channel indicates that the blockpool is // connected to the blockchain, insert the longest chain of blocks // ignored in idle mode to avoid inserting chain sections of non-live peers self.poolRoot = true // switch off hash requests in case they were on self.blockHashesRequestTimer = nil self.blockHashesRequestsComplete = true self.switchOn(p) // peer quit or demoted, put section in idle mode case <-self.idleC: // peer quit or demoted, put section in idle mode glog.V(logger.Debug).Infof("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id) self.switchOff() self.idleC = nil // timebomb - if section is not complete in time, nuke the entire chain case <-self.suicideTimer: self.removeChain() glog.V(logger.Debug).Infof("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.suicideTimer = nil break LOOP // closing suicideC triggers section suicide: removes section nodes from pool and terminates section process case <-self.suicideC: glog.V(logger.Detail).Infof("[%s] quit", sectionhex(self)) break LOOP // alarm for checking blocks in the section case <-self.blocksRequestTimer: glog.V(logger.Detail).Infof("[%s] alarm: block request time", sectionhex(self)) self.processC = self.missingC // alarm for checking parent of the section or sending out hash requests case <-self.blockHashesRequestTimer: glog.V(logger.Detail).Infof("[%s] alarm: hash request time", sectionhex(self)) self.blockHashesRequest() // activate this section process with a peer case p := <-self.controlC: if p == nil { self.switchOff() } else { self.switchOn(p) } self.bp.wg.Done() // blocks the process until section is split at the fork case waiter := <-self.forkC: <-waiter self.initialised = false self.quitInitC = nil // case n, ok := <-self.processC: // channel closed, first iteration finished if !ok && !self.initialised { glog.V(logger.Detail).Infof("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) self.initialised = true self.processC = nil self.checkRound() checking = false break } if !checking { self.step = 0 self.missing = 0 checking = true } self.step++ n.lock.RLock() block := n.block n.lock.RUnlock() // if node has no block, request it (buffer it for batch request) // feed it to missingC channel for the next round if block == nil { pos := self.missing % self.bp.Config.BlockBatchSize if pos == 0 { if self.missing != 0 { self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:]) } self.blockHashes = self.bp.getHashSlice() } self.blockHashes[pos] = n.hash self.missing++ self.missingC <- n } else { // checking for parent block if self.poolRoot { // if node has got block (received via async AddBlock call from protocol) if self.step == self.lastMissing { // current root of the pool glog.V(logger.Detail).Infof("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash)) self.addSectionToBlockChain(self.peer) } } else { if (self.parentHash == common.Hash{}) && n == self.bottom { self.parentHash = block.ParentHash() glog.V(logger.Detail).Infof("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash)) self.blockHashesRequest() } } } if self.initialised && self.step == self.lastMissing { glog.V(logger.Detail).Infof("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.checkRound() checking = false } } // select } // for close(self.offC) if self.peer != nil { self.active = false self.bp.wg.Done() } glog.V(logger.Detail).Infof("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests) } func (self *section) switchOn(newpeer *peer) { oldpeer := self.peer // reset switchC/switchC to current best peer self.idleC = newpeer.idleC self.switchC = newpeer.switchC self.peer = newpeer if oldpeer != newpeer { oldp := "no peer" newp := "no peer" if oldpeer != nil { oldp = oldpeer.id } if newpeer != nil { newp = newpeer.id } glog.V(logger.Detail).Infof("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp) } // activate section with current peer if oldpeer == nil { self.bp.wg.Add(1) self.active = true if !self.blockHashesRequestsComplete { self.blockHashesRequestTimer = time.After(0) } if !self.blocksRequestsComplete { if !self.initialised { if self.quitInitC != nil { <-self.quitInitC } self.missingC = make(chan *node, self.bp.Config.BlockHashesBatchSize) self.processC = make(chan *node, self.bp.Config.BlockHashesBatchSize) self.quitInitC = make(chan bool) self.step = 0 self.missing = 0 self.depth = len(self.nodes) self.lastMissing = self.depth self.feedNodes() } else { self.blocksRequestTimer = time.After(0) } } } } // put the section to idle mode func (self *section) switchOff() { // active -> idle if self.peer != nil { oldp := "no peer" oldpeer := self.peer if oldpeer != nil { oldp = oldpeer.id } glog.V(logger.Detail).Infof("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth) self.active = false self.peer = nil // turn off timers self.blocksRequestTimer = nil self.blockHashesRequestTimer = nil if self.quitInitC != nil { <-self.quitInitC self.quitInitC = nil } self.processC = nil self.bp.wg.Done() } } // iterates through nodes of a section to feed processC // used to initialise chain section func (self *section) feedNodes() { // if not run at least once fully, launch iterator self.bp.wg.Add(1) go func() { self.lock.Lock() defer self.lock.Unlock() defer func() { self.bp.wg.Done() }() var n *node INIT: for _, n = range self.nodes { select { case self.processC <- n: case <-self.bp.quit: break INIT } } close(self.processC) close(self.quitInitC) }() } func (self *section) blockHashesRequest() { if self.switchC != nil { self.bp.chainLock.Lock() parentSection := self.parent if parentSection == nil { // only link to new parent if not switching peers if (self.parentHash != common.Hash{}) { if parent := self.bp.get(self.parentHash); parent != nil { parentSection = parent.section glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection)) link(parentSection, self) } else { if self.bp.hasBlock(self.parentHash) { self.poolRoot = true glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self)) self.addSectionToBlockChain(self.peer) self.blockHashesRequestTimer = nil self.blockHashesRequestsComplete = true } } } } self.bp.chainLock.Unlock() if !self.poolRoot { if parentSection != nil { // activate parent section with this peer // but only if not during switch mode glog.V(logger.Detail).Infof("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection)) self.bp.activateChain(parentSection, self.peer, self.peer.switchC, nil) // if not root of chain, switch off glog.V(logger.Detail).Infof("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests) self.blockHashesRequestTimer = nil self.blockHashesRequestsComplete = true } else { self.blockHashesRequests++ glog.V(logger.Detail).Infof("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests) self.peer.requestBlockHashes(self.bottom.hash) self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval) } } } } // checks number of missing blocks after each round of request and acts accordingly func (self *section) checkRound() { if self.missing == 0 { // no missing blocks glog.V(logger.Detail).Infof("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.blocksRequestsComplete = true self.blocksRequestTimer = nil } else { // some missing blocks glog.V(logger.Detail).Infof("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) self.blocksRequests++ pos := self.missing % self.bp.Config.BlockBatchSize if pos == 0 { pos = self.bp.Config.BlockBatchSize } self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:pos]) // handle idle rounds if self.missing == self.lastMissing { // idle round if self.same { // more than once self.idle++ // too many idle rounds if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds { glog.V(logger.Detail).Infof("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth) self.removeChain() } } else { self.idle = 0 } self.same = true } else { self.same = false } self.lastMissing = self.missing // put processC offline self.processC = nil self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) } } /* link connects two sections via parent/child fields creating a doubly linked list caller must hold BlockPool chainLock */ func link(parent *section, child *section) { if parent != nil { exChild := parent.child parent.child = child if exChild != nil && exChild != child { if child != nil { // if child is nil it is not a real fork glog.V(logger.Detail).Infof("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child)) } exChild.parent = nil } } if child != nil { exParent := child.parent if exParent != nil && exParent != parent { if parent != nil { // if parent is nil it is not a real fork, but suicide delinking section glog.V(logger.Detail).Infof("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent)) } exParent.child = nil } child.parent = parent } } /* handle forks where connecting node is mid-section by splitting section at fork no splitting needed if connecting node is head of a section caller must hold chain lock */ func (self *BlockPool) splitSection(parent *section, entry *entry) { glog.V(logger.Detail).Infof("[%s] split section at fork", sectionhex(parent)) parent.deactivate() waiter := make(chan bool) parent.wait(waiter) chain := parent.nodes parent.nodes = chain[entry.index.int:] parent.top = parent.nodes[0] parent.poolRootIndex -= entry.index.int orphan := self.newSection(chain[0:entry.index.int]) link(orphan, parent.child) close(waiter) orphan.deactivate() } func (self *section) wait(waiter chan bool) { self.forkC <- waiter } func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec *section) { // if new section is created, link it to parent/child sections // and launch section process fetching block and further hashes if len(nodes) > 0 { sec = self.newSection(nodes) glog.V(logger.Debug).Infof("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child)) link(parent, sec) link(sec, child) } else { if parent != nil && child != nil { // now this can only happen if we allow response to hash request to include hash // in this case we just link parent and child (without needing root block of child section) glog.V(logger.Debug).Infof("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child)) link(parent, child) } } return } func (self *section) activate(p *peer) { self.bp.wg.Add(1) select { case <-self.offC: glog.V(logger.Detail).Infof("[%s] completed section process. cannot activate for peer <%s>", sectionhex(self), p.id) self.bp.wg.Done() case self.controlC <- p: glog.V(logger.Detail).Infof("[%s] activate section process for peer <%s>", sectionhex(self), p.id) } } func (self *section) deactivate() { self.bp.wg.Add(1) self.controlC <- nil } // removes this section exacly func (self *section) remove() { select { case <-self.offC: close(self.suicideC) glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self)) case <-self.suicideC: glog.V(logger.Detail).Infof("[%s] remove: suicided already", sectionhex(self)) default: glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self)) close(self.suicideC) } self.unlink() self.bp.remove(self) glog.V(logger.Detail).Infof("[%s] removed section.", sectionhex(self)) } // remove a section and all its descendents from the pool func (self *section) removeChain() { // need to get the child before removeSection delinks the section self.bp.chainLock.RLock() child := self.child self.bp.chainLock.RUnlock() glog.V(logger.Detail).Infof("[%s] remove chain", sectionhex(self)) self.remove() if child != nil { child.removeChain() } } // unlink a section from its parent/child func (self *section) unlink() { // first delink from child and parent under chainlock self.bp.chainLock.Lock() link(nil, self) link(self, nil) self.bp.chainLock.Unlock() }