diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-16 16:58:32 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-06-18 20:56:07 +0800 |
commit | 7c2af1c11722dc3175a98342c060afcfaf6a275f (patch) | |
tree | 287ed1901f4114628ba1bd12d4f783aa6b5312b2 /eth/handler.go | |
parent | 2cea41065609dbebdd3856a00e9333566945ebee (diff) | |
download | dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.gz dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.bz2 dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.lz dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.xz dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.zst dexon-7c2af1c11722dc3175a98342c060afcfaf6a275f.zip |
eth, eth/fetcher: separate notification sync mechanism
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 71 |
1 files changed, 26 insertions, 45 deletions
diff --git a/eth/handler.go b/eth/handler.go index ec4f2d53a..99ac4ce68 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -45,6 +47,7 @@ type ProtocolManager struct { txpool txPool chainman *core.ChainManager downloader *downloader.Downloader + fetcher *fetcher.Fetcher peers *peerSet SubProtocol p2p.Protocol @@ -54,11 +57,9 @@ type ProtocolManager struct { minedBlockSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - newBlockCh chan chan []*types.Block - txsyncCh chan *txsync - quitSync chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} // wait group is used for graceful shutdowns during downloading // and processing @@ -69,30 +70,33 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager { + // Create the protocol manager and initialize peer handlers manager := &ProtocolManager{ - eventMux: mux, - txpool: txpool, - chainman: chainman, - peers: newPeerSet(), - newPeerCh: make(chan *peer, 1), - newHashCh: make(chan []*blockAnnounce, 1), - newBlockCh: make(chan chan []*types.Block), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + eventMux: mux, + txpool: txpool, + chainman: chainman, + peers: newPeerSet(), + newPeerCh: make(chan *peer, 1), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), Length: ProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) }, } + // Construct the different synchronisation mechanisms + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) + + importer := func(peer string, block *types.Block) error { + return manager.importBlock(manager.peers.Peer(peer), block, nil) + } + manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer) return manager } @@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() { // start sync handlers go pm.syncer() - go pm.fetcher() go pm.txsyncLoop() } @@ -291,20 +294,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) - filter := make(chan []*types.Block) - select { - case <-self.quitSync: - case self.newBlockCh <- filter: - select { - case <-self.quitSync: - case filter <- blocks: - select { - case <-self.quitSync: - case blocks := <-filter: - self.downloader.DeliverBlocks(p.id, blocks) - } - } + // Filter out any explicitly requested blocks, deliver the rest to the downloader + if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 { + self.downloader.DeliverBlocks(p.id, blocks) } case NewBlockHashesMsg: @@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { unknown = append(unknown, hash) } } - announces := make([]*blockAnnounce, len(unknown)) - for i, hash := range unknown { - announces[i] = &blockAnnounce{ - hash: hash, - peer: p, - time: time.Now(), - } - } - if len(announces) > 0 { - select { - case self.newHashCh <- announces: - case <-self.quitSync: - } + for _, hash := range unknown { + self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks) } case NewBlockMsg: |