diff options
Diffstat (limited to 'block_pool.go')
-rw-r--r-- | block_pool.go | 130 |
1 files changed, 85 insertions, 45 deletions
diff --git a/block_pool.go b/block_pool.go index f3a863429..88d1c3739 100644 --- a/block_pool.go +++ b/block_pool.go @@ -3,17 +3,21 @@ package eth import ( "bytes" "container/list" - "fmt" "math" "math/big" "sync" "time" "github.com/ethereum/eth-go/ethchain" + "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) +var poollogger = ethlog.NewLogger("[BPOOL]") + type block struct { + from *Peer peer *Peer block *ethchain.Block reqAt time.Time @@ -30,6 +34,8 @@ type BlockPool struct { td *big.Int quit chan bool + + ChainLength, BlocksProcessed int } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -53,9 +59,9 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.BlockChain().GetBlock(hash) != nil } -func (self *BlockPool) AddHash(hash []byte) { +func (self *BlockPool) AddHash(hash []byte, peer *Peer) { if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{nil, nil, time.Now(), 0} + self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} self.hashPool = append([][]byte{hash}, self.hashPool...) } @@ -66,10 +72,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) - self.pool[hash] = &block{peer, b, time.Now(), 0} + self.pool[hash] = &block{peer, peer, b, time.Now(), 0} } else if self.pool[hash] != nil { self.pool[hash].block = b } + + self.BlocksProcessed++ } func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { @@ -94,18 +102,24 @@ func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks return blocks } -func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { - - var blocks ethchain.Blocks +func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { for _, item := range self.pool { if item.block != nil { blocks = append(blocks, item.block) } } + return +} + +func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) { + blocks := self.Blocks() + ethchain.BlockBy(ethchain.Number).Sort(blocks) for _, block := range blocks { if self.eth.BlockChain().HasBlock(block.PrevHash) { + procAmount++ + f(block) hash := block.Hash() @@ -114,31 +128,58 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { } } + + return } -func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { - self.mut.Lock() - defer self.mut.Unlock() +func (self *BlockPool) DistributeHashes() { + var ( + peerLen = self.eth.peers.Len() + amount = 200 * peerLen + dist = make(map[*Peer][][]byte) + ) num := int(math.Min(float64(amount), float64(len(self.pool)))) - j := 0 - for i := 0; i < len(self.hashPool) && j < num; i++ { - hash := string(self.hashPool[i]) - item := self.pool[hash] - if item != nil && item.block == nil && - (item.peer == nil || - ((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers - (time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) { - self.pool[hash].peer = peer - self.pool[hash].reqAt = time.Now() - self.pool[hash].requested++ - - hashes = append(hashes, self.hashPool[i]) - j++ + for i, j := 0, 0; i < len(self.hashPool) && j < num; i++ { + hash := self.hashPool[i] + item := self.pool[string(hash)] + + if item != nil && item.block == nil { + var peer *Peer + lastFetchFailed := time.Since(item.reqAt) > 5*time.Second + + // Handle failed requests + if lastFetchFailed && item.requested > 0 && item.peer != nil { + if item.requested < 100 { + // Select peer the hash was retrieved off + peer = item.from + } else { + // Remove it + self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + delete(self.pool, string(hash)) + } + } else if lastFetchFailed || item.peer == nil { + // Find a suitable, available peer + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if peer == nil && len(dist[p]) < amount/peerLen { + peer = p + } + }) + } + + if peer != nil { + item.reqAt = time.Now() + item.peer = peer + item.requested++ + + dist[peer] = append(dist[peer], hash) + } } } - return + for peer, hashes := range dist { + peer.FetchBlocks(hashes) + } } func (self *BlockPool) Start() { @@ -158,7 +199,8 @@ out: case <-self.quit: break out case <-serviceTimer.C: - // Clean up hashes that can't be fetched + // Check if we're catching up. If not distribute the hashes to + // the peers and download the blockchain done := true eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { if p.statusKnown && p.FetchingHashes() { @@ -166,29 +208,27 @@ out: } }) - if done { - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if p.statusKnown { - hashes := self.Take(100, p) - if len(hashes) > 0 { - p.FetchBlocks(hashes) - if len(hashes) == 1 { - fmt.Printf("last hash = %x\n", hashes[0]) - } else { - fmt.Println("Requesting", len(hashes), "of", p) - } - } - } - }) + if done && len(self.hashPool) > 0 { + self.DistributeHashes() + } + + if self.ChainLength < len(self.hashPool) { + self.ChainLength = len(self.hashPool) } case <-procTimer.C: - var err error - self.CheckLinkAndProcess(func(block *ethchain.Block) { - err = self.eth.StateManager().Process(block, false) + // XXX We can optimize this lifting this on to a new goroutine. + // We'd need to make sure that the pools are properly protected by a mutex + amount := self.ProcessCanonical(func(block *ethchain.Block) { + err := self.eth.StateManager().Process(block, false) + if err != nil { + poollogger.Infoln(err) + } }) - if err != nil { - peerlogger.Infoln(err) + // Do not propagate to the network on catchups + if amount == 1 { + block := self.eth.BlockChain().CurrentBlock + self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) } } } |