From 685862d2ce32294aacb2455bf189ec8e5c4efce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 14:26:20 +0300 Subject: eth/downloader: fix #910, thread safe peers & polishes --- eth/downloader/downloader.go | 140 +++++++++++++++++-------------------------- 1 file changed, 56 insertions(+), 84 deletions(-) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 14ca2cd3d..b1e23f58f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -49,12 +49,6 @@ type blockPack struct { blocks []*types.Block } -type syncPack struct { - peer *peer - hash common.Hash - ignoreInitial bool -} - type hashPack struct { peerId string hashes []common.Hash @@ -63,7 +57,7 @@ type hashPack struct { type Downloader struct { mu sync.RWMutex queue *queue - peers peers + peers *peerSet activePeer string // Callbacks @@ -83,7 +77,7 @@ type Downloader struct { func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { downloader := &Downloader{ queue: newQueue(), - peers: make(peers), + peers: newPeerSet(), hasBlock: hasBlock, getBlock: getBlock, newPeerCh: make(chan *peer, 1), @@ -98,29 +92,26 @@ func (d *Downloader) Stats() (current int, max int) { return d.queue.Size() } -func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { - d.mu.Lock() - defer d.mu.Unlock() - - glog.V(logger.Detail).Infoln("Register peer", id) - - // Create a new peer and add it to the list of known peers - peer := newPeer(id, hash, getHashes, getBlocks) - // add peer to our peer set - d.peers[id] = peer - // broadcast new peer - +// RegisterPeer injects a new download peer into the set of block source to be +// used for fetching hashes and blocks from. +func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { + glog.V(logger.Detail).Infoln("Registering peer", id) + if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { + glog.V(logger.Error).Infoln("Register failed:", err) + return err + } return nil } -// UnregisterPeer unregisters a peer. This will prevent any action from the specified peer. -func (d *Downloader) UnregisterPeer(id string) { - d.mu.Lock() - defer d.mu.Unlock() - - glog.V(logger.Detail).Infoln("Unregister peer", id) - - delete(d.peers, id) +// UnregisterPeer remove a peer from the known list, preventing any action from +// the specified peer. +func (d *Downloader) UnregisterPeer(id string) error { + glog.V(logger.Detail).Infoln("Unregistering peer", id) + if err := d.peers.Unregister(id); err != nil { + glog.V(logger.Error).Infoln("Unregister failed:", err) + return err + } + return nil } // Synchronise will select the peer and use it for synchronising. If an empty string is given @@ -140,15 +131,16 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue } - // Reset the queue to clean any internal leftover state + // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() + d.peers.Reset() // Retrieve the origin peer and initiate the downloading process - p := d.peers[id] + p := d.peers.Peer(id) if p == nil { return errUnknownPeer } - return d.getFromPeer(p, hash, false) + return d.syncWithPeer(p, hash) } // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler @@ -167,7 +159,9 @@ func (d *Downloader) Has(hash common.Hash) bool { return d.queue.Has(hash) } -func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { +// syncWithPeer starts a block synchronization based on the hash chain from the +// specified peer and head hash. +func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { d.activePeer = p.id defer func() { // reset on error @@ -177,21 +171,12 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) }() glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) - // Start the fetcher. This will block the update entirely - // interupts need to be send to the appropriate channels - // respectively. - if err = d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + if err = d.fetchHashes(p, hash); err != nil { return err } - - // Start fetching blocks in paralel. The strategy is simple - // take any available peers, seserve a chunk for each peer available, - // let the peer deliver the chunkn and periodically check if a peer - // has timedout. - if err = d.startFetchingBlocks(p); err != nil { + if err = d.fetchBlocks(); err != nil { return err } - glog.V(logger.Debug).Infoln("Synchronization completed") return nil @@ -234,17 +219,14 @@ blockDone: } // XXX Make synchronous -func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { +func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) start := time.Now() - // We ignore the initial hash in some cases (e.g. we received a block without it's parent) - // In such circumstances we don't need to download the block so don't add it to the queue. - if !ignoreInitial { - // Add the hash to the queue first - d.queue.Insert([]common.Hash{h}) - } + // Add the hash to the queue first + d.queue.Insert([]common.Hash{h}) + // Get the first batch of hashes p.getHashes(h) @@ -308,20 +290,18 @@ out: // Attempt to find a new peer by checking inclusion of peers best hash in our // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. - for id, peer := range d.peers { - if d.queue.Has(peer.recentHash) && !attemptedPeers[id] { + for _, peer := range d.peers.AllPeers() { + if d.queue.Has(peer.head) && !attemptedPeers[p.id] { p = peer break } } - // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (hash == common.Hash{}) { d.queue.Reset() return ErrTimeout } - // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. activePeer = p @@ -334,14 +314,11 @@ out: return nil } -func (d *Downloader) startFetchingBlocks(p *peer) error { +// fetchBlocks iteratively downloads the entire schedules block-chain, taking +// any available peers, reserving a chunk of blocks for each, wait for delivery +// and periodically checking for timeouts. +func (d *Downloader) fetchBlocks() error { glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") - - // Defer the peer reset. This will empty the peer requested set - // and makes sure there are no lingering peers with an incorrect - // state - defer d.peers.reset() - start := time.Now() // default ticker for re-fetching blocks every now and then @@ -354,19 +331,19 @@ out: case blockPack := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if d.peers[blockPack.peerId] != nil { - err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) - if err != nil { - glog.V(logger.Debug).Infof("deliver failed for peer %s: %v\n", blockPack.peerId, err) - // FIXME d.UnregisterPeer(blockPack.peerId) + if peer := d.peers.Peer(blockPack.peerId); peer != nil { + // Deliver the received chunk of blocks, but drop the peer if invalid + if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { + glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) + d.peers.Unregister(blockPack.peerId) break } - if glog.V(logger.Debug) { - glog.Infof("adding %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) + glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) } - d.peers[blockPack.peerId].promote() - d.peers.setState(blockPack.peerId, idleState) + // Promote the peer and update it's idle state + peer.Promote() + peer.SetIdle() } case <-ticker.C: // Check for bad peers. Bad peers may indicate a peer not responding @@ -381,13 +358,10 @@ out: // 1) Time for them to respond; // 2) Measure their speed; // 3) Amount and availability. - if peer := d.peers[pid]; peer != nil { - peer.demote() - peer.reset() - } + d.peers.Unregister(pid) } // After removing bad peers make sure we actually have sufficient peer left to keep downloading - if len(d.peers) == 0 { + if d.peers.Peers() == 0 { d.queue.Reset() return errNoPeers } @@ -398,31 +372,29 @@ out: if d.queue.Throttle() { continue } - - availablePeers := d.peers.get(idleState) - for _, peer := range availablePeers { + // Send a download request to all idle peers + idlePeers := d.peers.IdlePeers() + for _, peer := range idlePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. request := d.queue.Reserve(peer, maxBlockFetch) if request == nil { continue } - // XXX make fetch blocking. // Fetch the chunk and check for error. If the peer was somehow // already fetching a chunk due to a bug, it will be returned to // the queue - if err := peer.fetch(request); err != nil { - // log for tracing - glog.V(logger.Debug).Infof("peer %s received double work (state = %v)\n", peer.id, peer.state) + if err := peer.Fetch(request); err != nil { + glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) d.queue.Cancel(request) } } - // make sure that we have peers available for fetching. If all peers have been tried + // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error if d.queue.InFlight() == 0 { d.queue.Reset() - return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.Pending()) + return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending()) } } else if d.queue.InFlight() == 0 { -- cgit v1.2.3 From ebbd8b0743f9639ebf2b23ed04946cc20a83ff6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 16:47:58 +0300 Subject: eth/downloader: revert to demotion, use harsher penalty --- eth/downloader/downloader.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b1e23f58f..5e9931f59 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -335,7 +335,7 @@ out: // Deliver the received chunk of blocks, but drop the peer if invalid if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) - d.peers.Unregister(blockPack.peerId) + peer.Demote() break } if glog.V(logger.Debug) { @@ -358,7 +358,9 @@ out: // 1) Time for them to respond; // 2) Measure their speed; // 3) Amount and availability. - d.peers.Unregister(pid) + if peer := d.peers.Peer(pid); peer != nil { + peer.Demote() + } } // After removing bad peers make sure we actually have sufficient peer left to keep downloading if d.peers.Peers() == 0 { @@ -372,9 +374,13 @@ out: if d.queue.Throttle() { continue } - // Send a download request to all idle peers + // Send a download request to all idle peers, until throttled idlePeers := d.peers.IdlePeers() for _, peer := range idlePeers { + // Short circuit if throttling activated since above + if d.queue.Throttle() { + break + } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. request := d.queue.Reserve(peer, maxBlockFetch) -- cgit v1.2.3 From fa53c5e0747925e065ca968c1cca9c974f40dc6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 17:06:42 +0300 Subject: eth/downloader: use count instead of peers, clearer --- eth/downloader/downloader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5e9931f59..4c7ebe646 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -363,7 +363,7 @@ out: } } // After removing bad peers make sure we actually have sufficient peer left to keep downloading - if d.peers.Peers() == 0 { + if d.peers.Len() == 0 { d.queue.Reset() return errNoPeers } @@ -400,7 +400,7 @@ out: if d.queue.InFlight() == 0 { d.queue.Reset() - return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending()) + return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending()) } } else if d.queue.InFlight() == 0 { -- cgit v1.2.3