aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-16 16:58:32 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-18 20:56:07 +0800
commit7c2af1c11722dc3175a98342c060afcfaf6a275f (patch)
tree287ed1901f4114628ba1bd12d4f783aa6b5312b2 /eth/handler.go
parent2cea41065609dbebdd3856a00e9333566945ebee (diff)
downloadgo-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.gz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.bz2
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.lz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.xz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.zst
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.zip
eth, eth/fetcher: separate notification sync mechanism
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go71
1 files changed, 26 insertions, 45 deletions
diff --git a/eth/handler.go b/eth/handler.go
index ec4f2d53a..99ac4ce68 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -7,6 +7,8 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/eth/fetcher"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -45,6 +47,7 @@ type ProtocolManager struct {
txpool txPool
chainman *core.ChainManager
downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
peers *peerSet
SubProtocol p2p.Protocol
@@ -54,11 +57,9 @@ type ProtocolManager struct {
minedBlockSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- newHashCh chan []*blockAnnounce
- newBlockCh chan chan []*types.Block
- txsyncCh chan *txsync
- quitSync chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
@@ -69,30 +70,33 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
+ // Create the protocol manager and initialize peer handlers
manager := &ProtocolManager{
- eventMux: mux,
- txpool: txpool,
- chainman: chainman,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer, 1),
- newHashCh: make(chan []*blockAnnounce, 1),
- newBlockCh: make(chan chan []*types.Block),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
+ eventMux: mux,
+ txpool: txpool,
+ chainman: chainman,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer, 1),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
}
- manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
manager.SubProtocol = p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
Length: ProtocolLength,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(protocolVersion, networkId, p, rw)
-
manager.newPeerCh <- peer
-
return manager.handle(peer)
},
}
+ // Construct the different synchronisation mechanisms
+ manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
+
+ importer := func(peer string, block *types.Block) error {
+ return manager.importBlock(manager.peers.Peer(peer), block, nil)
+ }
+ manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer)
return manager
}
@@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() {
// start sync handlers
go pm.syncer()
- go pm.fetcher()
go pm.txsyncLoop()
}
@@ -291,20 +294,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
- // Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
- filter := make(chan []*types.Block)
- select {
- case <-self.quitSync:
- case self.newBlockCh <- filter:
- select {
- case <-self.quitSync:
- case filter <- blocks:
- select {
- case <-self.quitSync:
- case blocks := <-filter:
- self.downloader.DeliverBlocks(p.id, blocks)
- }
- }
+ // Filter out any explicitly requested blocks, deliver the rest to the downloader
+ if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 {
+ self.downloader.DeliverBlocks(p.id, blocks)
}
case NewBlockHashesMsg:
@@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
unknown = append(unknown, hash)
}
}
- announces := make([]*blockAnnounce, len(unknown))
- for i, hash := range unknown {
- announces[i] = &blockAnnounce{
- hash: hash,
- peer: p,
- time: time.Now(),
- }
- }
- if len(announces) > 0 {
- select {
- case self.newHashCh <- announces:
- case <-self.quitSync:
- }
+ for _, hash := range unknown {
+ self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
}
case NewBlockMsg: