diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-24 21:37:32 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-24 21:38:10 +0800 |
commit | d84c2202e79c30ec906b1a078bfd9fdf5ae94a31 (patch) | |
tree | e05cfa5b69f82fe7bece8bee3b672522406733d3 /eth | |
parent | bd9c76097d485b55ae808fee345d1d76801df1ea (diff) | |
download | go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar.gz go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar.bz2 go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar.lz go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar.xz go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.tar.zst go-tangerine-d84c2202e79c30ec906b1a078bfd9fdf5ae94a31.zip |
eth, eth/downloader: simplified synchronisation process
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/downloader.go | 37 | ||||
-rw-r--r-- | eth/handler.go | 18 |
2 files changed, 24 insertions, 31 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 00c4cd88f..9b8d7c0b2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -96,14 +96,14 @@ func (d *Downloader) Stats() (current int, max int) { return d.queue.blockHashes.Size(), d.queue.fetchPool.Size() + d.queue.hashPool.Size() } -func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { d.mu.Lock() defer d.mu.Unlock() - glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td) + glog.V(logger.Detail).Infoln("Register peer", id) // Create a new peer and add it to the list of known peers - peer := newPeer(id, td, hash, getHashes, getBlocks) + peer := newPeer(id, hash, getHashes, getBlocks) // add peer to our peer set d.peers[id] = peer // broadcast new peer @@ -133,7 +133,7 @@ out: break } - d.process() + d.process(peer) case <-d.quit: break out } @@ -143,27 +143,27 @@ out: // SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) (types.Blocks, error) { +func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Make sure it's doing neither. Once done we can restart the // downloading process if the TD is higher. For now just get on // with whatever is going on. This prevents unecessary switching. if d.isBusy() { - return nil, errBusy + return errBusy } // Fetch the peer using the id or throw an error if the peer couldn't be found p := d.peers[id] if p == nil { - return nil, errUnknownPeer + return errUnknownPeer } // Get the hash from the peer and initiate the downloading progress. err := d.getFromPeer(p, hash, false) if err != nil { - return nil, err + return err } - return d.queue.blocks, nil + return d.process(p) } func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { @@ -405,13 +405,12 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error } peer.mu.Lock() - peer.td = td peer.recentHash = block.Hash() peer.mu.Unlock() peer.promote() glog.V(logger.Detail).Infoln("Inserting new block from:", id) - d.queue.addBlock(id, block, td) + d.queue.addBlock(id, block) // if neither go ahead to process if d.isBusy() { @@ -431,10 +430,10 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error } } - return d.process() + return d.process(peer) } -func (d *Downloader) process() error { +func (d *Downloader) process(peer *peer) error { atomic.StoreInt32(&d.processingBlocks, 1) defer atomic.StoreInt32(&d.processingBlocks, 0) @@ -460,18 +459,8 @@ func (d *Downloader) process() error { // grandparents can be requested and queued. err = d.insertChain(blocks[:max]) if err != nil && core.IsParentErr(err) { - glog.V(logger.Debug).Infoln("Aborting process due to missing parent. Fetching hashes") - - // TODO change this. This shite - for i, block := range blocks[:max] { - if !d.hasBlock(block.ParentHash()) { - d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true} - // remove processed blocks - blocks = blocks[i:] + glog.V(logger.Debug).Infoln("Aborting process due to missing parent.") - break - } - } break } else if err != nil { // immediatly unregister the false peer but do not disconnect diff --git a/eth/handler.go b/eth/handler.go index a5bc125da..8db476eb4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -90,7 +90,7 @@ type ProtocolManager struct { minedBlockSub event.Subscription newPeerCh chan *peer - quit chan struct{} + quitSync chan struct{} } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -103,9 +103,8 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo downloader: downloader, peers: make(map[string]*peer), newPeerCh: make(chan *peer, 1), - quit: make(chan struct{}), + quitSync: make(chan struct{}), } - go manager.peerHandler() manager.SubProtocol = p2p.Protocol{ Name: "eth", @@ -123,7 +122,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo return manager } -func (pm *ProtocolManager) peerHandler() { +func (pm *ProtocolManager) syncHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` itimer := time.NewTimer(peerCountTimeout) out: @@ -153,7 +152,7 @@ out: } else { itimer.Reset(5 * time.Second) } - case <-pm.quit: + case <-pm.quitSync: break out } } @@ -161,7 +160,7 @@ out: func (pm *ProtocolManager) synchronise(peer *peer) { // Get the hashes from the peer (synchronously) - _, err := pm.downloader.Synchronise(peer.id, peer.recentHash) + err := pm.downloader.Synchronise(peer.id, peer.recentHash) if err != nil { // handle error glog.V(logger.Debug).Infoln("error downloading:", err) @@ -176,11 +175,15 @@ func (pm *ProtocolManager) Start() { // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + + // sync handler + go pm.syncHandler() } func (pm *ProtocolManager) Stop() { pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + close(pm.quitSync) // quits the sync handler } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -198,7 +201,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.peers[p.id] = p pm.pmu.Unlock() - pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks) + pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { pm.pmu.Lock() defer pm.pmu.Unlock() @@ -370,6 +373,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } else { // adding blocks is synchronous go func() { + // TODO check parent error err := self.downloader.AddBlock(p.id, request.Block, request.TD) if err != nil { glog.V(logger.Detail).Infoln("downloader err:", err) |