diff options
-rw-r--r-- | block_pool.go | 101 | ||||
-rw-r--r-- | ethereum.go | 1 | ||||
-rw-r--r-- | peer.go | 97 |
3 files changed, 154 insertions, 45 deletions
diff --git a/block_pool.go b/block_pool.go index 6753308b6..f3a863429 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,17 +1,23 @@ package eth import ( + "bytes" + "container/list" + "fmt" "math" "math/big" "sync" + "time" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethutil" ) type block struct { - peer *Peer - block *ethchain.Block + peer *Peer + block *ethchain.Block + reqAt time.Time + requested int } type BlockPool struct { @@ -22,7 +28,8 @@ type BlockPool struct { hashPool [][]byte pool map[string]*block - td *big.Int + td *big.Int + quit chan bool } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -30,6 +37,7 @@ func NewBlockPool(eth *Ethereum) *BlockPool { eth: eth, pool: make(map[string]*block), td: ethutil.Big0, + quit: make(chan bool), } } @@ -47,7 +55,7 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { func (self *BlockPool) AddHash(hash []byte) { if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{nil, nil} + self.pool[string(hash)] = &block{nil, nil, time.Now(), 0} self.hashPool = append([][]byte{hash}, self.hashPool...) } @@ -58,12 +66,34 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) - self.pool[hash] = &block{peer, b} + self.pool[hash] = &block{peer, b, time.Now(), 0} } else if self.pool[hash] != nil { self.pool[hash].block = b } } +func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { + for _, item := range self.pool { + if item.block != nil { + if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 { + return item.block + } + } + } + + return nil +} + +func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks { + var blocks ethchain.Blocks + + for b := block; b != nil; b = self.getParent(b) { + blocks = append(ethchain.Blocks{b}, blocks...) + } + + return blocks +} + func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { var blocks ethchain.Blocks @@ -94,8 +124,14 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { j := 0 for i := 0; i < len(self.hashPool) && j < num; i++ { hash := string(self.hashPool[i]) - if self.pool[hash] != nil && (self.pool[hash].peer == nil || self.pool[hash].peer == peer) && self.pool[hash].block == nil { + item := self.pool[hash] + if item != nil && item.block == nil && + (item.peer == nil || + ((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers + (time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) { self.pool[hash].peer = peer + self.pool[hash].reqAt = time.Now() + self.pool[hash].requested++ hashes = append(hashes, self.hashPool[i]) j++ @@ -104,3 +140,56 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { return } + +func (self *BlockPool) Start() { + go self.update() +} + +func (self *BlockPool) Stop() { + close(self.quit) +} + +func (self *BlockPool) update() { + serviceTimer := time.NewTicker(100 * time.Millisecond) + procTimer := time.NewTicker(500 * time.Millisecond) +out: + for { + select { + case <-self.quit: + break out + case <-serviceTimer.C: + // Clean up hashes that can't be fetched + done := true + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if p.statusKnown && p.FetchingHashes() { + done = false + } + }) + + if done { + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if p.statusKnown { + hashes := self.Take(100, p) + if len(hashes) > 0 { + p.FetchBlocks(hashes) + if len(hashes) == 1 { + fmt.Printf("last hash = %x\n", hashes[0]) + } else { + fmt.Println("Requesting", len(hashes), "of", p) + } + } + } + }) + } + case <-procTimer.C: + var err error + self.CheckLinkAndProcess(func(block *ethchain.Block) { + err = self.eth.StateManager().Process(block, false) + }) + + if err != nil { + peerlogger.Infoln(err) + } + } + } +} diff --git a/ethereum.go b/ethereum.go index 09665336c..8f667b786 100644 --- a/ethereum.go +++ b/ethereum.go @@ -383,7 +383,6 @@ func (s *Ethereum) ReapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { s.reactor.Start() - s.blockPool.Start() // Bind to addr and port ln, err := net.Listen("tcp", ":"+s.Port) if err != nil { @@ -131,6 +131,7 @@ type Peer struct { // Last received pong message lastPong int64 lastBlockReceived time.Time + LastHashReceived time.Time host []byte port uint16 @@ -176,6 +177,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { caps: ethereum.ServerCaps(), version: ethereum.ClientIdentity().String(), protocolCaps: ethutil.NewValue(nil), + td: big.NewInt(0), } } @@ -191,6 +193,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { caps: caps, version: ethereum.ClientIdentity().String(), protocolCaps: ethutil.NewValue(nil), + td: big.NewInt(0), } // Set up the connection in another goroutine so we don't block the main thread @@ -505,6 +508,9 @@ func (p *Peer) HandleInbound() { for it.Next() { hash := it.Value().Bytes() + p.lastReceivedHash = hash + p.LastHashReceived = time.Now() + if blockPool.HasCommonHash(hash) { foundCommonHash = true @@ -512,15 +518,16 @@ func (p *Peer) HandleInbound() { } blockPool.AddHash(hash) - - p.lastReceivedHash = hash - - p.lastBlockReceived = time.Now() } - if foundCommonHash || msg.Data.Len() == 0 { - p.FetchBlocks() - } else { + /* + if foundCommonHash || msg.Data.Len() == 0 { + p.FetchBlocks() + } else { + p.FetchHashes() + } + */ + if !foundCommonHash && msg.Data.Len() != 0 { p.FetchHashes() } @@ -539,19 +546,21 @@ func (p *Peer) HandleInbound() { p.lastBlockReceived = time.Now() } - var err error - blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { - err = p.ethereum.StateManager().Process(block, false) - }) - - if err != nil { - peerlogger.Infoln(err) - } else { - // Don't trigger if there's just one block. - if blockPool.Len() != 0 && msg.Data.Len() > 1 { - p.FetchBlocks() + /* + var err error + blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { + err = p.ethereum.StateManager().Process(block, false) + }) + + if err != nil { + peerlogger.Infoln(err) + } else { + // Don't trigger if there's just one block. + if blockPool.Len() != 0 && msg.Data.Len() > 1 { + p.FetchBlocks() + } } - } + */ } } } @@ -560,10 +569,7 @@ func (p *Peer) HandleInbound() { p.Stop() } -func (self *Peer) FetchBlocks() { - blockPool := self.ethereum.blockPool - - hashes := blockPool.Take(100, self) +func (self *Peer) FetchBlocks(hashes [][]byte) { if len(hashes) > 0 { self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) } @@ -572,7 +578,7 @@ func (self *Peer) FetchBlocks() { func (self *Peer) FetchHashes() { blockPool := self.ethereum.blockPool - if self.td.Cmp(blockPool.td) >= 0 { + if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 { blockPool.td = self.td if !blockPool.HasLatestHash() { @@ -581,6 +587,10 @@ func (self *Peer) FetchHashes() { } } +func (self *Peer) FetchingHashes() bool { + return time.Since(self.LastHashReceived) < 5*time.Second +} + // General update method func (self *Peer) update() { serviceTimer := time.NewTicker(5 * time.Second) @@ -589,11 +599,22 @@ out: for { select { case <-serviceTimer.C: - since := time.Since(self.lastBlockReceived) - if since > 10*time.Second && self.ethereum.blockPool.Len() != 0 && self.IsCap("eth") { - self.FetchHashes() - } else if since > 5*time.Second { - self.catchingUp = false + if self.IsCap("eth") { + var ( + sinceBlock = time.Since(self.lastBlockReceived) + sinceHash = time.Since(self.LastHashReceived) + ) + + if sinceBlock > 5*time.Second && sinceHash > 5*time.Second { + self.catchingUp = false + } + + if sinceHash > 10*time.Second && self.ethereum.blockPool.Len() != 0 { + // XXX While this is completely and utterly incorrect, in order to do anything on the test net is to do it this way + // Assume that when fetching hashes timeouts, we are done. + //self.FetchHashes() + //self.FetchBlocks() + } } case <-self.quit: break out @@ -761,6 +782,14 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { return } + // Self connect detection + pubkey := p.ethereum.KeyManager().PublicKey() + if bytes.Compare(pubkey[1:], pub) == 0 { + p.Stop() + + return + } + usedPub := 0 // This peer is already added to the peerlist so we expect to find a double pubkey at least once eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { @@ -779,16 +808,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { // If this is an inbound connection send an ack back if p.inbound { p.port = uint16(port) - - // Self connect detection - pubkey := p.ethereum.KeyManager().PublicKey() - if bytes.Compare(pubkey, p.pubkey) == 0 { - p.Stop() - - return - } - } + p.SetVersion(clientId) p.versionKnown = true |