diff options
-rw-r--r-- | eth/fetcher/fetcher.go | 67 | ||||
-rw-r--r-- | eth/fetcher/fetcher_test.go | 57 | ||||
-rw-r--r-- | eth/handler.go | 2 |
3 files changed, 95 insertions, 31 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 78d25f72e..a70fcbeed 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -3,7 +3,6 @@ package fetcher import ( "errors" - "math" "math/rand" "time" @@ -57,8 +56,9 @@ type inject struct { type Fetcher struct { // Various event channels notify chan *announce - insert chan *inject + inject chan *inject filter chan chan []*types.Block + done chan common.Hash quit chan struct{} // Announce states @@ -79,8 +79,9 @@ type Fetcher struct { func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher { return &Fetcher{ notify: make(chan *announce), - insert: make(chan *inject), + inject: make(chan *inject), filter: make(chan chan []*types.Block), + done: make(chan common.Hash), quit: make(chan struct{}), announced: make(map[common.Hash][]*announce), fetching: make(map[common.Hash]*announce), @@ -128,7 +129,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { block: block, } select { - case f.insert <- op: + case f.inject <- op: return nil case <-f.quit: return errTerminated @@ -166,8 +167,6 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { func (f *Fetcher) loop() { // Iterate the block fetching until a quit is requested fetch := time.NewTimer(0) - done := make(chan common.Hash) - for { // Clean up any expired block fetches for hash, announce := range f.fetching { @@ -179,27 +178,19 @@ func (f *Fetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - // If too high up the chain, continue later op := f.queue.PopItem().(*inject) - if number := op.block.NumberU64(); number > height+1 { + number := op.block.NumberU64() + + // If too high up the chain or phase, continue later + if number > height+1 { f.queue.Push(op, -float32(op.block.NumberU64())) break } - // Otherwise if not known yet, try and import - hash := op.block.Hash() - if f.hasBlock(hash) { + // Otherwise if fresh and still unknown, try and import + if number <= height || f.hasBlock(op.block.Hash()) { continue } - // Block may just fit, try to import it - glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", op.origin, op.block.NumberU64(), hash.Bytes()[:4]) - go func() { - defer func() { done <- hash }() - - if err := f.importBlock(op.origin, op.block); err != nil { - glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", op.origin, op.block.NumberU64(), hash.Bytes()[:4], err) - return - } - }() + f.insert(op.origin, op.block) } // Wait for an outside event to occur select { @@ -209,7 +200,6 @@ func (f *Fetcher) loop() { case notification := <-f.notify: // A block was announced, schedule if it's not yet downloading - glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) if _, ok := f.fetching[notification.hash]; ok { break } @@ -218,11 +208,11 @@ func (f *Fetcher) loop() { f.reschedule(fetch) } - case op := <-f.insert: + case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps f.enqueue(op.origin, op.block) - case hash := <-done: + case hash := <-f.done: // A pending import finished, remove all traces of the notification delete(f.announced, hash) delete(f.fetching, hash) @@ -243,8 +233,7 @@ func (f *Fetcher) loop() { } } // Send out all block requests - for peer, hashes := range request { - glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) + for _, hashes := range request { go f.fetching[hashes[0]].fetch(hashes) } // Schedule the next fetch if blocks are still pending @@ -304,7 +293,6 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { earliest = announces[0].time } } - glog.V(logger.Detail).Infof("Scheduling next fetch in %v", arriveTimeout-time.Since(earliest)) fetch.Reset(arriveTimeout - time.Since(earliest)) } @@ -313,9 +301,9 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { func (f *Fetcher) enqueue(peer string, block *types.Block) { hash := block.Hash() - // Make sure the block isn't in some weird place - if math.Abs(float64(f.chainHeight())-float64(block.NumberU64())) > maxQueueDist { - glog.Infof("Peer %s: discarded block #%d [%x] too far from head", peer, block.NumberU64(), hash.Bytes()[:4]) + // Discard any past or too distant blocks + if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist <= 0 || dist > maxQueueDist { + glog.Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) return } // Schedule the block for future importing @@ -328,3 +316,22 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { } } } + +// insert spawns a new goroutine to run a block insertion into the chain. If the +// block's number is at the same height as the current import phase, if updates +// the phase states accordingly. +func (f *Fetcher) insert(peer string, block *types.Block) { + hash := block.Hash() + + // Run the import on a new thread + glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", peer, block.NumberU64(), hash[:4]) + go func() { + defer func() { f.done <- hash }() + + // Run the actual import and log any issues + if err := f.importBlock(peer, block); err != nil { + glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", peer, block.NumberU64(), hash[:4], err) + return + } + }() +} diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index af2652a86..00fad3498 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -91,9 +91,15 @@ func (f *fetcherTester) hasBlock(hash common.Hash) bool { // importBlock injects a new blocks into the simulated chain. func (f *fetcherTester) importBlock(peer string, block *types.Block) error { + // Make sure the parent in known if _, ok := f.ownBlocks[block.ParentHash()]; !ok { return errors.New("unknown parent") } + // Discard any new blocks if the same height already exists + if block.NumberU64() <= f.ownBlocks[f.ownHashes[len(f.ownHashes)-1]].NumberU64() { + return nil + } + // Otherwise build our current chain f.ownHashes = append(f.ownHashes, block.Hash()) f.ownBlocks[block.Hash()] = block return nil @@ -363,3 +369,54 @@ func TestDistantDiscarding(t *testing.T) { t.Fatalf("fetcher queued future block") } } + +// Tests that if multiple uncles (i.e. blocks at the same height) are queued for +// importing, then they will get inserted in phases, previous heights needing to +// complete before the next numbered blocks can begin. +func TestCompetingImports(t *testing.T) { + // Generate a few soft-forks for concurrent imports + hashesA := createHashes(16, knownHash) + hashesB := createHashes(16, knownHash) + hashesC := createHashes(16, knownHash) + + blocksA := createBlocksFromHashes(hashesA) + blocksB := createBlocksFromHashes(hashesB) + blocksC := createBlocksFromHashes(hashesC) + + // Create a tester, and override the import to check number reversals + tester := newTester() + + first := int32(1) + height := uint64(1) + tester.fetcher.importBlock = func(peer string, block *types.Block) error { + // Check for any phase reordering + if prev := atomic.LoadUint64(&height); block.NumberU64() < prev { + t.Errorf("phase reversal: have %v, want %v", block.NumberU64(), prev) + } + atomic.StoreUint64(&height, block.NumberU64()) + + // Sleep a bit on the first import not to race with the enqueues + if atomic.CompareAndSwapInt32(&first, 1, 0) { + time.Sleep(50 * time.Millisecond) + } + return tester.importBlock(peer, block) + } + // Queue up everything but with a missing link + for i := 0; i < len(hashesA)-2; i++ { + tester.fetcher.Enqueue("chain A", blocksA[hashesA[i]]) + tester.fetcher.Enqueue("chain B", blocksB[hashesB[i]]) + tester.fetcher.Enqueue("chain C", blocksC[hashesC[i]]) + } + // Add the three missing links, and wait for a full import + tester.fetcher.Enqueue("chain A", blocksA[hashesA[len(hashesA)-2]]) + tester.fetcher.Enqueue("chain B", blocksB[hashesB[len(hashesB)-2]]) + tester.fetcher.Enqueue("chain C", blocksC[hashesC[len(hashesC)-2]]) + + start := time.Now() + for len(tester.ownHashes) != len(hashesA) && time.Since(start) < time.Second { + time.Sleep(50 * time.Millisecond) + } + if len(tester.ownHashes) != len(hashesA) { + t.Fatalf("chain length mismatch: have %v, want %v", len(tester.ownHashes), len(hashesA)) + } +} diff --git a/eth/handler.go b/eth/handler.go index c3b58650d..b62815532 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -8,11 +8,11 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" |