diff options
-rw-r--r-- | eth/downloader/downloader.go | 14 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 26 | ||||
-rw-r--r-- | eth/downloader/queue.go | 10 | ||||
-rw-r--r-- | eth/sync.go | 13 |
4 files changed, 17 insertions, 46 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f33aa334a..fb023c7dd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -144,18 +144,8 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } // TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() (types.Blocks, error) { - // If the head block is missing, no blocks are ready - head := d.queue.GetHeadBlock() - if head == nil { - return nil, nil - } - // If the parent hash of the head is unknown, notify the caller - if !d.hasBlock(head.ParentHash()) { - return nil, ErrUnknownParent - } - // Otherwise retrieve a full batch of blocks - return d.queue.TakeBlocks(head), nil +func (d *Downloader) TakeBlocks() types.Blocks { + return d.queue.TakeBlocks() } func (d *Downloader) Has(hash common.Hash) bool { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c3d1b2e00..2a95b3d8e 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -197,10 +197,7 @@ func TestTaking(t *testing.T) { if err != nil { t.Error("download error", err) } - bs, err := tester.downloader.TakeBlocks() - if err != nil { - t.Fatalf("failed to take blocks: %v", err) - } + bs := tester.downloader.TakeBlocks() if len(bs) != targetBlocks { t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) } @@ -280,8 +277,7 @@ func TestThrottling(t *testing.T) { time.Sleep(time.Millisecond) } // Take a batch of blocks and accumulate - blocks, _ := tester.downloader.TakeBlocks() - took = append(took, blocks...) + took = append(took, tester.downloader.TakeBlocks()...) } done <- struct{}{} }() @@ -315,14 +311,13 @@ func TestNonExistingParentAttack(t *testing.T) { if err := tester.sync("attack", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs, err := tester.downloader.TakeBlocks() - if err != ErrUnknownParent { - t.Fatalf("take error mismatch: have %v, want %v", err, ErrUnknownParent) + bs := tester.downloader.TakeBlocks() + if len(bs) != 1 { + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } - if len(bs) != 0 { - t.Error("retrieved block mismatch: have %v, want %v", len(bs), 0) + if tester.hasBlock(bs[0].ParentHash()) { + t.Fatalf("tester knows about the unknown hash") } - // Cancel the download due to the parent attack tester.downloader.Cancel() // Reconstruct a valid chain, and try to synchronize with it @@ -331,11 +326,8 @@ func TestNonExistingParentAttack(t *testing.T) { if err := tester.sync("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs, err = tester.downloader.TakeBlocks() - if err != nil { - t.Fatalf("failed to retrieve blocks: %v", err) - } + bs = tester.downloader.TakeBlocks() if len(bs) != 1 { - t.Error("retrieved block mismatch: have %v, want %v", len(bs), 1) + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 40749698c..6ad915757 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -172,17 +172,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block { } // TakeBlocks retrieves and permanently removes a batch of blocks from the cache. -// The head parameter is required to prevent a race condition where concurrent -// takes may fail parent verifications. -func (q *queue) TakeBlocks(head *types.Block) types.Blocks { +func (q *queue) TakeBlocks() types.Blocks { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the head block's different - if len(q.blockCache) == 0 || q.blockCache[0] != head { - return nil - } - // Otherwise accumulate all available blocks + // Accumulate all available blocks var blocks types.Blocks for _, block := range q.blockCache { if block == nil { diff --git a/eth/sync.go b/eth/sync.go index b51fb7c10..c89f34596 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -40,9 +40,7 @@ func (pm *ProtocolManager) update() { // Try to pull some blocks from the downloaded if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { go func() { - if err := pm.processBlocks(); err != nil { - pm.downloader.Cancel() - } + pm.processBlocks() atomic.StoreInt32(&blockProcPend, 0) }() } @@ -61,12 +59,8 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() - // Take a batch of blocks, but abort if there's an invalid head or if the chain's empty - blocks, err := pm.downloader.TakeBlocks() - if err != nil { - glog.V(logger.Warn).Infof("Block processing failed: %v", err) - return err - } + // Short circuit if no blocks are available for insertion + blocks := pm.downloader.TakeBlocks() if len(blocks) == 0 { return nil } @@ -77,6 +71,7 @@ func (pm *ProtocolManager) processBlocks() error { _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { glog.V(logger.Warn).Infof("Block insertion failed: %v", err) + pm.downloader.Cancel() return err } blocks = blocks[max:] |