aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-16 16:58:32 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-18 20:56:07 +0800
commit7c2af1c11722dc3175a98342c060afcfaf6a275f (patch)
tree287ed1901f4114628ba1bd12d4f783aa6b5312b2 /eth
parent2cea41065609dbebdd3856a00e9333566945ebee (diff)
downloadgo-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.gz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.bz2
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.lz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.xz
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.tar.zst
go-tangerine-7c2af1c11722dc3175a98342c060afcfaf6a275f.zip
eth, eth/fetcher: separate notification sync mechanism
Diffstat (limited to 'eth')
-rw-r--r--eth/downloader/downloader.go1
-rw-r--r--eth/fetcher/fetcher.go258
-rw-r--r--eth/handler.go71
-rw-r--r--eth/sync.go145
4 files changed, 293 insertions, 182 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 9866a5b46..18e5f50e8 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,3 +1,4 @@
+// Package downloader contains the manual full chain synchronisation.
package downloader
import (
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
new file mode 100644
index 000000000..19c53048c
--- /dev/null
+++ b/eth/fetcher/fetcher.go
@@ -0,0 +1,258 @@
+// Package fetcher contains the block announcement based synchonisation.
+package fetcher
+
+import (
+ "errors"
+ "math/rand"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+const (
+ arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+ fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
+)
+
+var (
+ errTerminated = errors.New("terminated")
+)
+
+// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
+type hashCheckFn func(common.Hash) bool
+
+// blockRequesterFn is a callback type for sending a block retrieval request.
+type blockRequesterFn func([]common.Hash) error
+
+// blockImporterFn is a callback type for trying to inject a block into the local chain.
+type blockImporterFn func(peer string, block *types.Block) error
+
+// announce is the hash notification of the availability of a new block in the
+// network.
+type announce struct {
+ hash common.Hash // Hash of the block being announced
+ time time.Time // Timestamp of the announcement
+
+ origin string // Identifier of the peer originating the notification
+ fetch blockRequesterFn // Fetcher function to retrieve
+}
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+ // Various event channels
+ notify chan *announce
+ filter chan chan []*types.Block
+ quit chan struct{}
+
+ // Callbacks
+ hasBlock hashCheckFn // Checks if a block is present in the chain
+ importBlock blockImporterFn // Injects a block from an origin peer into the chain
+}
+
+// New creates a block fetcher to retrieve blocks based on hash announcements.
+func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher {
+ return &Fetcher{
+ notify: make(chan *announce),
+ filter: make(chan chan []*types.Block),
+ quit: make(chan struct{}),
+ hasBlock: hasBlock,
+ importBlock: importBlock,
+ }
+}
+
+// Start boots up the announcement based synchoniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+ go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+ close(f.quit)
+}
+
+// Notify announces the fetcher of the potential availability of a new block in
+// the network.
+func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher blockRequesterFn) error {
+ block := &announce{
+ hash: hash,
+ time: time,
+ origin: peer,
+ fetch: fetcher,
+ }
+ select {
+ case f.notify <- block:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// Filter extracts all the blocks that were explicitly requested by the fetcher,
+// returning those that should be handled differently.
+func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
+ // Send the filter channel to the fetcher
+ filter := make(chan []*types.Block)
+
+ select {
+ case f.filter <- filter:
+ case <-f.quit:
+ return nil
+ }
+ // Request the filtering of the block list
+ select {
+ case filter <- blocks:
+ case <-f.quit:
+ return nil
+ }
+ // Retrieve the blocks remaining after filtering
+ select {
+ case blocks := <-filter:
+ return blocks
+ case <-f.quit:
+ return nil
+ }
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+ announced := make(map[common.Hash][]*announce)
+ fetching := make(map[common.Hash]*announce)
+ fetch := time.NewTimer(0)
+ done := make(chan common.Hash)
+
+ // Iterate the block fetching until a quit is requested
+ for {
+ // Clean up any expired block fetches
+ for hash, announce := range fetching {
+ if time.Since(announce.time) > fetchTimeout {
+ delete(announced, hash)
+ delete(fetching, hash)
+ }
+ }
+ // Wait for an outside event to occur
+ select {
+ case <-f.quit:
+ // Fetcher terminating, abort all operations
+ return
+
+ case notification := <-f.notify:
+ // A block was announced, schedule if it's not yet downloading
+ glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4])
+ if _, ok := fetching[notification.hash]; ok {
+ break
+ }
+ if len(announced) == 0 {
+ fetch.Reset(arriveTimeout)
+ }
+ announced[notification.hash] = append(announced[notification.hash], notification)
+
+ case hash := <-done:
+ // A pending import finished, remove all traces of the notification
+ delete(announced, hash)
+ delete(fetching, hash)
+
+ case <-fetch.C:
+ // At least one block's timer ran out, check for needing retrieval
+ request := make(map[string][]common.Hash)
+
+ for hash, announces := range announced {
+ if time.Since(announces[0].time) > arriveTimeout {
+ announce := announces[rand.Intn(len(announces))]
+ if !f.hasBlock(hash) {
+ request[announce.origin] = append(request[announce.origin], hash)
+ fetching[hash] = announce
+ }
+ delete(announced, hash)
+ }
+ }
+ // Send out all block requests
+ for peer, hashes := range request {
+ glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes))
+ go fetching[hashes[0]].fetch(hashes)
+ }
+ // Schedule the next fetch if blocks are still pending
+ if len(announced) > 0 {
+ nearest := time.Now()
+ for _, announces := range announced {
+ if nearest.Before(announces[0].time) {
+ nearest = announces[0].time
+ }
+ }
+ fetch.Reset(arriveTimeout + time.Since(nearest))
+ }
+
+ case filter := <-f.filter:
+ // Blocks arrived, extract any explicit fetches, return all else
+ var blocks types.Blocks
+ select {
+ case blocks = <-filter:
+ case <-f.quit:
+ return
+ }
+
+ explicit, download := []*types.Block{}, []*types.Block{}
+ for _, block := range blocks {
+ hash := block.Hash()
+
+ // Filter explicitly requested blocks from hash announcements
+ if _, ok := fetching[hash]; ok {
+ // Discard if already imported by other means
+ if !f.hasBlock(hash) {
+ explicit = append(explicit, block)
+ } else {
+ delete(fetching, hash)
+ }
+ } else {
+ download = append(download, block)
+ }
+ }
+
+ select {
+ case filter <- download:
+ case <-f.quit:
+ return
+ }
+ // Create a closure with the retrieved blocks and origin peers
+ peers := make([]string, 0, len(explicit))
+ blocks = make([]*types.Block, 0, len(explicit))
+ for _, block := range explicit {
+ hash := block.Hash()
+ if announce := fetching[hash]; announce != nil {
+ // Drop the block if it surely cannot fit
+ if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) {
+ // delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout?
+ continue
+ }
+ // Otherwise accumulate for import
+ peers = append(peers, announce.origin)
+ blocks = append(blocks, block)
+ }
+ }
+ // If any explicit fetches were replied to, import them
+ if count := len(blocks); count > 0 {
+ glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
+ go func() {
+ // Make sure all hashes are cleaned up
+ for _, block := range blocks {
+ hash := block.Hash()
+ defer func() { done <- hash }()
+ }
+ // Try and actually import the blocks
+ for i := 0; i < len(blocks); i++ {
+ if err := f.importBlock(peers[i], blocks[i]); err != nil {
+ glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
+ return
+ }
+ }
+ }()
+ }
+ }
+ }
+}
diff --git a/eth/handler.go b/eth/handler.go
index ec4f2d53a..99ac4ce68 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -7,6 +7,8 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/eth/fetcher"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -45,6 +47,7 @@ type ProtocolManager struct {
txpool txPool
chainman *core.ChainManager
downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
peers *peerSet
SubProtocol p2p.Protocol
@@ -54,11 +57,9 @@ type ProtocolManager struct {
minedBlockSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- newHashCh chan []*blockAnnounce
- newBlockCh chan chan []*types.Block
- txsyncCh chan *txsync
- quitSync chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
@@ -69,30 +70,33 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
+ // Create the protocol manager and initialize peer handlers
manager := &ProtocolManager{
- eventMux: mux,
- txpool: txpool,
- chainman: chainman,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer, 1),
- newHashCh: make(chan []*blockAnnounce, 1),
- newBlockCh: make(chan chan []*types.Block),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
+ eventMux: mux,
+ txpool: txpool,
+ chainman: chainman,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer, 1),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
}
- manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
manager.SubProtocol = p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
Length: ProtocolLength,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(protocolVersion, networkId, p, rw)
-
manager.newPeerCh <- peer
-
return manager.handle(peer)
},
}
+ // Construct the different synchronisation mechanisms
+ manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
+
+ importer := func(peer string, block *types.Block) error {
+ return manager.importBlock(manager.peers.Peer(peer), block, nil)
+ }
+ manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer)
return manager
}
@@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() {
// start sync handlers
go pm.syncer()
- go pm.fetcher()
go pm.txsyncLoop()
}
@@ -291,20 +294,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
- // Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
- filter := make(chan []*types.Block)
- select {
- case <-self.quitSync:
- case self.newBlockCh <- filter:
- select {
- case <-self.quitSync:
- case filter <- blocks:
- select {
- case <-self.quitSync:
- case blocks := <-filter:
- self.downloader.DeliverBlocks(p.id, blocks)
- }
- }
+ // Filter out any explicitly requested blocks, deliver the rest to the downloader
+ if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 {
+ self.downloader.DeliverBlocks(p.id, blocks)
}
case NewBlockHashesMsg:
@@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
unknown = append(unknown, hash)
}
}
- announces := make([]*blockAnnounce, len(unknown))
- for i, hash := range unknown {
- announces[i] = &blockAnnounce{
- hash: hash,
- peer: p,
- time: time.Now(),
- }
- }
- if len(announces) > 0 {
- select {
- case self.newHashCh <- announces:
- case <-self.quitSync:
- }
+ for _, hash := range unknown {
+ self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
}
case NewBlockMsg:
diff --git a/eth/sync.go b/eth/sync.go
index 751bc1a2a..82abb725f 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -12,11 +12,8 @@ import (
)
const (
- forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
- notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
- notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
- notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
@@ -119,140 +116,15 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
-// fetcher is responsible for collecting hash notifications, and periodically
-// checking all unknown ones and individually fetching them.
-func (pm *ProtocolManager) fetcher() {
- announces := make(map[common.Hash][]*blockAnnounce)
- request := make(map[*peer][]common.Hash)
- pending := make(map[common.Hash]*blockAnnounce)
- cycle := time.Tick(notifyCheckCycle)
- done := make(chan common.Hash)
-
- // Iterate the block fetching until a quit is requested
- for {
- select {
- case notifications := <-pm.newHashCh:
- // A batch of hashes the notified, schedule them for retrieval
- glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id)
- for _, announce := range notifications {
- // Skip if it's already pending fetch
- if _, ok := pending[announce.hash]; ok {
- continue
- }
- // Otherwise queue up the peer as a potential source
- announces[announce.hash] = append(announces[announce.hash], announce)
- }
-
- case hash := <-done:
- // A pending import finished, remove all traces
- delete(pending, hash)
-
- case <-cycle:
- // Clean up any expired block fetches
- for hash, announce := range pending {
- if time.Since(announce.time) > notifyFetchTimeout {
- delete(pending, hash)
- }
- }
- // Check if any notified blocks failed to arrive
- for hash, all := range announces {
- if time.Since(all[0].time) > notifyArriveTimeout {
- announce := all[rand.Intn(len(all))]
- if !pm.chainman.HasBlock(hash) {
- request[announce.peer] = append(request[announce.peer], hash)
- pending[hash] = announce
- }
- delete(announces, hash)
- }
- }
- if len(request) == 0 {
- break
- }
- // Send out all block requests
- for peer, hashes := range request {
- glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
- go peer.requestBlocks(hashes)
- }
- request = make(map[*peer][]common.Hash)
-
- case filter := <-pm.newBlockCh:
- // Blocks arrived, extract any explicit fetches, return all else
- var blocks types.Blocks
- select {
- case blocks = <-filter:
- case <-pm.quitSync:
- return
- }
-
- explicit, download := []*types.Block{}, []*types.Block{}
- for _, block := range blocks {
- hash := block.Hash()
-
- // Filter explicitly requested blocks from hash announcements
- if _, ok := pending[hash]; ok {
- // Discard if already imported by other means
- if !pm.chainman.HasBlock(hash) {
- explicit = append(explicit, block)
- } else {
- delete(pending, hash)
- }
- } else {
- download = append(download, block)
- }
- }
-
- select {
- case filter <- download:
- case <-pm.quitSync:
- return
- }
- // Create a closure with the retrieved blocks and origin peers
- peers := make([]*peer, 0, len(explicit))
- blocks = make([]*types.Block, 0, len(explicit))
- for _, block := range explicit {
- hash := block.Hash()
- if announce := pending[hash]; announce != nil {
- // Drop the block if it surely cannot fit
- if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) {
- // delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout?
- continue
- }
- // Otherwise accumulate for import
- peers = append(peers, announce.peer)
- blocks = append(blocks, block)
- }
- }
- // If any explicit fetches were replied to, import them
- if count := len(blocks); count > 0 {
- glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
- go func() {
- // Make sure all hashes are cleaned up
- for _, block := range blocks {
- hash := block.Hash()
- defer func() { done <- hash }()
- }
- // Try and actually import the blocks
- for i := 0; i < len(blocks); i++ {
- if err := pm.importBlock(peers[i], blocks[i], nil); err != nil {
- glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
- return
- }
- }
- }()
- }
-
- case <-pm.quitSync:
- return
- }
- }
-}
-
// syncer is responsible for periodically synchronising with the network, both
-// downloading hashes and blocks as well as retrieving cached ones.
+// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
- // Abort any pending syncs if we terminate
+ // Start and ensure cleanup of sync mechanisms
+ pm.fetcher.Start()
+ defer pm.fetcher.Stop()
defer pm.downloader.Terminate()
+ // Wait for different events to fire synchronisation operations
forceSync := time.Tick(forceSyncCycle)
for {
select {
@@ -273,8 +145,7 @@ func (pm *ProtocolManager) syncer() {
}
}
-// synchronise tries to sync up our local block chain with a remote peer, both
-// adding various sanity checks as well as wrapping it with various log entries.
+// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
// Short circuit if no peers are available
if peer == nil {