aboutsummaryrefslogtreecommitdiffstats
path: root/block_pool.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-10-23 21:01:27 +0800
committerFelix Lange <fjl@twurst.com>2014-10-23 21:01:27 +0800
commit69baa465ea69ae60eed802445cf0132b9eb69934 (patch)
treeb09da7582b5c4850d4db13aee808f2fef2f97de0 /block_pool.go
parent50fd46924900869e7210217c6a07979b544991c8 (diff)
parentfeef194829b07570e91873ed5d1e8cc51e8fa430 (diff)
downloaddexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar.gz
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar.bz2
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar.lz
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar.xz
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.tar.zst
dexon-69baa465ea69ae60eed802445cf0132b9eb69934.zip
Merge eth-go repository into go-ethereum
mist, etheruem have been moved to cmd/
Diffstat (limited to 'block_pool.go')
-rw-r--r--block_pool.go340
1 files changed, 340 insertions, 0 deletions
diff --git a/block_pool.go b/block_pool.go
new file mode 100644
index 000000000..f65d9d576
--- /dev/null
+++ b/block_pool.go
@@ -0,0 +1,340 @@
+package eth
+
+import (
+ "bytes"
+ "container/list"
+ "fmt"
+ "math"
+ "math/big"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/ethchain"
+ "github.com/ethereum/go-ethereum/ethlog"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/ethwire"
+)
+
+var poollogger = ethlog.NewLogger("BPOOL")
+
+type block struct {
+ from *Peer
+ peer *Peer
+ block *ethchain.Block
+ reqAt time.Time
+ requested int
+}
+
+type BlockPool struct {
+ mut sync.Mutex
+
+ eth *Ethereum
+
+ hashes [][]byte
+ pool map[string]*block
+
+ td *big.Int
+ quit chan bool
+
+ fetchingHashes bool
+ downloadStartedAt time.Time
+
+ ChainLength, BlocksProcessed int
+
+ peer *Peer
+}
+
+func NewBlockPool(eth *Ethereum) *BlockPool {
+ return &BlockPool{
+ eth: eth,
+ pool: make(map[string]*block),
+ td: ethutil.Big0,
+ quit: make(chan bool),
+ }
+}
+
+func (self *BlockPool) Len() int {
+ return len(self.hashes)
+}
+
+func (self *BlockPool) Reset() {
+ self.pool = make(map[string]*block)
+ self.hashes = nil
+}
+
+func (self *BlockPool) HasLatestHash() bool {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil
+}
+
+func (self *BlockPool) HasCommonHash(hash []byte) bool {
+ return self.eth.ChainManager().GetBlock(hash) != nil
+}
+
+func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
+ for _, item := range self.pool {
+ if item.block != nil {
+ blocks = append(blocks, item.block)
+ }
+ }
+
+ return
+}
+
+func (self *BlockPool) FetchHashes(peer *Peer) bool {
+ highestTd := self.eth.HighestTDPeer()
+
+ if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer {
+ if self.peer != peer {
+ poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
+
+ if self.peer != nil {
+ self.peer.doneFetchingHashes = true
+ }
+ }
+
+ self.peer = peer
+ self.td = peer.td
+
+ if !self.HasLatestHash() {
+ peer.doneFetchingHashes = false
+
+ const amount = 256
+ peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4])
+ peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
+ }
+
+ return true
+ }
+
+ return false
+}
+
+func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ if self.pool[string(hash)] == nil {
+ self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
+
+ self.hashes = append([][]byte{hash}, self.hashes...)
+ }
+}
+
+func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) {
+ self.addBlock(b, peer, false)
+}
+
+func (self *BlockPool) AddNew(b *ethchain.Block, peer *Peer) {
+ self.addBlock(b, peer, true)
+}
+
+func (self *BlockPool) addBlock(b *ethchain.Block, peer *Peer, newBlock bool) {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ hash := string(b.Hash())
+
+ if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) {
+ poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4])
+
+ self.hashes = append(self.hashes, b.Hash())
+ self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
+
+ // The following is only performed on an unrequested new block
+ if newBlock {
+ fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4]))
+ fmt.Println("2.", self.pool[string(b.PrevHash)] == nil)
+ fmt.Println("3.", !self.fetchingHashes)
+ if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
+ poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4])
+ peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
+ }
+ }
+ } else if self.pool[hash] != nil {
+ self.pool[hash].block = b
+ }
+
+ self.BlocksProcessed++
+}
+
+func (self *BlockPool) Remove(hash []byte) {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
+ delete(self.pool, string(hash))
+}
+
+func (self *BlockPool) DistributeHashes() {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ var (
+ peerLen = self.eth.peers.Len()
+ amount = 256 * peerLen
+ dist = make(map[*Peer][][]byte)
+ )
+
+ num := int(math.Min(float64(amount), float64(len(self.pool))))
+ for i, j := 0, 0; i < len(self.hashes) && j < num; i++ {
+ hash := self.hashes[i]
+ item := self.pool[string(hash)]
+
+ if item != nil && item.block == nil {
+ var peer *Peer
+ lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
+
+ // Handle failed requests
+ if lastFetchFailed && item.requested > 5 && item.peer != nil {
+ if item.requested < 100 {
+ // Select peer the hash was retrieved off
+ peer = item.from
+ } else {
+ // Remove it
+ self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
+ delete(self.pool, string(hash))
+ }
+ } else if lastFetchFailed || item.peer == nil {
+ // Find a suitable, available peer
+ eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
+ if peer == nil && len(dist[p]) < amount/peerLen {
+ peer = p
+ }
+ })
+ }
+
+ if peer != nil {
+ item.reqAt = time.Now()
+ item.peer = peer
+ item.requested++
+
+ dist[peer] = append(dist[peer], hash)
+ }
+ }
+ }
+
+ for peer, hashes := range dist {
+ peer.FetchBlocks(hashes)
+ }
+
+ if len(dist) > 0 {
+ self.downloadStartedAt = time.Now()
+ }
+}
+
+func (self *BlockPool) Start() {
+ go self.downloadThread()
+ go self.chainThread()
+}
+
+func (self *BlockPool) Stop() {
+ close(self.quit)
+}
+
+func (self *BlockPool) downloadThread() {
+ serviceTimer := time.NewTicker(100 * time.Millisecond)
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case <-serviceTimer.C:
+ // Check if we're catching up. If not distribute the hashes to
+ // the peers and download the blockchain
+ self.fetchingHashes = false
+ eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
+ if p.statusKnown && p.FetchingHashes() {
+ self.fetchingHashes = true
+ }
+ })
+
+ if len(self.hashes) > 0 {
+ self.DistributeHashes()
+ }
+
+ if self.ChainLength < len(self.hashes) {
+ self.ChainLength = len(self.hashes)
+ }
+
+ /*
+ if !self.fetchingHashes {
+ blocks := self.Blocks()
+ ethchain.BlockBy(ethchain.Number).Sort(blocks)
+
+ if len(blocks) > 0 {
+ if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
+ }
+ }
+ }
+ */
+ }
+ }
+}
+
+func (self *BlockPool) chainThread() {
+ procTimer := time.NewTicker(500 * time.Millisecond)
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case <-procTimer.C:
+ blocks := self.Blocks()
+ ethchain.BlockBy(ethchain.Number).Sort(blocks)
+
+ // Find common block
+ for i, block := range blocks {
+ if self.eth.ChainManager().HasBlock(block.PrevHash) {
+ blocks = blocks[i:]
+ break
+ }
+ }
+
+ if len(blocks) > 0 {
+ if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) {
+ for i, block := range blocks[1:] {
+ // NOTE: The Ith element in this loop refers to the previous block in
+ // outer "blocks"
+ if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 {
+ blocks = blocks[:i]
+
+ break
+ }
+ }
+ } else {
+ blocks = nil
+ }
+ }
+
+ var err error
+ for i, block := range blocks {
+ err = self.eth.StateManager().Process(block, false)
+ if err != nil {
+ poollogger.Infoln(err)
+ poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
+ poollogger.Debugln(block)
+
+ blocks = blocks[i:]
+
+ break
+ }
+
+ self.Remove(block.Hash())
+ }
+
+ if err != nil {
+ self.Reset()
+
+ poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
+ // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
+ self.eth.BlacklistPeer(self.peer)
+ self.peer.StopWithReason(DiscBadPeer)
+ self.td = ethutil.Big0
+ self.peer = nil
+ }
+ }
+ }
+}