diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-18 23:35:03 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-18 23:35:03 +0800 |
commit | 60613b57d1956275bb475a53b5085c4ead4ceb2c (patch) | |
tree | 819665229117f4dbee6a11c08bceab4d5f3e531e | |
parent | ff67fbf96448b83b778960a6c20ea8dfd854c825 (diff) | |
download | go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.gz go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.bz2 go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.lz go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.xz go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.zst go-tangerine-60613b57d1956275bb475a53b5085c4ead4ceb2c.zip |
downloader: make sure that hashes are only accepted from the active peer
-rw-r--r-- | eth/downloader/downloader.go | 63 | ||||
-rw-r--r-- | eth/handler.go | 11 |
2 files changed, 54 insertions, 20 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c71cfa684..41484e927 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -2,6 +2,7 @@ package downloader import ( "errors" + "fmt" "math" "math/big" "sync" @@ -18,8 +19,10 @@ import ( const ( maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk - minDesiredPeerCount = 3 // Amount of peers desired to start syncing - blockTtl = 15 * time.Second // The amount of time it takes for a request to time out + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out + hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out ) var ( @@ -34,9 +37,10 @@ type hashIterFn func() (common.Hash, error) type currentTdFn func() *big.Int type Downloader struct { - mu sync.RWMutex - queue *queue - peers peers + mu sync.RWMutex + queue *queue + peers peers + activePeer string // Callbacks hasBlock hashCheckFn @@ -51,7 +55,7 @@ type Downloader struct { // Channels newPeerCh chan *peer syncCh chan syncPack - HashCh chan []common.Hash + hashCh chan []common.Hash blockCh chan blockPack quit chan struct{} } @@ -76,7 +80,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) currentTd: currentTd, newPeerCh: make(chan *peer, 1), syncCh: make(chan syncPack, 1), - HashCh: make(chan []common.Hash, 1), + hashCh: make(chan []common.Hash, 1), blockCh: make(chan blockPack, 1), quit: make(chan struct{}), } @@ -181,8 +185,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` - //itimer := time.NewTicker(5 * time.Second) - itimer := time.NewTimer(5 * time.Second) + itimer := time.NewTimer(peerCountTimeout) out: for { select { @@ -233,12 +236,16 @@ out: for { select { case sync := <-d.syncCh: - selectedPeer := sync.peer - glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id) + start := time.Now() + + var peer *peer = sync.peer + + d.activePeer = peer.id + glog.V(logger.Detail).Infoln("Synchronising with the network using:", peer.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. - if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil { + if err := d.startFetchingHashes(peer, sync.hash, sync.ignoreInitial); err != nil { // handle error glog.V(logger.Debug).Infoln("Error fetching hashes:", err) // XXX Reset @@ -249,13 +256,13 @@ out: // take any available peers, seserve a chunk for each peer available, // let the peer deliver the chunkn and periodically check if a peer // has timedout. When done downloading, process blocks. - if err := d.startFetchingBlocks(selectedPeer); err != nil { + if err := d.startFetchingBlocks(peer); err != nil { glog.V(logger.Debug).Infoln("Error downloading blocks:", err) // XXX reset break } - glog.V(logger.Detail).Infoln("Sync completed") + glog.V(logger.Detail).Infoln("Network sync completed in", time.Since(start)) d.process() case <-d.quit: @@ -282,10 +289,12 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia // Get the first batch of hashes p.getHashes(hash) + failureResponse := time.NewTimer(hashTtl) + out: for { select { - case hashes := <-d.HashCh: + case hashes := <-d.hashCh: var done bool // determines whether we're done fetching hashes (i.e. common hash found) hashSet := set.New() for _, hash := range hashes { @@ -313,15 +322,20 @@ out: } else { // we're done break out } + case <-failureResponse.C: + glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) + d.queue.reset() + + break out } } - glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start)) + glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start)) return nil } func (d *Downloader) startFetchingBlocks(p *peer) error { - glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks") + glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)") atomic.StoreInt32(&d.downloadingBlocks, 1) defer atomic.StoreInt32(&d.downloadingBlocks, 0) @@ -407,7 +421,20 @@ out: } } - glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start)) + glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start)) + + return nil +} + +func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { + // make sure that the hashes that are being added are actually from the peer + // that's the current active peer. hashes that have been received from other + // peers are dropped and ignored. + if d.activePeer != id { + return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer) + } + + d.hashCh <- hashes return nil } diff --git a/eth/handler.go b/eth/handler.go index f3fad68b7..3aa9815f1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -194,7 +194,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } - self.downloader.HashCh <- hashes + err := self.downloader.AddHashes(p.id, hashes) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } case GetBlocksMsg: msgStream := rlp.NewStream(msg.Payload) @@ -259,7 +262,11 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Make sure the block isn't already known. If this is the case simply drop // the message and move on. If the TD is < currentTd; drop it as well. If this // chain at some point becomes canonical, the downloader will fetch it. - if self.chainman.HasBlock(hash) && self.chainman.Td().Cmp(request.TD) > 0 { + if self.chainman.HasBlock(hash) { + break + } + if self.chainman.Td().Cmp(request.TD) > 0 { + glog.V(logger.Debug).Infoln("dropped block", request.Block.Number(), "due to low TD", request.TD) break } |