From 1dd272080dfb49a07a87c46e18d8aeaa0fd41a08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 25 Jul 2016 15:14:14 +0300 Subject: eth, eth/downloader: better remote head tracking --- eth/downloader/downloader.go | 7 ++++--- eth/downloader/downloader_test.go | 17 ++++++++++++++--- eth/downloader/peer.go | 13 +++++++++---- eth/handler.go | 30 ++++++++++++++++-------------- eth/peer.go | 34 +++++++++++----------------------- eth/sync.go | 8 +++++--- 6 files changed, 59 insertions(+), 50 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1e9bc27bc..bf1cb5932 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -236,12 +236,12 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, +func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { + if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -501,7 +501,8 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) // Request the advertised remote head block and wait for the response - go p.getRelHeaders(p.head, 1, 0, false) + head, _ := p.currentHead() + go p.getRelHeaders(head, 1, 0, false) timeout := time.After(d.requestTTL()) for { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4ca28091c..a2efc7469 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -400,11 +400,11 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha var err error switch version { case 62: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) + err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) case 63: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) case 64: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) } if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) @@ -463,6 +463,17 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } +// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash +// and total difficulty. +func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) { + return func() (common.Hash, *big.Int) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.peerHashes[id][0], nil + } +} + // peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c2b7a52d0..b0bfc66c8 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "math" + "math/big" "sort" "strings" "sync" @@ -37,6 +38,9 @@ const ( measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) +// Head hash and total difficulty retriever for +type currentHeadRetrievalFn func() (common.Hash, *big.Int) + // Block header and body fetchers belonging to eth/62 and above type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error @@ -52,8 +56,7 @@ var ( // peer represents an active peer from which hashes and blocks are retrieved. type peer struct { - id string // Unique identifier of the peer - head common.Hash // Hash of the peers latest known block + id string // Unique identifier of the peer headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) @@ -74,6 +77,8 @@ type peer struct { lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) + currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer + getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies @@ -87,14 +92,14 @@ type peer struct { // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, version int, head common.Hash, +func newPeer(id string, version int, currentHead currentHeadRetrievalFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ id: id, - head: head, lacking: make(map[common.Hash]struct{}), + currentHead: currentHead, getRelHeaders: getRelHeaders, getAbsHeaders: getAbsHeaders, getBlockBodies: getBlockBodies, diff --git a/eth/handler.go b/eth/handler.go index 9ad430976..8723300f5 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -272,11 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), - p.RequestHeadersByHash, p.RequestHeadersByNumber, - p.RequestBodies, p.RequestReceipts, p.RequestNodeData, - ) - if err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -413,7 +409,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // If we already have a DAO header, we can check the peer's TD against it. If // the peer's ahead of this, it too must have a reply to the DAO check if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { - if p.Td().Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { + if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { verifyDAO = false } } @@ -619,7 +615,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Mark the hashes as present at the remote node for _, block := range announces { p.MarkBlock(block.Hash) - p.SetHead(block.Hash) } // Schedule all the unknown hashes for retrieval unknown := make([]announce, 0, len(announces)) @@ -646,16 +641,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Mark the peer as owning the block and schedule it for import p.MarkBlock(request.Block.Hash()) - p.SetHead(request.Block.Hash()) - pm.fetcher.Enqueue(p.id, request.Block) - // Update the peers total difficulty if needed, schedule a download if gapped - if request.TD.Cmp(p.Td()) > 0 { - p.SetTd(request.TD) + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = request.Block.ParentHash() + trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + ) + // Update the peers total difficulty if better than the previous + if _, td := p.Head(); trueTD.Cmp(td) > 0 { + p.SetHead(trueHead, trueTD) + + // Schedule a sync if above ours. Note, this will not fire a sync for a gap of + // a singe block (as the true TD is below the propagated block), however this + // scenario should easily be covered by the fetcher. currentBlock := pm.blockchain.CurrentBlock() - td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 { + if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { go pm.synchronise(p) } } diff --git a/eth/peer.go b/eth/peer.go index c8c207ecb..aa85631ea 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -84,43 +84,31 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { + hash, td := p.Head() + return &PeerInfo{ Version: p.version, - Difficulty: p.Td(), - Head: fmt.Sprintf("%x", p.Head()), + Difficulty: td, + Head: hash.Hex(), } } -// Head retrieves a copy of the current head (most recent) hash of the peer. -func (p *peer) Head() (hash common.Hash) { +// Head retrieves a copy of the current head hash and total difficulty of the +// peer. +func (p *peer) Head() (hash common.Hash, td *big.Int) { p.lock.RLock() defer p.lock.RUnlock() copy(hash[:], p.head[:]) - return hash + return hash, new(big.Int).Set(p.td) } -// SetHead updates the head (most recent) hash of the peer. -func (p *peer) SetHead(hash common.Hash) { +// SetHead updates the head hash and total difficulty of the peer. +func (p *peer) SetHead(hash common.Hash, td *big.Int) { p.lock.Lock() defer p.lock.Unlock() copy(p.head[:], hash[:]) -} - -// Td retrieves the current total difficulty of a peer. -func (p *peer) Td() *big.Int { - p.lock.RLock() - defer p.lock.RUnlock() - - return new(big.Int).Set(p.td) -} - -// SetTd updates the current total difficulty of a peer. -func (p *peer) SetTd(td *big.Int) { - p.lock.Lock() - defer p.lock.Unlock() - p.td.Set(td) } @@ -411,7 +399,7 @@ func (ps *peerSet) BestPeer() *peer { bestTd *big.Int ) for _, p := range ps.peers { - if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 { + if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { bestPeer, bestTd = p, td } } diff --git a/eth/sync.go b/eth/sync.go index 23cf18c8d..e1946edda 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -161,10 +161,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if peer == nil { return } - // Make sure the peer's TD is higher than our own. If not drop. + // Make sure the peer's TD is higher than our own currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - if peer.Td().Cmp(td) <= 0 { + + pHead, pTd := peer.Head() + if pTd.Cmp(td) <= 0 { return } // Otherwise try to sync with the downloader @@ -172,7 +174,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if atomic.LoadUint32(&pm.fastSync) == 1 { mode = downloader.FastSync } - if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil { + if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done -- cgit v1.2.3