aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-04-19 06:09:12 +0800
committerobscuren <geffobscura@gmail.com>2015-04-19 06:09:12 +0800
commit71aa5fe8a31bdf12a38e940730e60964bbf4a1c0 (patch)
tree7a71713a6447f6a76e8f982f2527e73085473451 /eth/downloader
parent84f1af6413b172c7c88d567e8b9033197196b5bd (diff)
parent164b878854b58aed833eb704579343099854735f (diff)
downloaddexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.gz
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.bz2
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.lz
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.xz
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.zst
dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.zip
Merge branch 'downloader-proto' into develop
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go205
-rw-r--r--eth/downloader/peer.go25
-rw-r--r--eth/downloader/queue.go23
-rw-r--r--eth/downloader/synchronous.go79
4 files changed, 257 insertions, 75 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 4e795af6d..8f955b483 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,6 +1,8 @@
package downloader
import (
+ "errors"
+ "fmt"
"math"
"math/big"
"sync"
@@ -16,8 +18,21 @@ import (
)
const (
- maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
- minDesiredPeerCount = 3 // Amount of peers desired to start syncing
+ maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
+ blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out
+ hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
+)
+
+var (
+ errLowTd = errors.New("peer's TD is too low")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
)
type hashCheckFn func(common.Hash) bool
@@ -26,9 +41,10 @@ type hashIterFn func() (common.Hash, error)
type currentTdFn func() *big.Int
type Downloader struct {
- mu sync.RWMutex
- queue *queue
- peers peers
+ mu sync.RWMutex
+ queue *queue
+ peers peers
+ activePeer string
// Callbacks
hasBlock hashCheckFn
@@ -43,7 +59,7 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
syncCh chan syncPack
- HashCh chan []common.Hash
+ hashCh chan []common.Hash
blockCh chan blockPack
quit chan struct{}
}
@@ -68,7 +84,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn)
currentTd: currentTd,
newPeerCh: make(chan *peer, 1),
syncCh: make(chan syncPack, 1),
- HashCh: make(chan []common.Hash, 1),
+ hashCh: make(chan []common.Hash, 1),
blockCh: make(chan blockPack, 1),
quit: make(chan struct{}),
}
@@ -82,7 +98,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
d.mu.Lock()
defer d.mu.Unlock()
- glog.V(logger.Detail).Infoln("Register peer", id)
+ glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td)
// Create a new peer and add it to the list of known peers
peer := newPeer(id, td, hash, getHashes, getBlocks)
@@ -94,6 +110,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
return nil
}
+// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer.
func (d *Downloader) UnregisterPeer(id string) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -105,8 +122,7 @@ func (d *Downloader) UnregisterPeer(id string) {
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
- //itimer := time.NewTicker(5 * time.Second)
- itimer := time.NewTimer(5 * time.Second)
+ itimer := time.NewTimer(peerCountTimeout)
out:
for {
select {
@@ -116,11 +132,18 @@ out:
if len(d.peers) < minDesiredPeerCount {
break
}
+
d.selectPeer(d.peers.bestPeer())
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state
// in which it attempts to always check the network for highest td peers
- d.selectPeer(d.peers.bestPeer())
+ // Either select the peer or restart the timer if no peers could
+ // be selected.
+ if peer := d.peers.bestPeer(); peer != nil {
+ d.selectPeer(d.peers.bestPeer())
+ } else {
+ itimer.Reset(5 * time.Second)
+ }
case <-d.quit:
break out
}
@@ -131,7 +154,7 @@ func (d *Downloader) selectPeer(p *peer) {
// Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching.
- if !(d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing()) {
+ if !d.isBusy() {
// selected peer must be better than our own
// XXX we also check the peer's recent hash to make sure we
// don't have it. Some peers report (i think) incorrect TD.
@@ -142,6 +165,7 @@ func (d *Downloader) selectPeer(p *peer) {
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
d.syncCh <- syncPack{p, p.recentHash, false}
}
+
}
func (d *Downloader) update() {
@@ -149,30 +173,13 @@ out:
for {
select {
case sync := <-d.syncCh:
- selectedPeer := sync.peer
- glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id)
- // Start the fetcher. This will block the update entirely
- // interupts need to be send to the appropriate channels
- // respectively.
- if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
- // handle error
- glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
- // XXX Reset
+ var peer *peer = sync.peer
+ err := d.getFromPeer(peer, sync.hash, sync.ignoreInitial)
+ if err != nil {
+ glog.V(logger.Detail).Infoln(err)
break
}
- // Start fetching blocks in paralel. The strategy is simple
- // take any available peers, seserve a chunk for each peer available,
- // let the peer deliver the chunkn and periodically check if a peer
- // has timedout. When done downloading, process blocks.
- if err := d.startFetchingBlocks(selectedPeer); err != nil {
- glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
- // XXX reset
- break
- }
-
- glog.V(logger.Detail).Infoln("Sync completed")
-
d.process()
case <-d.quit:
break out
@@ -182,6 +189,9 @@ out:
// XXX Make synchronous
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
+ atomic.StoreInt32(&d.fetchingHashes, 1)
+ defer atomic.StoreInt32(&d.fetchingHashes, 0)
+
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
start := time.Now()
@@ -192,15 +202,15 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
// Add the hash to the queue first
d.queue.hashPool.Add(hash)
}
-
// Get the first batch of hashes
p.getHashes(hash)
- atomic.StoreInt32(&d.fetchingHashes, 1)
+
+ failureResponse := time.NewTimer(hashTtl)
out:
for {
select {
- case hashes := <-d.HashCh:
+ case hashes := <-d.hashCh:
var done bool // determines whether we're done fetching hashes (i.e. common hash found)
hashSet := set.New()
for _, hash := range hashes {
@@ -216,26 +226,36 @@ out:
d.queue.put(hashSet)
// Add hashes to the chunk set
- // Check if we're done fetching
- if !done && len(hashes) > 0 {
- //fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
+ if len(hashes) == 0 { // Make sure the peer actually gave you something valid
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
+ d.queue.reset()
+
+ return errEmptyHashSet
+ } else if !done { // Check if we're done fetching
// Get the next set of hashes
p.getHashes(hashes[len(hashes)-1])
- atomic.StoreInt32(&d.fetchingHashes, 1)
- } else {
- atomic.StoreInt32(&d.fetchingHashes, 0)
+ } else { // we're done
break out
}
+ case <-failureResponse.C:
+ glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+ // TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
+ // 1. check for peer's best hash to be included in the current hash set;
+ // 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer.
+ d.queue.reset()
+
+ return errTimeout
}
}
- glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
+ glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil
}
func (d *Downloader) startFetchingBlocks(p *peer) error {
- glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks")
+ glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
atomic.StoreInt32(&d.downloadingBlocks, 1)
+ defer atomic.StoreInt32(&d.downloadingBlocks, 0)
start := time.Now()
@@ -245,18 +265,18 @@ out:
for {
select {
case blockPack := <-d.blockCh:
- d.peers[blockPack.peerId].promote()
- d.queue.deliver(blockPack.peerId, blockPack.blocks)
- d.peers.setState(blockPack.peerId, idleState)
+ // If the peer was previously banned and failed to deliver it's pack
+ // in a reasonable time frame, ignore it's message.
+ if d.peers[blockPack.peerId] != nil {
+ d.peers[blockPack.peerId].promote()
+ d.queue.deliver(blockPack.peerId, blockPack.blocks)
+ d.peers.setState(blockPack.peerId, idleState)
+ }
case <-ticker.C:
// If there are unrequested hashes left start fetching
// from the available peers.
if d.queue.hashPool.Size() > 0 {
availablePeers := d.peers.get(idleState)
- if len(availablePeers) == 0 {
- glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
- }
-
for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
@@ -265,7 +285,6 @@ out:
continue
}
- //fmt.Println("fetching for", peer.id)
// XXX make fetch blocking.
// Fetch the chunk and check for error. If the peer was somehow
// already fetching a chunk due to a bug, it will be returned to
@@ -276,13 +295,20 @@ out:
d.queue.put(chunk.hashes)
}
}
- atomic.StoreInt32(&d.downloadingBlocks, 1)
+
+ // make sure that we have peers available for fetching. If all peers have been tried
+ // and all failed throw an error
+ if len(d.queue.fetching) == 0 {
+ d.queue.reset()
+ d.peers.reset()
+
+ return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers))
+ }
+
} else if len(d.queue.fetching) == 0 {
// When there are no more queue and no more `fetching`. We can
// safely assume we're done. Another part of the process will check
// for parent errors and will re-request anything that's missing
- atomic.StoreInt32(&d.downloadingBlocks, 0)
- // Break out so that we can process with processing blocks
break out
} else {
// Check for bad peers. Bad peers may indicate a peer not responding
@@ -293,10 +319,10 @@ out:
d.queue.mu.Lock()
var badPeers []string
for pid, chunk := range d.queue.fetching {
- if time.Since(chunk.itime) > 5*time.Second {
+ if time.Since(chunk.itime) > blockTtl {
badPeers = append(badPeers, pid)
// remove peer as good peer from peer list
- d.UnregisterPeer(pid)
+ //d.UnregisterPeer(pid)
}
}
d.queue.mu.Unlock()
@@ -313,26 +339,42 @@ out:
d.queue.deliver(pid, nil)
if peer := d.peers[pid]; peer != nil {
peer.demote()
+ peer.reset()
}
}
}
- //fmt.Println(d.queue.hashPool.Size(), len(d.queue.fetching))
}
}
- glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start))
+ glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
+
+ return nil
+}
+
+func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+ // make sure that the hashes that are being added are actually from the peer
+ // that's the current active peer. hashes that have been received from other
+ // peers are dropped and ignored.
+ if d.activePeer != id {
+ return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
+ }
+
+ d.hashCh <- hashes
return nil
}
// Add an (unrequested) block to the downloader. This is usually done through the
// NewBlockMsg by the protocol handler.
-func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
+// Adding blocks is done synchronously. if there are missing blocks, blocks will be
+// fetched first. If the downloader is busy or if some other processed failed an error
+// will be returned.
+func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error {
hash := block.Hash()
if d.hasBlock(hash) {
- return
+ return fmt.Errorf("known block %x", hash.Bytes()[:4])
}
peer := d.peers.getPeer(id)
@@ -340,7 +382,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
// and add the block. Otherwise just ignore it
if peer == nil {
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
- return
+ return errBadPeer
}
peer.mu.Lock()
@@ -353,17 +395,24 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
d.queue.addBlock(id, block, td)
// if neither go ahead to process
- if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
- // Check if the parent of the received block is known.
- // If the block is not know, request it otherwise, request.
- phash := block.ParentHash()
- if !d.hasBlock(phash) {
- glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
- d.syncCh <- syncPack{peer, peer.recentHash, true}
- } else {
- d.process()
+ if d.isBusy() {
+ return errBusy
+ }
+
+ // Check if the parent of the received block is known.
+ // If the block is not know, request it otherwise, request.
+ phash := block.ParentHash()
+ if !d.hasBlock(phash) {
+ glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
+
+ // Get the missing hashes from the peer (synchronously)
+ err := d.getFromPeer(peer, peer.recentHash, true)
+ if err != nil {
+ return err
}
}
+
+ return d.process()
}
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
@@ -383,8 +432,11 @@ func (d *Downloader) process() error {
// to a seperate goroutine where it periodically checks for linked pieces.
types.BlockBy(types.Number).Sort(d.queue.blocks)
blocks := d.queue.blocks
+ if len(blocks) == 0 {
+ return nil
+ }
- glog.V(logger.Debug).Infoln("Inserting chain with", len(blocks), "blocks")
+ glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
var err error
// Loop untill we're out of blocks
@@ -408,6 +460,11 @@ func (d *Downloader) process() error {
}
}
break
+ } else if err != nil {
+ // Reset chain completely. This needs much, much improvement.
+ // instead: check all blocks leading down to this block false block and remove it
+ blocks = nil
+ break
}
blocks = blocks[max:]
}
@@ -432,3 +489,7 @@ func (d *Downloader) isDownloadingBlocks() bool {
func (d *Downloader) isProcessing() bool {
return atomic.LoadInt32(&d.processingBlocks) == 1
}
+
+func (d *Downloader) isBusy() bool {
+ return d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing()
+}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 4cd306a05..88ede16f9 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -6,6 +6,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/common"
+ "gopkg.in/fatih/set.v0"
)
const (
@@ -19,6 +20,12 @@ type blockFetcherFn func([]common.Hash) error
// XXX make threadsafe!!!!
type peers map[string]*peer
+func (p peers) reset() {
+ for _, peer := range p {
+ peer.reset()
+ }
+}
+
func (p peers) get(state int) []*peer {
var peers []*peer
for _, peer := range p {
@@ -64,13 +71,23 @@ type peer struct {
td *big.Int
recentHash common.Hash
+ requested *set.Set
+
getHashes hashFetcherFn
getBlocks blockFetcherFn
}
// create a new peer
func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
- return &peer{id: id, td: td, recentHash: hash, getHashes: getHashes, getBlocks: getBlocks, state: idleState}
+ return &peer{
+ id: id,
+ td: td,
+ recentHash: hash,
+ getHashes: getHashes,
+ getBlocks: getBlocks,
+ state: idleState,
+ requested: set.New(),
+ }
}
// fetch a chunk using the peer
@@ -82,6 +99,8 @@ func (p *peer) fetch(chunk *chunk) error {
return errors.New("peer already fetching chunk")
}
+ p.requested.Merge(chunk.hashes)
+
// set working state
p.state = workingState
// convert the set to a fetchable slice
@@ -115,3 +134,7 @@ func (p *peer) demote() {
p.rep = 0
}
}
+
+func (p *peer) reset() {
+ p.state = idleState
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 4d1aa4e93..ce3aa9850 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -31,6 +31,17 @@ func newqueue() *queue {
}
}
+func (c *queue) reset() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.hashPool.Clear()
+ c.fetchPool.Clear()
+ c.blockHashes.Clear()
+ c.blocks = nil
+ c.fetching = make(map[string]*chunk)
+}
+
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
@@ -49,11 +60,19 @@ func (c *queue) get(p *peer, max int) *chunk {
return false
}
- hashes.Add(v)
- i++
+ // Skip any hashes that have previously been requested from the peer
+ if !p.requested.Has(v) {
+ hashes.Add(v)
+ i++
+ }
return true
})
+ // if no hashes can be requested return a nil chunk
+ if hashes.Size() == 0 {
+ return nil
+ }
+
// remove the fetchable hashes from hash pool
c.hashPool.Separate(hashes)
c.fetchPool.Merge(hashes)
diff --git a/eth/downloader/synchronous.go b/eth/downloader/synchronous.go
new file mode 100644
index 000000000..7bb49d24e
--- /dev/null
+++ b/eth/downloader/synchronous.go
@@ -0,0 +1,79 @@
+package downloader
+
+import (
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
+
+// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
+ // Check if we're busy
+ if d.isBusy() {
+ return nil, errBusy
+ }
+
+ // Attempt to select a peer. This can either be nothing, which returns, best peer
+ // or selected peer. If no peer could be found an error will be returned
+ var p *peer
+ if len(id) == 0 {
+ p = d.peers[id]
+ if p == nil {
+ return nil, errUnknownPeer
+ }
+ } else {
+ p = d.peers.bestPeer()
+ }
+
+ // Make sure our td is lower than the peer's td
+ if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
+ return nil, errLowTd
+ }
+
+ // Get the hash from the peer and initiate the downloading progress.
+ err := d.getFromPeer(p, p.recentHash, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return d.queue.blocks, nil
+}
+
+// Synchronise will synchronise using the best peer.
+func (d *Downloader) Synchronise() (types.Blocks, error) {
+ return d.SynchroniseWithPeer("")
+}
+
+func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
+ d.activePeer = p.id
+
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+ // Start the fetcher. This will block the update entirely
+ // interupts need to be send to the appropriate channels
+ // respectively.
+ if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+ // handle error
+ glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
+ // XXX Reset
+ return err
+ }
+
+ // Start fetching blocks in paralel. The strategy is simple
+ // take any available peers, seserve a chunk for each peer available,
+ // let the peer deliver the chunkn and periodically check if a peer
+ // has timedout. When done downloading, process blocks.
+ if err := d.startFetchingBlocks(p); err != nil {
+ glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
+ // XXX reset
+ return err
+ }
+
+ glog.V(logger.Detail).Infoln("Sync completed")
+
+ return nil
+}