From 422490d75cf9a2406430f2d7c0d7dd77ede18f7c Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 25 Feb 2015 19:34:12 +0700 Subject: major rewrite, reorg of blockpool + new features - blockpool moves to its own package - uses errs pkg for its own coded errors - publicly settable config of params (time intervals and batchsizes) - test helpers in subpackage - optional TD in blocks used now to update peers chain info - major improvement in algorithm - fix fragility and sync/parallelisation bugs - implement status for reporting on sync status (peers/hashes/blocks etc) - several tests added and further corner cases covered --- blockpool/blockpool.go | 749 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 749 insertions(+) create mode 100644 blockpool/blockpool.go (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go new file mode 100644 index 000000000..0126d734c --- /dev/null +++ b/blockpool/blockpool.go @@ -0,0 +1,749 @@ +package blockpool + +import ( + "bytes" + "fmt" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/errs" + ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/pow" +) + +var plog = ethlogger.NewLogger("Blockpool") + +var ( + // max number of block hashes sent in one request + blockHashesBatchSize = 512 + // max number of blocks sent in one request + blockBatchSize = 64 + // interval between two consecutive block checks (and requests) + blocksRequestInterval = 3 * time.Second + // level of redundancy in block requests sent + blocksRequestRepetition = 1 + // interval between two consecutive block hash checks (and requests) + blockHashesRequestInterval = 3 * time.Second + // max number of idle iterations, ie., check through a section without new blocks coming in + blocksRequestMaxIdleRounds = 100 + // timeout interval: max time allowed for peer without sending a block hash + blockHashesTimeout = 60 * time.Second + // timeout interval: max time allowed for peer without sending a block + blocksTimeout = 120 * time.Second +) + +// config embedded in components, by default fall back to constants +// by default all resolved to local +type Config struct { + BlockHashesBatchSize int + BlockBatchSize int + BlocksRequestRepetition int + BlocksRequestMaxIdleRounds int + BlockHashesRequestInterval time.Duration + BlocksRequestInterval time.Duration + BlockHashesTimeout time.Duration + BlocksTimeout time.Duration +} + +// blockpool errors +const ( + ErrInvalidBlock = iota + ErrInvalidPoW + ErrUnrequestedBlock + ErrInsufficientChainInfo +) + +var errorToString = map[int]string{ + ErrInvalidBlock: "Invalid block", + ErrInvalidPoW: "Invalid PoW", + ErrUnrequestedBlock: "Unrequested block", + ErrInsufficientChainInfo: "Insufficient chain info", +} + +// init initialises all your laundry +func (self *Config) init() { + if self.BlockHashesBatchSize == 0 { + self.BlockHashesBatchSize = blockHashesBatchSize + } + if self.BlockBatchSize == 0 { + self.BlockBatchSize = blockBatchSize + } + if self.BlocksRequestRepetition == 0 { + self.BlocksRequestRepetition = blocksRequestRepetition + } + if self.BlocksRequestMaxIdleRounds == 0 { + self.BlocksRequestMaxIdleRounds = blocksRequestMaxIdleRounds + } + if self.BlockHashesRequestInterval == 0 { + self.BlockHashesRequestInterval = blockHashesRequestInterval + } + if self.BlocksRequestInterval == 0 { + self.BlocksRequestInterval = blocksRequestInterval + } + if self.BlockHashesTimeout == 0 { + self.BlockHashesTimeout = blockHashesTimeout + } + if self.BlocksTimeout == 0 { + self.BlocksTimeout = blocksTimeout + } +} + +// node is the basic unit of the internal model of block chain/tree in the blockpool +type node struct { + lock sync.RWMutex + hash []byte + block *types.Block + hashBy string + blockBy string + td *big.Int +} + +type index struct { + int +} + +// entry is the struct kept and indexed in the pool +type entry struct { + node *node + section *section + index *index +} + +type BlockPool struct { + Config *Config + + // the minimal interface with blockchain + hasBlock func(hash []byte) bool + insertChain func(types.Blocks) error + verifyPoW func(pow.Block) bool + + pool map[string]*entry + peers *peers + + lock sync.RWMutex + chainLock sync.RWMutex + // alloc-easy pool of hash slices + hashSlicePool chan [][]byte + + status *status + + quit chan bool + wg sync.WaitGroup + running bool +} + +// public constructor +func New( + hasBlock func(hash []byte) bool, + insertChain func(types.Blocks) error, + verifyPoW func(pow.Block) bool, +) *BlockPool { + + return &BlockPool{ + Config: &Config{}, + hasBlock: hasBlock, + insertChain: insertChain, + verifyPoW: verifyPoW, + } +} + +// allows restart +func (self *BlockPool) Start() { + self.lock.Lock() + defer self.lock.Unlock() + + if self.running { + return + } + + self.Config.init() + self.hashSlicePool = make(chan [][]byte, 150) + self.status = newStatus() + self.quit = make(chan bool) + self.pool = make(map[string]*entry) + self.running = true + + self.peers = &peers{ + errors: &errs.Errors{ + Package: "Blockpool", + Errors: errorToString, + }, + peers: make(map[string]*peer), + status: self.status, + bp: self, + } + timer := time.NewTicker(3 * time.Second) + go func() { + for { + select { + case <-self.quit: + return + case <-timer.C: + plog.Debugf("status:\n%v", self.Status()) + } + } + }() + plog.Infoln("Started") +} + +func (self *BlockPool) Stop() { + self.lock.Lock() + if !self.running { + self.lock.Unlock() + return + } + self.running = false + + self.lock.Unlock() + + plog.Infoln("Stopping...") + + close(self.quit) + + self.lock.Lock() + self.peers = nil + self.pool = nil + self.lock.Unlock() + + plog.Infoln("Stopped") +} + +// Wait blocks until active processes finish +func (self *BlockPool) Wait(t time.Duration) { + self.lock.Lock() + if !self.running { + self.lock.Unlock() + return + } + self.lock.Unlock() + + plog.Infoln("Waiting for processes to complete...") + w := make(chan bool) + go func() { + self.wg.Wait() + close(w) + }() + + select { + case <-w: + plog.Infoln("Processes complete") + case <-time.After(t): + plog.Warnf("Timeout") + } +} + +/* +AddPeer is called by the eth protocol instance running on the peer after +the status message has been received with total difficulty and current block hash +Called a second time with the same peer id, it is used to update chain info for a peer. This is used when a new (mined) block message is received. +RemovePeer needs to be called when the peer disconnects. +Peer info is currently not persisted across disconnects (or sessions) +*/ +func (self *BlockPool) AddPeer( + + td *big.Int, currentBlockHash []byte, + peerId string, + requestBlockHashes func([]byte) error, + requestBlocks func([][]byte) error, + peerError func(*errs.Error), + +) (best bool) { + + return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError) +} + +// RemovePeer needs to be called when the peer disconnects +func (self *BlockPool) RemovePeer(peerId string) { + self.peers.removePeer(peerId) +} + +/* +AddBlockHashes + +Entry point for eth protocol to add block hashes received via BlockHashesMsg + +only hashes from the best peer are handled + +initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer) +launches all block request processes on each chain section + +the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns. +*/ +func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) { + + bestpeer, best := self.peers.getPeer(peerId) + if !best { + return + } + // bestpeer is still the best peer + + self.wg.Add(1) + + defer func() { self.wg.Done() }() + + self.status.lock.Lock() + self.status.activePeers[bestpeer.id]++ + self.status.lock.Unlock() + + var n int + var hash []byte + var ok, headSection, peerswitch bool + var sec, child, parent *section + var entry *entry + var nodes []*node + + hash, ok = next() + bestpeer.lock.Lock() + + plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) + + // first check if we are building the head section of a peer's chain + if bytes.Equal(bestpeer.parentHash, hash) { + if self.hasBlock(bestpeer.currentBlockHash) { + return + } + /* + when peer is promoted in switchPeer, a new header section process is launched + as the head section skeleton is actually created here, it is signaled to the process + so that it can quit + in the special case that the node for parent of the head block is found in the blockpool + (with or without fetched block) + */ + headSection = true + if entry := self.get(bestpeer.currentBlockHash); entry == nil { + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) + // if head block is not yet in the pool, create entry and start node list for section + node := &node{ + hash: bestpeer.currentBlockHash, + block: bestpeer.currentBlock, + hashBy: peerId, + blockBy: peerId, + } + // nodes is a list of nodes in one section ordered top-bottom (old to young) + nodes = append(nodes, node) + n++ + } else { + // otherwise set child section iff found node is the root of a section + // this is a possible scenario when a singleton head section was created + // on an earlier occasion this peer or another with the same block was best peer + if entry.node == entry.section.bottom { + child = entry.section + plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) + } + } + } else { + // otherwise : we are not building the head section of the peer + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + } + // the switch channel signals peerswitch event + switchC := bestpeer.switchC + bestpeer.lock.Unlock() + + // iterate over hashes coming from peer (first round we have hash set above) +LOOP: + for ; ok; hash, ok = next() { + + select { + case <-self.quit: + // global quit for blockpool + return + + case <-switchC: + // if the peer is demoted, no more hashes read + plog.DebugDetailf("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + peerswitch = true + break LOOP + default: + } + + // if we reach the blockchain we stop reading more + if self.hasBlock(hash) { + // check if known block connecting the downloaded chain to our blockchain + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + if len(nodes) == 1 { + // create new section if needed and push it to the blockchain + sec = self.newSection(nodes) + sec.addSectionToBlockChain(bestpeer) + } else { + + /* + not added hash yet but according to peer child section built + earlier chain connects with blockchain + this maybe a potential vulnarability + the root block arrives (or already there but its parenthash was not pointing to known block in the blockchain) + we start inserting -> error -> remove the entire chain + instead of punishing this peer + solution: when switching peers always make sure best peers own head block + and td together with blockBy are recorded on the node + */ + if len(nodes) == 0 && child != nil { + child.addSectionToBlockChain(bestpeer) + } + } + break LOOP + } + + // look up node in the pool + entry = self.get(hash) + if entry != nil { + // reached a known chain in the pool + if entry.node == entry.section.bottom && n == 1 { + /* + the first block hash received is an orphan in the pool + this also supports clients that (despite the spec) include hash in their + response to hashes request. Note that by providing we can link sections + without having to wait for the root block of the child section to arrive, so it allows for superior performance + */ + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + // record the entry's chain section as child section + child = entry.section + continue LOOP + } + // otherwise record entry's chain section as parent connecting it to the pool + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + parent = entry.section + break LOOP + } + + // finally if node for block hash does not exist, create it and append node to section nodes + node := &node{ + hash: hash, + hashBy: peerId, + } + nodes = append(nodes, node) + } //for + + /* + we got here if + - run out of hashes (parent = nil) sent by our best peer + - our peer is demoted (peerswitch = true) + - reached blockchain or blockpool + - quitting + */ + self.chainLock.Lock() + + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) + /* + handle forks where connecting node is mid-section + by splitting section at fork + no splitting needed if connecting node is head of a section + */ + if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 { + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + + self.splitSection(parent, entry) + + self.status.lock.Lock() + self.status.values.Forks++ + self.status.lock.Unlock() + } + + /* + if new section is created, link it to parent/child sections + and launch section process fetching blocks and further hashes + */ + sec = self.linkSections(nodes, parent, child) + + self.status.lock.Lock() + self.status.values.BlockHashes += len(nodes) + self.status.lock.Unlock() + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + + self.chainLock.Unlock() + + /* + if a blockpool node is reached (parent section is not nil), + activate section (unless our peer is demoted by now). + this can be the bottom half of a newly split section in case of a fork. + bestPeer is nil if we got here after our peer got demoted while processing. + in this case no activation should happen + */ + if parent != nil && !peerswitch { + self.activateChain(parent, bestpeer, nil) + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) + } + + /* + if a new section was created, + register section iff head section or no child known + activate it with this peer + */ + if sec != nil { + // switch on section process (it is paused by switchC) + if !peerswitch { + if headSection || child == nil { + bestpeer.lock.Lock() + bestpeer.sections = append(bestpeer.sections, sec.top.hash) + bestpeer.lock.Unlock() + } + /* + request next block hashes for parent section here. + but only once, repeating only when bottom block arrives, + otherwise no way to check if it arrived + */ + bestpeer.requestBlockHashes(sec.bottom.hash) + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + sec.activate(bestpeer) + } else { + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + sec.deactivate() + } + } + + // if we are processing peer's head section, signal it to headSection process that it is created + + if headSection { + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) + + var headSec *section + switch { + case sec != nil: + headSec = sec + case child != nil: + headSec = child + default: + headSec = parent + } + if !peerswitch { + plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec)) + bestpeer.headSectionC <- headSec + } + } +} + +/* + AddBlock is the entry point for the eth protocol to call when blockMsg is received. + + It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error + + At the same time it is opportunistic in that if a requested block may be provided by any peer. + + The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit. +*/ +func (self *BlockPool) AddBlock(block *types.Block, peerId string) { + hash := block.Hash() + + sender, _ := self.peers.getPeer(peerId) + if sender == nil { + return + } + + self.status.lock.Lock() + self.status.activePeers[peerId]++ + self.status.lock.Unlock() + + entry := self.get(hash) + + // a peer's current head block is appearing the first time + if bytes.Equal(hash, sender.currentBlockHash) { + if sender.currentBlock == nil { + plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.setChainInfoFromBlock(block) + // sender.currentBlockC <- block + + self.status.lock.Lock() + self.status.values.Blocks++ + self.status.values.BlocksInPool++ + self.status.lock.Unlock() + } else { + plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) + } + } else { + + plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + + sender.lock.Lock() + // update peer chain info if more recent than what we registered + if block.Td != nil && block.Td.Cmp(sender.td) > 0 { + sender.td = block.Td + sender.currentBlockHash = block.Hash() + sender.parentHash = block.ParentHash() + sender.currentBlock = block + sender.headSection = nil + } + sender.lock.Unlock() + + if entry == nil { + // penalise peer for sending what we have not asked + plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.addError(ErrUnrequestedBlock, "%x", hash) + + self.status.lock.Lock() + self.status.badPeers[peerId]++ + self.status.lock.Unlock() + return + } + } + if entry == nil { + return + } + + node := entry.node + node.lock.Lock() + defer node.lock.Unlock() + + // check if block already present + if node.block != nil { + plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy) + return + } + + // check if block is already inserted in the blockchain + if self.hasBlock(hash) { + plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash)) + return + } + + // validate block for PoW + if !self.verifyPoW(block) { + plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.addError(ErrInvalidPoW, "%x", hash) + + self.status.lock.Lock() + self.status.badPeers[peerId]++ + self.status.lock.Unlock() + + return + } + + node.block = block + node.blockBy = peerId + node.td = block.Td // optional field + + self.status.lock.Lock() + self.status.values.Blocks++ + self.status.values.BlocksInPool++ + self.status.lock.Unlock() + +} + +/* + iterates down a chain section by section + activating section process on incomplete sections with peer + relinking orphaned sections with their parent if root block (and its parent hash) is known) +*/ +func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) { + + p.lock.RLock() + switchC := p.switchC + p.lock.RUnlock() + + var i int + +LOOP: + for sec != nil { + parent := self.getParent(sec) + plog.DebugDetailf("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id) + sec.activate(p) + if i > 0 && connected != nil { + connected[string(sec.top.hash)] = sec + } + /* + we need to relink both complete and incomplete sections + the latter could have been blockHashesRequestsComplete before being delinked from its parent + */ + if parent == nil && sec.bottom.block != nil { + if entry := self.get(sec.bottom.block.ParentHash()); entry != nil { + parent = entry.section + plog.DebugDetailf("activateChain: [%s]-[%s] relink", sectionhex(parent), sectionhex(sec)) + link(parent, sec) + } + } + sec = parent + + // stop if peer got demoted + select { + case <-switchC: + break LOOP + case <-self.quit: + break LOOP + default: + } + } +} + +// must run in separate go routine, otherwise +// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock +func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) { + self.wg.Add(1) + go func() { + self.peers.requestBlocks(attempts, hashes) + self.wg.Done() + }() +} + +// convenience methods to access adjacent sections +func (self *BlockPool) getParent(sec *section) *section { + self.chainLock.RLock() + defer self.chainLock.RUnlock() + return sec.parent +} + +func (self *BlockPool) getChild(sec *section) *section { + self.chainLock.RLock() + defer self.chainLock.RUnlock() + return sec.child +} + +// accessor and setter for entries in the pool +func (self *BlockPool) get(hash []byte) *entry { + self.lock.RLock() + defer self.lock.RUnlock() + return self.pool[string(hash)] +} + +func (self *BlockPool) set(hash []byte, e *entry) { + self.lock.Lock() + defer self.lock.Unlock() + self.pool[string(hash)] = e +} + +func (self *BlockPool) remove(sec *section) { + // delete node entries from pool index under pool lock + self.lock.Lock() + defer self.lock.Unlock() + for _, node := range sec.nodes { + delete(self.pool, string(node.hash)) + } +} + +func (self *BlockPool) getHashSlice() (s [][]byte) { + select { + case s = <-self.hashSlicePool: + default: + s = make([][]byte, self.Config.BlockBatchSize) + } + return +} + +// Return returns a Client to the pool. +func (self *BlockPool) putHashSlice(s [][]byte) { + if len(s) == self.Config.BlockBatchSize { + select { + case self.hashSlicePool <- s: + default: + } + } +} + +// pretty prints hash (byte array) with first 4 bytes in hex +func hex(hash []byte) (name string) { + if hash == nil { + name = "" + } else { + name = fmt.Sprintf("%x", hash[:4]) + } + return +} + +// pretty prints a section using first 4 bytes in hex of bottom and top blockhash of the section +func sectionhex(section *section) (name string) { + if section == nil { + name = "" + } else { + name = fmt.Sprintf("%x-%x", section.bottom.hash[:4], section.top.hash[:4]) + } + return +} -- cgit v1.2.3