From eda10c731758217f099bf4a4e232bdb3dca1e478 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 16 Apr 2015 00:14:31 +0200 Subject: downloader: updated downloader and fixed issues with catch up Properly ignore blocks coming from peers not in our peer list (blocked) and do never request anything from bad peers. Added some checks to account for blocks known when requesting hashes (missing parents). --- eth/downloader/downloader.go | 68 ++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 21 deletions(-) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 83e6b8d32..56671dc2e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,8 +54,9 @@ type blockPack struct { } type syncPack struct { - peer *peer - hash common.Hash + peer *peer + hash common.Hash + ignoreInitial bool } func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader { @@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) { func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTicker(5 * time.Second) + //itimer := time.NewTicker(5 * time.Second) + itimer := time.NewTimer(5 * time.Second) out: for { select { case <-d.newPeerCh: + itimer.Stop() // Meet the `minDesiredPeerCount` before we select our best peer if len(d.peers) < minDesiredPeerCount { break @@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) { } glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash} + d.syncCh <- syncPack{p, p.recentHash, false} } } @@ -151,7 +154,7 @@ out: // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. - if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil { + if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil { // handle error glog.V(logger.Debug).Infoln("Error fetching hashes:", err) // XXX Reset @@ -178,11 +181,18 @@ out: } // XXX Make synchronous -func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error { - glog.V(logger.Debug).Infoln("Downloading hashes") +func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id) start := time.Now() + // We ignore the initial hash in some cases (e.g. we received a block without it's parent) + // In such circumstances we don't need to download the block so don't add it to the queue. + if !ignoreInitial { + // Add the hash to the queue first + d.queue.hashPool.Add(hash) + } + // Get the first batch of hashes p.getHashes(hash) atomic.StoreInt32(&d.fetchingHashes, 1) @@ -195,7 +205,7 @@ out: hashSet := set.New() for _, hash := range hashes { if d.hasBlock(hash) { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash) + glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) done = true break @@ -207,7 +217,7 @@ out: // Add hashes to the chunk set // Check if we're done fetching - if !done { + if !done && len(hashes) > 0 { //fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) // Get the next set of hashes p.getHashes(hashes[len(hashes)-1]) @@ -218,7 +228,7 @@ out: } } } - glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start)) + glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start)) return nil } @@ -242,6 +252,10 @@ out: // from the available peers. if d.queue.hashPool.Size() > 0 { availablePeers := d.peers.get(idleState) + if len(availablePeers) == 0 { + glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers)) + } + for _, peer := range availablePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. @@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { return } - glog.V(logger.Detail).Infoln("Inserting new block from:", id) - d.queue.addBlock(id, block, td) - + peer := d.peers.getPeer(id) // if the peer is in our healthy list of peers; update the td - // here is a good chance to add the peer back to the list - if peer := d.peers.getPeer(id); peer != nil { - peer.mu.Lock() - peer.td = td - peer.recentHash = block.Hash() - peer.mu.Unlock() + // and add the block. Otherwise just ignore it + if peer == nil { + glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id) + return } + peer.mu.Lock() + peer.td = td + peer.recentHash = block.Hash() + peer.mu.Unlock() + + glog.V(logger.Detail).Infoln("Inserting new block from:", id) + d.queue.addBlock(id, block, td) + // if neither go ahead to process if !(d.isFetchingHashes() || d.isDownloadingBlocks()) { - d.process() + // Check if the parent of the received block is known. + // If the block is not know, request it otherwise, request. + phash := block.ParentHash() + if !d.hasBlock(phash) { + glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4]) + d.syncCh <- syncPack{peer, peer.recentHash, true} + } else { + d.process() + } } } @@ -369,7 +395,7 @@ func (d *Downloader) process() error { // TODO change this. This shite for i, block := range blocks[:max] { if !d.hasBlock(block.ParentHash()) { - d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()} + d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true} // remove processed blocks blocks = blocks[i:] -- cgit v1.2.3 From 205378016fc342a9ee683b3d040b3e7d0e2ebb51 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 16 Apr 2015 02:16:33 +0200 Subject: downloader: added demotion / promotion in prep. for rep. system --- eth/downloader/downloader.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 56671dc2e..91cc65249 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -245,6 +245,7 @@ out: for { select { case blockPack := <-d.blockCh: + d.peers[blockPack.peerId].promote() d.queue.deliver(blockPack.peerId, blockPack.blocks) d.peers.setState(blockPack.peerId, idleState) case <-ticker.C: @@ -310,6 +311,9 @@ out: // 2) Measure their speed; // 3) Amount and availability. d.queue.deliver(pid, nil) + if peer := p.peers[pid]; peer != nil { + peer.demote() + } } } @@ -343,6 +347,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { peer.td = td peer.recentHash = block.Hash() peer.mu.Unlock() + peer.promote() glog.V(logger.Detail).Infoln("Inserting new block from:", id) d.queue.addBlock(id, block, td) -- cgit v1.2.3 From eac2df02d17f9a2a42310b8d6580dd5bea53a48e Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 17 Apr 2015 00:11:41 +0200 Subject: downloader: fixed a typo --- eth/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/downloader/downloader.go') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 91cc65249..4e795af6d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -311,7 +311,7 @@ out: // 2) Measure their speed; // 3) Amount and availability. d.queue.deliver(pid, nil) - if peer := p.peers[pid]; peer != nil { + if peer := d.peers[pid]; peer != nil { peer.demote() } } -- cgit v1.2.3