diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 294 |
1 files changed, 221 insertions, 73 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 29b627771..306c4fd2d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -3,6 +3,7 @@ package downloader import ( "bytes" "errors" + "math" "math/rand" "sync" "sync/atomic" @@ -28,32 +29,40 @@ var ( crossCheckCycle = time.Second // Period after which to check for expired cross checks maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( - errLowTd = errors.New("peers TD is too low") - ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") - ErrStallingPeer = errors.New("peer is stalling") - errBannedHead = errors.New("peer head hash already banned") - errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - ErrEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - ErrInvalidChain = errors.New("retrieved hash chain is invalid") - ErrCrossCheckFailed = errors.New("block cross-check failed") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errBannedHead = errors.New("peer head hash already banned") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errCrossCheckFailed = errors.New("block cross-check failed") + errCancelHashFetch = errors.New("hash fetching canceled (requested)") + errCancelBlockFetch = errors.New("block downloading canceled (requested)") + errCancelChainImport = errors.New("chain importing canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) +// hashCheckFn is a callback type for verifying a hash's presence in the local chain. type hashCheckFn func(common.Hash) bool -type getBlockFn func(common.Hash) *types.Block + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. type chainInsertFn func(types.Blocks) (int, error) -type hashIterFn func() (common.Hash, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) type blockPack struct { peerId string @@ -78,13 +87,23 @@ type Downloader struct { checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain banned *set.Set // Set of hashes we've received and banned + // Statistics + importStart time.Time // Instance when the last blocks were taken from the cache + importQueue []*Block // Previously taken blocks to check import progress + importDone int // Number of taken blocks already imported from the last batch + importLock sync.Mutex + // Callbacks - hasBlock hashCheckFn - getBlock getBlockFn + hasBlock hashCheckFn // Checks if a block is present in the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Retrieved the TD of our own chain // Status - synchronising int32 - notified int32 + synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing + synchronising int32 + processing int32 + notified int32 // Channels newPeerCh chan *peer @@ -101,17 +120,20 @@ type Block struct { OriginPeer string } -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { +// New creates a new downloader to fetch hashes and blocks from remote peers. +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasBlock: hasBlock, - getBlock: getBlock, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + hasBlock: hasBlock, + getBlock: getBlock, + insertChain: insertChain, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan hashPack, 1), + blockCh: make(chan blockPack, 1), } // Inject all the known bad hashes downloader.banned = set.New() @@ -121,11 +143,30 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa return downloader } -func (d *Downloader) Stats() (current int, max int) { - return d.queue.Size() +// Stats retrieves the current status of the downloader. +func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) { + // Fetch the download status + pending, cached = d.queue.Size() + + // Figure out the import progress + d.importLock.Lock() + defer d.importLock.Unlock() + + for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) { + d.importQueue = d.importQueue[1:] + d.importDone++ + } + importing = len(d.importQueue) + + // Make an estimate on the total sync + estimate = 0 + if d.importDone > 0 { + estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing) + } + return } -// Synchronising returns the state of the downloader +// Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { return atomic.LoadInt32(&d.synchronising) > 0 } @@ -158,19 +199,47 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// Synchronise will select the peer and use it for synchronising. If an empty string is given +// Synchronise tries to sync up our local block chain with a remote peer, both +// adding various sanity checks as well as wrapping it with various log entries. +func (d *Downloader) Synchronise(id string, head common.Hash) { + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head) + + switch err := d.synchronise(id, head); err { + case nil: + glog.V(logger.Detail).Infof("Synchronisation completed") + + case errBusy: + glog.V(logger.Detail).Infof("Synchronisation already in progress") + + case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed: + glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) + d.dropPeer(id) + + case errPendingQueue: + glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) + + default: + glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) + } +} + +// synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) error { +func (d *Downloader) synchronise(id string, hash common.Hash) error { + // Mock out the synchonisation if testing + if d.synchroniseMock != nil { + return d.synchroniseMock(id, hash) + } // Make sure only one goroutine is ever allowed past this point at once if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { - return ErrBusy + return errBusy } defer atomic.StoreInt32(&d.synchronising, 0) // If the head hash is banned, terminate immediately if d.banned.Has(hash) { - return ErrInvalidChain + return errBannedHead } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { @@ -184,7 +253,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { - return ErrPendingQueue + return errPendingQueue } // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() @@ -200,11 +269,6 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() []*Block { - return d.queue.TakeBlocks() -} - // Has checks if the downloader knows about a particular hash, meaning that its // either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { @@ -239,29 +303,26 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) Cancel() bool { - // If we're not syncing just return. - hs, bs := d.queue.Size() - if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { - return false - } +func (d *Downloader) Cancel() { // Close the current cancel channel d.cancelLock.Lock() - select { - case <-d.cancelCh: - // Channel was already closed - default: - close(d.cancelCh) + if d.cancelCh != nil { + select { + case <-d.cancelCh: + // Channel was already closed + default: + close(d.cancelCh) + } } d.cancelLock.Unlock() - // reset the queue + // Reset the queue d.queue.Reset() - - return true } -// XXX Make synchronous +// fetchHahes starts retrieving hashes backwards from a specific peer and hash, +// up until it finds a common ancestor. If the source peer times out, alternative +// ones are tried for continuation. func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { var ( start = time.Now() @@ -279,7 +340,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { <-timeout.C // timeout channel should be initially empty. getHashes := func(from common.Hash) { - active.getHashes(from) + go active.getHashes(from) timeout.Reset(hashTTL) } @@ -304,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) - return ErrEmptyHashSet + return errEmptyHashSet } for index, hash := range hashPack.hashes { if d.banned.Has(hash) { @@ -314,7 +375,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if err := d.banBlocks(active.id, hash); err != nil { glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } - return ErrInvalidChain + return errInvalidChain } } // Determine if we're done fetching hashes (queue up all pending), and continue if not done @@ -331,12 +392,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) - return ErrBadPeer + return errBadPeer } if !done { // Check that the peer is not stalling the sync if len(inserts) < MinHashFetch { - return ErrStallingPeer + return errStallingPeer } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch @@ -348,9 +409,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { expire: time.Now().Add(blockSoftTTL), parent: parent, } - active.getBlocks([]common.Hash{origin}) + go active.getBlocks([]common.Hash{origin}) - // Also fetch a fresh + // Also fetch a fresh batch of hashes getHashes(head) continue } @@ -370,7 +431,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { block := blockPack.blocks[0] if check, ok := d.checks[block.Hash()]; ok { if block.ParentHash() != check.parent { - return ErrCrossCheckFailed + return errCrossCheckFailed } delete(d.checks, block.Hash()) } @@ -380,7 +441,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { for hash, check := range d.checks { if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) - return ErrCrossCheckFailed + return errCrossCheckFailed } } @@ -400,7 +461,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (head == common.Hash{}) { - return ErrTimeout + return errTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. @@ -457,12 +518,13 @@ out: glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } - // All was successful, promote the peer + // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + go d.process() - case ErrInvalidChain: + case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort return err @@ -579,7 +641,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { return errCancelBlockFetch case <-timeout: - return ErrTimeout + return errTimeout case <-d.hashCh: // Out of bounds hashes received, ignore them @@ -636,6 +698,92 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { } } +// process takes blocks from the queue and tries to import them into the chain. +// +// The algorithmic flow is as follows: +// - The `processing` flag is swapped to 1 to ensure singleton access +// - The current `cancel` channel is retrieved to detect sync abortions +// - Blocks are iteratively taken from the cache and inserted into the chain +// - When the cache becomes empty, insertion stops +// - The `processing` flag is swapped back to 0 +// - A post-exit check is made whether new blocks became available +// - This step is important: it handles a potential race condition between +// checking for no more work, and releasing the processing "mutex". In +// between these state changes, a block may have arrived, but a processing +// attempt denied, so we need to re-enter to ensure the block isn't left +// to idle in the cache. +func (d *Downloader) process() (err error) { + // Make sure only one goroutine is ever allowed to process blocks at once + if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { + return + } + // If the processor just exited, but there are freshly pending items, try to + // reenter. This is needed because the goroutine spinned up for processing + // the fresh blocks might have been rejected entry to to this present thread + // not yet releasing the `processing` state. + defer func() { + if err == nil && d.queue.GetHeadBlock() != nil { + err = d.process() + } + }() + // Release the lock upon exit (note, before checking for reentry!), and set + // the import statistics to zero. + defer func() { + d.importLock.Lock() + d.importQueue = nil + d.importDone = 0 + d.importLock.Unlock() + + atomic.StoreInt32(&d.processing, 0) + }() + + // Fetch the current cancel channel to allow termination + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + // Repeat the processing as long as there are blocks to import + for { + // Fetch the next batch of blocks + blocks := d.queue.TakeBlocks() + if len(blocks) == 0 { + return nil + } + // Reset the import statistics + d.importLock.Lock() + d.importStart = time.Now() + d.importQueue = blocks + d.importDone = 0 + d.importLock.Unlock() + + // Actually import the blocks + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) + for len(blocks) != 0 { // TODO: quit + // Check for any termination requests + select { + case <-cancel: + return errCancelChainImport + default: + } + // Retrieve the first batch of blocks to insert + max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) + raw := make(types.Blocks, 0, max) + for _, block := range blocks[:max] { + raw = append(raw, block.RawBlock) + } + // Try to inset the blocks, drop the originating peer if there's an error + index, err := d.insertChain(raw) + if err != nil { + glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) + d.dropPeer(blocks[index].OriginPeer) + d.Cancel() + return errCancelChainImport + } + blocks = blocks[max:] + } + } +} + // DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { |