diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-18 07:10:32 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-18 07:10:32 +0800 |
commit | 2339ee9910c10fa498f4a4ebc25f6a780ccf78e9 (patch) | |
tree | 11b9a6a3244990d3a3ea888d85eaeafad4aeebef /eth/downloader | |
parent | 73eb8e8c20b784e3a43f826e51a81326619d98ef (diff) | |
parent | 2c2ddcbf8885bf509741105ec57967fe78c744fe (diff) | |
download | go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar.gz go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar.bz2 go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar.lz go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar.xz go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.tar.zst go-tangerine-2339ee9910c10fa498f4a4ebc25f6a780ccf78e9.zip |
Merge branch 'develop' into downloader-proto
Conflicts:
eth/downloader/downloader.go
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 102 | ||||
-rw-r--r-- | eth/downloader/peer.go | 20 |
2 files changed, 117 insertions, 5 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1707e1395..5f9d9ed74 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,6 +1,7 @@ package downloader import ( + "errors" "math" "math/big" "sync" @@ -20,6 +21,12 @@ const ( minDesiredPeerCount = 3 // Amount of peers desired to start syncing ) +var ( + errLowTd = errors.New("peer's TD is too low") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") +) + type hashCheckFn func(common.Hash) bool type chainInsertFn func(types.Blocks) error type hashIterFn func() (common.Hash, error) @@ -82,18 +89,19 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH d.mu.Lock() defer d.mu.Unlock() - glog.V(logger.Detail).Infoln("Register peer", id) + glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td) // Create a new peer and add it to the list of known peers peer := newPeer(id, td, hash, getHashes, getBlocks) // add peer to our peer set d.peers[id] = peer // broadcast new peer - d.newPeerCh <- peer + //d.newPeerCh <- peer return nil } +// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer. func (d *Downloader) UnregisterPeer(id string) { d.mu.Lock() defer d.mu.Unlock() @@ -103,6 +111,73 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } +// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given +// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the +// checks fail an error will be returned. This method is synchronous +func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) { + // Check if we're busy + if d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing() { + return nil, errBusy + } + + // Attempt to select a peer. This can either be nothing, which returns, best peer + // or selected peer. If no peer could be found an error will be returned + var p *peer + if len(id) == 0 { + p = d.peers[id] + if p == nil { + return nil, errUnknownPeer + } + } else { + p = d.peers.bestPeer() + } + + // Make sure our td is lower than the peer's td + if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { + return nil, errLowTd + } + + // Get the hash from the peer and initiate the downloading progress. + err := d.getFromPeer(p, p.recentHash, false) + if err != nil { + return nil, err + } + + return d.queue.blocks, nil +} + +// Synchronise will synchronise using the best peer. +func (d *Downloader) Synchronise() (types.Blocks, error) { + return d.SynchroniseWithPeer("") +} + +func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { + glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + // Start the fetcher. This will block the update entirely + // interupts need to be send to the appropriate channels + // respectively. + if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + // handle error + glog.V(logger.Debug).Infoln("Error fetching hashes:", err) + // XXX Reset + return err + } + + // Start fetching blocks in paralel. The strategy is simple + // take any available peers, seserve a chunk for each peer available, + // let the peer deliver the chunkn and periodically check if a peer + // has timedout. When done downloading, process blocks. + if err := d.startFetchingBlocks(p); err != nil { + glog.V(logger.Debug).Infoln("Error downloading blocks:", err) + // XXX reset + return err + } + + glog.V(logger.Detail).Infoln("Sync completed") + + return nil +} + func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` //itimer := time.NewTicker(5 * time.Second) @@ -116,11 +191,18 @@ out: if len(d.peers) < minDesiredPeerCount { break } + d.selectPeer(d.peers.bestPeer()) case <-itimer.C: // The timer will make sure that the downloader keeps an active state // in which it attempts to always check the network for highest td peers - d.selectPeer(d.peers.bestPeer()) + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := d.peers.bestPeer(); peer != nil { + d.selectPeer(d.peers.bestPeer()) + } else { + itimer.Reset(5 * time.Second) + } case <-d.quit: break out } @@ -142,6 +224,7 @@ func (d *Downloader) selectPeer(p *peer) { glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) d.syncCh <- syncPack{p, p.recentHash, false} } + } func (d *Downloader) update() { @@ -245,8 +328,13 @@ out: for { select { case blockPack := <-d.blockCh: - d.queue.deliver(blockPack.peerId, blockPack.blocks) - d.peers.setState(blockPack.peerId, idleState) + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if d.peers[blockPack.peerId] != nil { + d.peers[blockPack.peerId].promote() + d.queue.deliver(blockPack.peerId, blockPack.blocks) + d.peers.setState(blockPack.peerId, idleState) + } case <-ticker.C: // If there are unrequested hashes left start fetching // from the available peers. @@ -310,6 +398,9 @@ out: // 2) Measure their speed; // 3) Amount and availability. d.queue.deliver(pid, nil) + if peer := d.peers[pid]; peer != nil { + peer.demote() + } } } @@ -343,6 +434,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { peer.td = td peer.recentHash = block.Hash() peer.mu.Unlock() + peer.promote() glog.V(logger.Detail).Infoln("Inserting new block from:", id) d.queue.addBlock(id, block, td) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index f66e5afd8..4cd306a05 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -95,3 +95,23 @@ func (p *peer) fetch(chunk *chunk) error { return nil } + +// promote increases the peer's reputation +func (p *peer) promote() { + p.mu.Lock() + defer p.mu.Unlock() + + p.rep++ +} + +// demote decreases the peer's reputation or leaves it at 0 +func (p *peer) demote() { + p.mu.Lock() + defer p.mu.Unlock() + + if p.rep > 1 { + p.rep -= 2 + } else { + p.rep = 0 + } +} |