diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-19 06:09:12 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-19 06:09:12 +0800 |
commit | 71aa5fe8a31bdf12a38e940730e60964bbf4a1c0 (patch) | |
tree | 7a71713a6447f6a76e8f982f2527e73085473451 /eth/downloader | |
parent | 84f1af6413b172c7c88d567e8b9033197196b5bd (diff) | |
parent | 164b878854b58aed833eb704579343099854735f (diff) | |
download | dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.gz dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.bz2 dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.lz dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.xz dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.tar.zst dexon-71aa5fe8a31bdf12a38e940730e60964bbf4a1c0.zip |
Merge branch 'downloader-proto' into develop
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 205 | ||||
-rw-r--r-- | eth/downloader/peer.go | 25 | ||||
-rw-r--r-- | eth/downloader/queue.go | 23 | ||||
-rw-r--r-- | eth/downloader/synchronous.go | 79 |
4 files changed, 257 insertions, 75 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4e795af6d..8f955b483 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,6 +1,8 @@ package downloader import ( + "errors" + "fmt" "math" "math/big" "sync" @@ -16,8 +18,21 @@ import ( ) const ( - maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk - minDesiredPeerCount = 3 // Amount of peers desired to start syncing + maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out + hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out +) + +var ( + errLowTd = errors.New("peer's TD is too low") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") ) type hashCheckFn func(common.Hash) bool @@ -26,9 +41,10 @@ type hashIterFn func() (common.Hash, error) type currentTdFn func() *big.Int type Downloader struct { - mu sync.RWMutex - queue *queue - peers peers + mu sync.RWMutex + queue *queue + peers peers + activePeer string // Callbacks hasBlock hashCheckFn @@ -43,7 +59,7 @@ type Downloader struct { // Channels newPeerCh chan *peer syncCh chan syncPack - HashCh chan []common.Hash + hashCh chan []common.Hash blockCh chan blockPack quit chan struct{} } @@ -68,7 +84,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) currentTd: currentTd, newPeerCh: make(chan *peer, 1), syncCh: make(chan syncPack, 1), - HashCh: make(chan []common.Hash, 1), + hashCh: make(chan []common.Hash, 1), blockCh: make(chan blockPack, 1), quit: make(chan struct{}), } @@ -82,7 +98,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH d.mu.Lock() defer d.mu.Unlock() - glog.V(logger.Detail).Infoln("Register peer", id) + glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td) // Create a new peer and add it to the list of known peers peer := newPeer(id, td, hash, getHashes, getBlocks) @@ -94,6 +110,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH return nil } +// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer. func (d *Downloader) UnregisterPeer(id string) { d.mu.Lock() defer d.mu.Unlock() @@ -105,8 +122,7 @@ func (d *Downloader) UnregisterPeer(id string) { func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` - //itimer := time.NewTicker(5 * time.Second) - itimer := time.NewTimer(5 * time.Second) + itimer := time.NewTimer(peerCountTimeout) out: for { select { @@ -116,11 +132,18 @@ out: if len(d.peers) < minDesiredPeerCount { break } + d.selectPeer(d.peers.bestPeer()) case <-itimer.C: // The timer will make sure that the downloader keeps an active state // in which it attempts to always check the network for highest td peers - d.selectPeer(d.peers.bestPeer()) + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := d.peers.bestPeer(); peer != nil { + d.selectPeer(d.peers.bestPeer()) + } else { + itimer.Reset(5 * time.Second) + } case <-d.quit: break out } @@ -131,7 +154,7 @@ func (d *Downloader) selectPeer(p *peer) { // Make sure it's doing neither. Once done we can restart the // downloading process if the TD is higher. For now just get on // with whatever is going on. This prevents unecessary switching. - if !(d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing()) { + if !d.isBusy() { // selected peer must be better than our own // XXX we also check the peer's recent hash to make sure we // don't have it. Some peers report (i think) incorrect TD. @@ -142,6 +165,7 @@ func (d *Downloader) selectPeer(p *peer) { glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) d.syncCh <- syncPack{p, p.recentHash, false} } + } func (d *Downloader) update() { @@ -149,30 +173,13 @@ out: for { select { case sync := <-d.syncCh: - selectedPeer := sync.peer - glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id) - // Start the fetcher. This will block the update entirely - // interupts need to be send to the appropriate channels - // respectively. - if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil { - // handle error - glog.V(logger.Debug).Infoln("Error fetching hashes:", err) - // XXX Reset + var peer *peer = sync.peer + err := d.getFromPeer(peer, sync.hash, sync.ignoreInitial) + if err != nil { + glog.V(logger.Detail).Infoln(err) break } - // Start fetching blocks in paralel. The strategy is simple - // take any available peers, seserve a chunk for each peer available, - // let the peer deliver the chunkn and periodically check if a peer - // has timedout. When done downloading, process blocks. - if err := d.startFetchingBlocks(selectedPeer); err != nil { - glog.V(logger.Debug).Infoln("Error downloading blocks:", err) - // XXX reset - break - } - - glog.V(logger.Detail).Infoln("Sync completed") - d.process() case <-d.quit: break out @@ -182,6 +189,9 @@ out: // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { + atomic.StoreInt32(&d.fetchingHashes, 1) + defer atomic.StoreInt32(&d.fetchingHashes, 0) + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id) start := time.Now() @@ -192,15 +202,15 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia // Add the hash to the queue first d.queue.hashPool.Add(hash) } - // Get the first batch of hashes p.getHashes(hash) - atomic.StoreInt32(&d.fetchingHashes, 1) + + failureResponse := time.NewTimer(hashTtl) out: for { select { - case hashes := <-d.HashCh: + case hashes := <-d.hashCh: var done bool // determines whether we're done fetching hashes (i.e. common hash found) hashSet := set.New() for _, hash := range hashes { @@ -216,26 +226,36 @@ out: d.queue.put(hashSet) // Add hashes to the chunk set - // Check if we're done fetching - if !done && len(hashes) > 0 { - //fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) + if len(hashes) == 0 { // Make sure the peer actually gave you something valid + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id) + d.queue.reset() + + return errEmptyHashSet + } else if !done { // Check if we're done fetching // Get the next set of hashes p.getHashes(hashes[len(hashes)-1]) - atomic.StoreInt32(&d.fetchingHashes, 1) - } else { - atomic.StoreInt32(&d.fetchingHashes, 0) + } else { // we're done break out } + case <-failureResponse.C: + glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) + // TODO instead of reseting the queue select a new peer from which we can start downloading hashes. + // 1. check for peer's best hash to be included in the current hash set; + // 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer. + d.queue.reset() + + return errTimeout } } - glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start)) + glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start)) return nil } func (d *Downloader) startFetchingBlocks(p *peer) error { - glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks") + glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)") atomic.StoreInt32(&d.downloadingBlocks, 1) + defer atomic.StoreInt32(&d.downloadingBlocks, 0) start := time.Now() @@ -245,18 +265,18 @@ out: for { select { case blockPack := <-d.blockCh: - d.peers[blockPack.peerId].promote() - d.queue.deliver(blockPack.peerId, blockPack.blocks) - d.peers.setState(blockPack.peerId, idleState) + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if d.peers[blockPack.peerId] != nil { + d.peers[blockPack.peerId].promote() + d.queue.deliver(blockPack.peerId, blockPack.blocks) + d.peers.setState(blockPack.peerId, idleState) + } case <-ticker.C: // If there are unrequested hashes left start fetching // from the available peers. if d.queue.hashPool.Size() > 0 { availablePeers := d.peers.get(idleState) - if len(availablePeers) == 0 { - glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers)) - } - for _, peer := range availablePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. @@ -265,7 +285,6 @@ out: continue } - //fmt.Println("fetching for", peer.id) // XXX make fetch blocking. // Fetch the chunk and check for error. If the peer was somehow // already fetching a chunk due to a bug, it will be returned to @@ -276,13 +295,20 @@ out: d.queue.put(chunk.hashes) } } - atomic.StoreInt32(&d.downloadingBlocks, 1) + + // make sure that we have peers available for fetching. If all peers have been tried + // and all failed throw an error + if len(d.queue.fetching) == 0 { + d.queue.reset() + d.peers.reset() + + return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers)) + } + } else if len(d.queue.fetching) == 0 { // When there are no more queue and no more `fetching`. We can // safely assume we're done. Another part of the process will check // for parent errors and will re-request anything that's missing - atomic.StoreInt32(&d.downloadingBlocks, 0) - // Break out so that we can process with processing blocks break out } else { // Check for bad peers. Bad peers may indicate a peer not responding @@ -293,10 +319,10 @@ out: d.queue.mu.Lock() var badPeers []string for pid, chunk := range d.queue.fetching { - if time.Since(chunk.itime) > 5*time.Second { + if time.Since(chunk.itime) > blockTtl { badPeers = append(badPeers, pid) // remove peer as good peer from peer list - d.UnregisterPeer(pid) + //d.UnregisterPeer(pid) } } d.queue.mu.Unlock() @@ -313,26 +339,42 @@ out: d.queue.deliver(pid, nil) if peer := d.peers[pid]; peer != nil { peer.demote() + peer.reset() } } } - //fmt.Println(d.queue.hashPool.Size(), len(d.queue.fetching)) } } - glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start)) + glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start)) + + return nil +} + +func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { + // make sure that the hashes that are being added are actually from the peer + // that's the current active peer. hashes that have been received from other + // peers are dropped and ignored. + if d.activePeer != id { + return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer) + } + + d.hashCh <- hashes return nil } // Add an (unrequested) block to the downloader. This is usually done through the // NewBlockMsg by the protocol handler. -func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { +// Adding blocks is done synchronously. if there are missing blocks, blocks will be +// fetched first. If the downloader is busy or if some other processed failed an error +// will be returned. +func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error { hash := block.Hash() if d.hasBlock(hash) { - return + return fmt.Errorf("known block %x", hash.Bytes()[:4]) } peer := d.peers.getPeer(id) @@ -340,7 +382,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { // and add the block. Otherwise just ignore it if peer == nil { glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id) - return + return errBadPeer } peer.mu.Lock() @@ -353,17 +395,24 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { d.queue.addBlock(id, block, td) // if neither go ahead to process - if !(d.isFetchingHashes() || d.isDownloadingBlocks()) { - // Check if the parent of the received block is known. - // If the block is not know, request it otherwise, request. - phash := block.ParentHash() - if !d.hasBlock(phash) { - glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4]) - d.syncCh <- syncPack{peer, peer.recentHash, true} - } else { - d.process() + if d.isBusy() { + return errBusy + } + + // Check if the parent of the received block is known. + // If the block is not know, request it otherwise, request. + phash := block.ParentHash() + if !d.hasBlock(phash) { + glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4]) + + // Get the missing hashes from the peer (synchronously) + err := d.getFromPeer(peer, peer.recentHash, true) + if err != nil { + return err } } + + return d.process() } // Deliver a chunk to the downloader. This is usually done through the BlocksMsg by @@ -383,8 +432,11 @@ func (d *Downloader) process() error { // to a seperate goroutine where it periodically checks for linked pieces. types.BlockBy(types.Number).Sort(d.queue.blocks) blocks := d.queue.blocks + if len(blocks) == 0 { + return nil + } - glog.V(logger.Debug).Infoln("Inserting chain with", len(blocks), "blocks") + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) var err error // Loop untill we're out of blocks @@ -408,6 +460,11 @@ func (d *Downloader) process() error { } } break + } else if err != nil { + // Reset chain completely. This needs much, much improvement. + // instead: check all blocks leading down to this block false block and remove it + blocks = nil + break } blocks = blocks[max:] } @@ -432,3 +489,7 @@ func (d *Downloader) isDownloadingBlocks() bool { func (d *Downloader) isProcessing() bool { return atomic.LoadInt32(&d.processingBlocks) == 1 } + +func (d *Downloader) isBusy() bool { + return d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing() +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4cd306a05..88ede16f9 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "gopkg.in/fatih/set.v0" ) const ( @@ -19,6 +20,12 @@ type blockFetcherFn func([]common.Hash) error // XXX make threadsafe!!!! type peers map[string]*peer +func (p peers) reset() { + for _, peer := range p { + peer.reset() + } +} + func (p peers) get(state int) []*peer { var peers []*peer for _, peer := range p { @@ -64,13 +71,23 @@ type peer struct { td *big.Int recentHash common.Hash + requested *set.Set + getHashes hashFetcherFn getBlocks blockFetcherFn } // create a new peer func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { - return &peer{id: id, td: td, recentHash: hash, getHashes: getHashes, getBlocks: getBlocks, state: idleState} + return &peer{ + id: id, + td: td, + recentHash: hash, + getHashes: getHashes, + getBlocks: getBlocks, + state: idleState, + requested: set.New(), + } } // fetch a chunk using the peer @@ -82,6 +99,8 @@ func (p *peer) fetch(chunk *chunk) error { return errors.New("peer already fetching chunk") } + p.requested.Merge(chunk.hashes) + // set working state p.state = workingState // convert the set to a fetchable slice @@ -115,3 +134,7 @@ func (p *peer) demote() { p.rep = 0 } } + +func (p *peer) reset() { + p.state = idleState +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 4d1aa4e93..ce3aa9850 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -31,6 +31,17 @@ func newqueue() *queue { } } +func (c *queue) reset() { + c.mu.Lock() + defer c.mu.Unlock() + + c.hashPool.Clear() + c.fetchPool.Clear() + c.blockHashes.Clear() + c.blocks = nil + c.fetching = make(map[string]*chunk) +} + // reserve a `max` set of hashes for `p` peer. func (c *queue) get(p *peer, max int) *chunk { c.mu.Lock() @@ -49,11 +60,19 @@ func (c *queue) get(p *peer, max int) *chunk { return false } - hashes.Add(v) - i++ + // Skip any hashes that have previously been requested from the peer + if !p.requested.Has(v) { + hashes.Add(v) + i++ + } return true }) + // if no hashes can be requested return a nil chunk + if hashes.Size() == 0 { + return nil + } + // remove the fetchable hashes from hash pool c.hashPool.Separate(hashes) c.fetchPool.Merge(hashes) diff --git a/eth/downloader/synchronous.go b/eth/downloader/synchronous.go new file mode 100644 index 000000000..7bb49d24e --- /dev/null +++ b/eth/downloader/synchronous.go @@ -0,0 +1,79 @@ +package downloader + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS + +// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given +// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the +// checks fail an error will be returned. This method is synchronous +func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) { + // Check if we're busy + if d.isBusy() { + return nil, errBusy + } + + // Attempt to select a peer. This can either be nothing, which returns, best peer + // or selected peer. If no peer could be found an error will be returned + var p *peer + if len(id) == 0 { + p = d.peers[id] + if p == nil { + return nil, errUnknownPeer + } + } else { + p = d.peers.bestPeer() + } + + // Make sure our td is lower than the peer's td + if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { + return nil, errLowTd + } + + // Get the hash from the peer and initiate the downloading progress. + err := d.getFromPeer(p, p.recentHash, false) + if err != nil { + return nil, err + } + + return d.queue.blocks, nil +} + +// Synchronise will synchronise using the best peer. +func (d *Downloader) Synchronise() (types.Blocks, error) { + return d.SynchroniseWithPeer("") +} + +func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { + d.activePeer = p.id + + glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + // Start the fetcher. This will block the update entirely + // interupts need to be send to the appropriate channels + // respectively. + if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + // handle error + glog.V(logger.Debug).Infoln("Error fetching hashes:", err) + // XXX Reset + return err + } + + // Start fetching blocks in paralel. The strategy is simple + // take any available peers, seserve a chunk for each peer available, + // let the peer deliver the chunkn and periodically check if a peer + // has timedout. When done downloading, process blocks. + if err := d.startFetchingBlocks(p); err != nil { + glog.V(logger.Debug).Infoln("Error downloading blocks:", err) + // XXX reset + return err + } + + glog.V(logger.Detail).Infoln("Sync completed") + + return nil +} |