aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go289
1 files changed, 175 insertions, 114 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 577152a21..1bc81406c 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -2,41 +2,47 @@ package downloader
import (
"errors"
- "fmt"
+ "math/rand"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
const (
- maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk
+ maxHashFetch = 512 // Amount of hashes to be fetched per chunk
+ maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
+ hashTTL = 5 * time.Second // Time it takes for a hash request to time out
)
var (
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
-
- errLowTd = errors.New("peer's TD is too low")
- ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer's unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errNoPeers = errors.New("no peers to keep download active")
- ErrPendingQueue = errors.New("pending items in queue")
- ErrTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errBlockNumberOverflow = errors.New("received block which overflows")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ blockTTL = 5 * time.Second // Time it takes for a block request to time out
+ crossCheckCycle = time.Second // Period after which to check for expired cross checks
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+)
+
+var (
+ errLowTd = errors.New("peer's TD is too low")
+ ErrBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ ErrBadPeer = errors.New("action from bad peer ignored")
+ errNoPeers = errors.New("no peers to keep download active")
+ ErrPendingQueue = errors.New("pending items in queue")
+ ErrTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ ErrInvalidChain = errors.New("retrieved hash chain is invalid")
+ ErrCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
+ errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type hashCheckFn func(common.Hash) bool
@@ -55,10 +61,12 @@ type hashPack struct {
}
type Downloader struct {
- mu sync.RWMutex
- queue *queue
- peers *peerSet
- activePeer string
+ mux *event.TypeMux
+
+ mu sync.RWMutex
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain
// Callbacks
hasBlock hashCheckFn
@@ -66,16 +74,20 @@ type Downloader struct {
// Status
synchronising int32
+ notified int32
// Channels
newPeerCh chan *peer
hashCh chan hashPack
blockCh chan blockPack
- cancelCh chan struct{}
+
+ cancelCh chan struct{} // Channel to cancel mid-flight syncs
+ cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
}
-func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
downloader := &Downloader{
+ mux: mux,
queue: newQueue(),
peers: newPeerSet(),
hasBlock: hasBlock,
@@ -84,7 +96,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1),
}
-
return downloader
}
@@ -92,6 +103,11 @@ func (d *Downloader) Stats() (current int, max int) {
return d.queue.Size()
}
+// Synchronising returns the state of the downloader
+func (d *Downloader) Synchronising() bool {
+ return atomic.LoadInt32(&d.synchronising) > 0
+}
+
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
@@ -124,8 +140,17 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
}
defer atomic.StoreInt32(&d.synchronising, 0)
- // Create cancel channel for aborting midflight
+ // Post a user notification of the sync (only once per session)
+ if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
+ glog.V(logger.Info).Infoln("Block synchronisation started")
+ }
+
+ d.mux.Post(StartEvent{})
+
+ // Create cancel channel for aborting mid-flight
+ d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
@@ -134,6 +159,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
+ d.checks = make(map[common.Hash]time.Time)
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
@@ -143,16 +169,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
return d.syncWithPeer(p, hash)
}
-// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
-// it's possible it yields no blocks
+// TakeBlocks takes blocks from the queue and yields them to the caller.
func (d *Downloader) TakeBlocks() types.Blocks {
- // Check that there are blocks available and its parents are known
- head := d.queue.GetHeadBlock()
- if head == nil || !d.hasBlock(head.ParentHash()) {
- return nil
- }
- // Retrieve a full batch of blocks
- return d.queue.TakeBlocks(head)
+ return d.queue.TakeBlocks()
}
func (d *Downloader) Has(hash common.Hash) bool {
@@ -162,11 +181,13 @@ func (d *Downloader) Has(hash common.Hash) bool {
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
- d.activePeer = p.id
defer func() {
// reset on error
if err != nil {
- d.queue.Reset()
+ d.Cancel()
+ d.mux.Post(FailedEvent{err})
+ } else {
+ d.mux.Post(DoneEvent{})
}
}()
@@ -185,32 +206,20 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
// Cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
func (d *Downloader) Cancel() bool {
- hs, bs := d.queue.Size()
// If we're not syncing just return.
+ hs, bs := d.queue.Size()
if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
return false
}
-
- close(d.cancelCh)
-
- // clean up
-hashDone:
- for {
- select {
- case <-d.hashCh:
- default:
- break hashDone
- }
- }
-
-blockDone:
- for {
- select {
- case <-d.blockCh:
- default:
- break blockDone
- }
+ // Close the current cancel channel
+ d.cancelLock.Lock()
+ select {
+ case <-d.cancelCh:
+ // Channel was already closed
+ default:
+ close(d.cancelCh)
}
+ d.cancelLock.Unlock()
// reset the queue
d.queue.Reset()
@@ -224,66 +233,100 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
start := time.Now()
- // Add the hash to the queue first
+ // Add the hash to the queue first, and start hash retrieval
d.queue.Insert([]common.Hash{h})
-
- // Get the first batch of hashes
p.getHashes(h)
var (
- failureResponseTimer = time.NewTimer(hashTtl)
- attemptedPeers = make(map[string]bool) // attempted peers will help with retries
- activePeer = p // active peer will help determine the current active peer
- hash common.Hash // common and last hash
+ active = p // active peer will help determine the current active peer
+ head = common.Hash{} // common and last hash
+
+ timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
+ attempted = make(map[string]bool) // attempted peers will help with retries
+ crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
)
- attemptedPeers[p.id] = true
+ defer crossTicker.Stop()
-out:
- for {
+ attempted[p.id] = true
+ for finished := false; !finished; {
select {
case <-d.cancelCh:
return errCancelHashFetch
+
case hashPack := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != activePeer.id {
+ if hashPack.peerId != active.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
break
}
-
- failureResponseTimer.Reset(hashTtl)
+ timeout.Reset(hashTTL)
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
- glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
- d.queue.Reset()
-
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
return errEmptyHashSet
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
done, index := false, 0
- for index, hash = range hashPack.hashes {
- if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil {
- glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
+ for index, head = range hashPack.hashes {
+ if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
+ glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
hashPack.hashes = hashPack.hashes[:index]
done = true
break
}
}
- d.queue.Insert(hashPack.hashes)
-
+ // Insert all the new hashes, but only continue if got something useful
+ inserts := d.queue.Insert(hashPack.hashes)
+ if len(inserts) == 0 && !done {
+ glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
+ return ErrBadPeer
+ }
if !done {
- activePeer.getHashes(hash)
+ // Try and fetch a random block to verify the hash batch
+ // Skip the last hash as the cross check races with the next hash fetch
+ if len(inserts) > 1 {
+ cross := inserts[rand.Intn(len(inserts)-1)]
+ glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
+
+ d.checks[cross] = time.Now().Add(blockTTL)
+ active.getBlocks([]common.Hash{cross})
+ }
+ // Also fetch a fresh
+ active.getHashes(head)
continue
}
// We're done, allocate the download cache and proceed pulling the blocks
offset := 0
- if block := d.getBlock(hash); block != nil {
+ if block := d.getBlock(head); block != nil {
offset = int(block.NumberU64() + 1)
}
d.queue.Alloc(offset)
- break out
+ finished = true
- case <-failureResponseTimer.C:
+ case blockPack := <-d.blockCh:
+ // Cross check the block with the random verifications
+ if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
+ continue
+ }
+ block := blockPack.blocks[0]
+ if _, ok := d.checks[block.Hash()]; ok {
+ if !d.queue.Has(block.ParentHash()) {
+ return ErrCrossCheckFailed
+ }
+ delete(d.checks, block.Hash())
+ }
+
+ case <-crossTicker.C:
+ // Iterate over all the cross checks and fail the hash chain if they're not verified
+ for hash, deadline := range d.checks {
+ if time.Now().After(deadline) {
+ glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
+ return ErrCrossCheckFailed
+ }
+ }
+
+ case <-timeout.C:
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
var p *peer // p will be set if a peer can be found
@@ -291,21 +334,20 @@ out:
// already fetched hash list. This can't guarantee 100% correctness but does
// a fair job. This is always either correct or false incorrect.
for _, peer := range d.peers.AllPeers() {
- if d.queue.Has(peer.head) && !attemptedPeers[p.id] {
+ if d.queue.Has(peer.head) && !attempted[peer.id] {
p = peer
break
}
}
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
- if p == nil || (hash == common.Hash{}) {
- d.queue.Reset()
+ if p == nil || (head == common.Hash{}) {
return ErrTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
- activePeer = p
- p.getHashes(hash)
+ active = p
+ p.getHashes(head)
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
}
}
@@ -328,12 +370,26 @@ out:
select {
case <-d.cancelCh:
return errCancelBlockFetch
+
case blockPack := <-d.blockCh:
+ // Short circuit if it's a stale cross check
+ if len(blockPack.blocks) == 1 {
+ block := blockPack.blocks[0]
+ if _, ok := d.checks[block.Hash()]; ok {
+ delete(d.checks, block.Hash())
+ continue
+ }
+ }
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
- // Deliver the received chunk of blocks, but drop the peer if invalid
+ // Deliver the received chunk of blocks
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
+ if err == ErrInvalidChain {
+ // The hash chain is invalid (blocks are not ordered properly), abort
+ return err
+ }
+ // Peer did deliver, but some blocks were off, penalize
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
peer.Demote()
break
@@ -351,7 +407,7 @@ out:
// that badly or poorly behave are removed from the peer set (not banned).
// Bad peers are excluded from the available peer set and therefor won't be
// reused. XXX We could re-introduce peers after X time.
- badPeers := d.queue.Expire(blockTtl)
+ badPeers := d.queue.Expire(blockTTL)
for _, pid := range badPeers {
// XXX We could make use of a reputation system here ranking peers
// in their performance
@@ -364,7 +420,6 @@ out:
}
// After removing bad peers make sure we actually have sufficient peer left to keep downloading
if d.peers.Len() == 0 {
- d.queue.Reset()
return errNoPeers
}
// If there are unrequested hashes left start fetching
@@ -398,9 +453,7 @@ out:
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if d.queue.InFlight() == 0 {
- d.queue.Reset()
-
- return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending())
+ return errPeersUnavailable
}
} else if d.queue.InFlight() == 0 {
@@ -416,37 +469,45 @@ out:
return nil
}
-// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
-// the protocol handler.
-func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a new batch of blocks received from a remote node.
+// This is usually invoked through the BlocksMsg by the protocol handler.
+func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
// Make sure the downloader is active
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- d.blockCh <- blockPack{id, blocks}
+ select {
+ case d.blockCh <- blockPack{id, blocks}:
+ return nil
- return nil
+ case <-cancel:
+ return errNoSyncActive
+ }
}
-func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+// DeliverHashes injects a new batch of hashes received from a remote node into
+// the download schedule. This is usually invoked through the BlockHashesMsg by
+// the protocol handler.
+func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
// Make sure the downloader is active
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- // make sure that the hashes that are being added are actually from the peer
- // that's the current active peer. hashes that have been received from other
- // peers are dropped and ignored.
- if d.activePeer != id {
- return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
- }
+ select {
+ case d.hashCh <- hashPack{id, hashes}:
+ return nil
- if glog.V(logger.Debug) && len(hashes) != 0 {
- from, to := hashes[0], hashes[len(hashes)-1]
- glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
+ case <-cancel:
+ return errNoSyncActive
}
- d.hashCh <- hashPack{id, hashes}
-
- return nil
}