From 5ec6ecc511dc861c0395335f06883bc9ac650e4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 17 Jun 2015 18:25:23 +0300 Subject: eth, eth/fetcher: move propagated block import into fetcher --- eth/fetcher/fetcher.go | 55 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 19 deletions(-) (limited to 'eth/fetcher/fetcher.go') diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index a70fcbeed..c96471554 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -29,12 +29,18 @@ type hashCheckFn func(common.Hash) bool // blockRequesterFn is a callback type for sending a block retrieval request. type blockRequesterFn func([]common.Hash) error -// blockImporterFn is a callback type for trying to inject a block into the local chain. -type blockImporterFn func(peer string, block *types.Block) error +// blockBroadcasterFn is a callback type for broadcasting a block to connected peers. +type blockBroadcasterFn func(block *types.Block) // chainHeightFn is a callback type to retrieve the current chain height. type chainHeightFn func() uint64 +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. +type chainInsertFn func(types.Blocks) (int, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + // announce is the hash notification of the availability of a new block in the // network. type announce struct { @@ -70,26 +76,30 @@ type Fetcher struct { queued map[common.Hash]struct{} // Presence set of already queued blocks (to dedup imports) // Callbacks - hasBlock hashCheckFn // Checks if a block is present in the chain - importBlock blockImporterFn // Injects a block from an origin peer into the chain - chainHeight chainHeightFn // Retrieves the current chain's height + hasBlock hashCheckFn // Checks if a block is present in the chain + broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers + chainHeight chainHeightFn // Retrieves the current chain's height + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Drops a peer for misbehaving } // New creates a block fetcher to retrieve blocks based on hash announcements. -func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher { +func New(hasBlock hashCheckFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher { return &Fetcher{ - notify: make(chan *announce), - inject: make(chan *inject), - filter: make(chan chan []*types.Block), - done: make(chan common.Hash), - quit: make(chan struct{}), - announced: make(map[common.Hash][]*announce), - fetching: make(map[common.Hash]*announce), - queue: prque.New(), - queued: make(map[common.Hash]struct{}), - hasBlock: hasBlock, - importBlock: importBlock, - chainHeight: chainHeight, + notify: make(chan *announce), + inject: make(chan *inject), + filter: make(chan chan []*types.Block), + done: make(chan common.Hash), + quit: make(chan struct{}), + announced: make(map[common.Hash][]*announce), + fetching: make(map[common.Hash]*announce), + queue: prque.New(), + queued: make(map[common.Hash]struct{}), + hasBlock: hasBlock, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -328,10 +338,17 @@ func (f *Fetcher) insert(peer string, block *types.Block) { go func() { defer func() { f.done <- hash }() + // If the parent's unknown, abort insertion + if !f.hasBlock(block.ParentHash()) { + return + } // Run the actual import and log any issues - if err := f.importBlock(peer, block); err != nil { + if _, err := f.insertChain(types.Blocks{block}); err != nil { glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", peer, block.NumberU64(), hash[:4], err) + f.dropPeer(peer) return } + // If import succeeded, broadcast the block + go f.broadcastBlock(block) }() } -- cgit v1.2.3