From 12f30e6220354c4a8b08ecf41bb53444143f3660 Mon Sep 17 00:00:00 2001 From: Maran Date: Tue, 20 May 2014 11:50:34 +0200 Subject: Refactored a lot of the chain catchup/reorg. --- peer.go | 125 +++++++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 73 insertions(+), 52 deletions(-) (limited to 'peer.go') diff --git a/peer.go b/peer.go index 45ff0a795..879361b2b 100644 --- a/peer.go +++ b/peer.go @@ -127,6 +127,7 @@ type Peer struct { // Indicated whether the node is catching up or not catchingUp bool + diverted bool blocksRequested int Version string @@ -190,7 +191,6 @@ func (p *Peer) QueueMessage(msg *ethwire.Msg) { if atomic.LoadInt32(&p.connected) != 1 { return } - p.outputQueue <- msg } @@ -268,7 +268,6 @@ func (p *Peer) HandleInbound() { for atomic.LoadInt32(&p.disconnect) == 0 { // HMM? time.Sleep(500 * time.Millisecond) - // Wait for a message from the peer msgs, err := ethwire.ReadMessages(p.conn) if err != nil { @@ -300,32 +299,36 @@ func (p *Peer) HandleInbound() { var err error // Make sure we are actually receiving anything - if msg.Data.Len()-1 > 1 && p.catchingUp { + if msg.Data.Len()-1 > 1 && p.diverted { // We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find // common ground to start syncing from lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1)) - if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) { - // If we can't find a common ancenstor we need to request more blocks. - // FIXME: At one point this won't scale anymore since we are not asking for an offset - // we just keep increasing the amount of blocks. - //fmt.Println("[PEER] No common ancestor found, requesting more blocks.") - p.blocksRequested = p.blocksRequested * 2 - p.catchingUp = false - p.SyncWithBlocks() - } - + ethutil.Config.Log.Infof("[PEER] Last block: %x. Checking if we have it locally.\n", lastBlock.Hash()) for i := msg.Data.Len() - 1; i >= 0; i-- { block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) // Do we have this block on our chain? If so we can continue if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) { // We don't have this block, but we do have a block with the same prevHash, diversion time! if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) { - if p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) { - return + p.diverted = false + if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) { + p.SyncWithPeerToLastKnown() } + break } } } + if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) { + // If we can't find a common ancenstor we need to request more blocks. + // FIXME: At one point this won't scale anymore since we are not asking for an offset + // we just keep increasing the amount of blocks. + p.blocksRequested = p.blocksRequested * 2 + + ethutil.Config.Log.Infof("[PEER] No common ancestor found, requesting %d more blocks.\n", p.blocksRequested) + p.catchingUp = false + p.FindCommonParentBlock() + break + } } for i := msg.Data.Len() - 1; i >= 0; i-- { @@ -346,23 +349,28 @@ func (p *Peer) HandleInbound() { } } + if msg.Data.Len() == 0 { + // Set catching up to false if + // the peer has nothing left to give + p.catchingUp = false + } + if err != nil { // If the parent is unknown try to catch up with this peer if ethchain.IsParentErr(err) { - ethutil.Config.Log.Infoln("Attempting to catch up") + ethutil.Config.Log.Infoln("Attempting to catch up since we don't know the parent") p.catchingUp = false p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) } else if ethchain.IsValidationErr(err) { - fmt.Println(err) + fmt.Println("Err:", err) p.catchingUp = false } } else { - // XXX Do we want to catch up if there were errors? // If we're catching up, try to catch up further. if p.catchingUp && msg.Data.Len() > 1 { - if ethutil.Config.Debug && lastBlock != nil { + if lastBlock != nil { blockInfo := lastBlock.BlockInfo() - ethutil.Config.Log.Infof("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash) + ethutil.Config.Log.Debugf("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash) } p.catchingUp = false @@ -372,11 +380,6 @@ func (p *Peer) HandleInbound() { } } - if msg.Data.Len() == 0 { - // Set catching up to false if - // the peer has nothing left to give - p.catchingUp = false - } case ethwire.MsgTxTy: // If the message was a transaction queue the transaction // in the TxPool where it will undergo validation and @@ -444,7 +447,7 @@ func (p *Peer) HandleInbound() { } } else { - ethutil.Config.Log.Debugf("[PEER] Could not find a similar block") + //ethutil.Config.Log.Debugf("[PEER] Could not find a similar block") // If no blocks are found we send back a reply with msg not in chain // and the last hash from get chain lastHash := msg.Data.Get(l - 1) @@ -452,8 +455,14 @@ func (p *Peer) HandleInbound() { p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()})) } case ethwire.MsgNotInChainTy: - ethutil.Config.Log.Debugf("Not in chain %x\n", msg.Data) - // TODO + ethutil.Config.Log.Debugf("Not in chain: %x\n", msg.Data.Get(0).Bytes()) + if p.diverted == true { + // If were already looking for a common parent and we get here again we need to go deeper + p.blocksRequested = p.blocksRequested * 2 + } + p.diverted = true + p.catchingUp = false + p.FindCommonParentBlock() case ethwire.MsgGetTxsTy: // Get the current transactions of the pool txs := p.ethereum.TxPool().CurrentTransactions() @@ -471,7 +480,6 @@ func (p *Peer) HandleInbound() { } } } - p.Stop() } @@ -581,14 +589,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } - // Catch up with the connected peer - p.SyncWithBlocks() - // Set the peer's caps p.caps = Caps(c.Get(3).Byte()) // Get a reference to the peers version p.Version = c.Get(2).Str() + // Catch up with the connected peer + if !p.ethereum.IsUpToDate() { + ethutil.Config.Log.Debugln("Already syncing up with a peer; sleeping") + time.Sleep(10 * time.Second) + } + p.SyncWithPeerToLastKnown() + ethutil.Config.Log.Debugln("[PEER]", p) } @@ -609,38 +621,47 @@ func (p *Peer) String() string { return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.Version, p.caps) } -func (p *Peer) SyncWithBlocks() { - if !p.catchingUp { - p.catchingUp = true - // FIXME: THIS SHOULD NOT BE NEEDED - if p.blocksRequested == 0 { - p.blocksRequested = 10 - } - blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested) +func (p *Peer) SyncWithPeerToLastKnown() { + p.catchingUp = false + p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) +} - var hashes []interface{} - for _, block := range blocks { - hashes = append(hashes, block.Hash()) - } +func (p *Peer) FindCommonParentBlock() { + if p.catchingUp { + return + } - msgInfo := append(hashes, uint64(50)) + p.catchingUp = true + if p.blocksRequested == 0 { + p.blocksRequested = 20 + } + blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested) - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo) - p.QueueMessage(msg) + var hashes []interface{} + for _, block := range blocks { + hashes = append(hashes, block.Hash()) } -} + msgInfo := append(hashes, uint64(len(hashes))) + + ethutil.Config.Log.Infof("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String()) + + msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo) + p.QueueMessage(msg) +} func (p *Peer) CatchupWithPeer(blockHash []byte) { if !p.catchingUp { + // Make sure nobody else is catching up when you want to do this p.catchingUp = true msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(50)}) p.QueueMessage(msg) - ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4]) + ethutil.Config.Log.Debugf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr()) - msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{}) - p.QueueMessage(msg) - ethutil.Config.Log.Debugln("Requested transactions") + /* + msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{}) + p.QueueMessage(msg) + */ } } -- cgit v1.2.3