aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-04-18 23:35:03 +0800
committerobscuren <geffobscura@gmail.com>2015-04-18 23:35:03 +0800
commit60613b57d1956275bb475a53b5085c4ead4ceb2c (patch)
tree819665229117f4dbee6a11c08bceab4d5f3e531e
parentff67fbf96448b83b778960a6c20ea8dfd854c825 (diff)
downloaddexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.gz
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.bz2
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.lz
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.xz
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.tar.zst
dexon-60613b57d1956275bb475a53b5085c4ead4ceb2c.zip
downloader: make sure that hashes are only accepted from the active peer
-rw-r--r--eth/downloader/downloader.go63
-rw-r--r--eth/handler.go11
2 files changed, 54 insertions, 20 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index c71cfa684..41484e927 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -2,6 +2,7 @@ package downloader
import (
"errors"
+ "fmt"
"math"
"math/big"
"sync"
@@ -18,8 +19,10 @@ import (
const (
maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
- minDesiredPeerCount = 3 // Amount of peers desired to start syncing
- blockTtl = 15 * time.Second // The amount of time it takes for a request to time out
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
+ blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out
+ hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
)
var (
@@ -34,9 +37,10 @@ type hashIterFn func() (common.Hash, error)
type currentTdFn func() *big.Int
type Downloader struct {
- mu sync.RWMutex
- queue *queue
- peers peers
+ mu sync.RWMutex
+ queue *queue
+ peers peers
+ activePeer string
// Callbacks
hasBlock hashCheckFn
@@ -51,7 +55,7 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
syncCh chan syncPack
- HashCh chan []common.Hash
+ hashCh chan []common.Hash
blockCh chan blockPack
quit chan struct{}
}
@@ -76,7 +80,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn)
currentTd: currentTd,
newPeerCh: make(chan *peer, 1),
syncCh: make(chan syncPack, 1),
- HashCh: make(chan []common.Hash, 1),
+ hashCh: make(chan []common.Hash, 1),
blockCh: make(chan blockPack, 1),
quit: make(chan struct{}),
}
@@ -181,8 +185,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
- //itimer := time.NewTicker(5 * time.Second)
- itimer := time.NewTimer(5 * time.Second)
+ itimer := time.NewTimer(peerCountTimeout)
out:
for {
select {
@@ -233,12 +236,16 @@ out:
for {
select {
case sync := <-d.syncCh:
- selectedPeer := sync.peer
- glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id)
+ start := time.Now()
+
+ var peer *peer = sync.peer
+
+ d.activePeer = peer.id
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", peer.id)
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
- if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
+ if err := d.startFetchingHashes(peer, sync.hash, sync.ignoreInitial); err != nil {
// handle error
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
// XXX Reset
@@ -249,13 +256,13 @@ out:
// take any available peers, seserve a chunk for each peer available,
// let the peer deliver the chunkn and periodically check if a peer
// has timedout. When done downloading, process blocks.
- if err := d.startFetchingBlocks(selectedPeer); err != nil {
+ if err := d.startFetchingBlocks(peer); err != nil {
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
// XXX reset
break
}
- glog.V(logger.Detail).Infoln("Sync completed")
+ glog.V(logger.Detail).Infoln("Network sync completed in", time.Since(start))
d.process()
case <-d.quit:
@@ -282,10 +289,12 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
// Get the first batch of hashes
p.getHashes(hash)
+ failureResponse := time.NewTimer(hashTtl)
+
out:
for {
select {
- case hashes := <-d.HashCh:
+ case hashes := <-d.hashCh:
var done bool // determines whether we're done fetching hashes (i.e. common hash found)
hashSet := set.New()
for _, hash := range hashes {
@@ -313,15 +322,20 @@ out:
} else { // we're done
break out
}
+ case <-failureResponse.C:
+ glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+ d.queue.reset()
+
+ break out
}
}
- glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
+ glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil
}
func (d *Downloader) startFetchingBlocks(p *peer) error {
- glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks")
+ glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
atomic.StoreInt32(&d.downloadingBlocks, 1)
defer atomic.StoreInt32(&d.downloadingBlocks, 0)
@@ -407,7 +421,20 @@ out:
}
}
- glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start))
+ glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
+
+ return nil
+}
+
+func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+ // make sure that the hashes that are being added are actually from the peer
+ // that's the current active peer. hashes that have been received from other
+ // peers are dropped and ignored.
+ if d.activePeer != id {
+ return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
+ }
+
+ d.hashCh <- hashes
return nil
}
diff --git a/eth/handler.go b/eth/handler.go
index f3fad68b7..3aa9815f1 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -194,7 +194,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
if err := msgStream.Decode(&hashes); err != nil {
break
}
- self.downloader.HashCh <- hashes
+ err := self.downloader.AddHashes(p.id, hashes)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
case GetBlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
@@ -259,7 +262,11 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Make sure the block isn't already known. If this is the case simply drop
// the message and move on. If the TD is < currentTd; drop it as well. If this
// chain at some point becomes canonical, the downloader will fetch it.
- if self.chainman.HasBlock(hash) && self.chainman.Td().Cmp(request.TD) > 0 {
+ if self.chainman.HasBlock(hash) {
+ break
+ }
+ if self.chainman.Td().Cmp(request.TD) > 0 {
+ glog.V(logger.Debug).Infoln("dropped block", request.Block.Number(), "due to low TD", request.TD)
break
}