diff options
-rw-r--r-- | eth/downloader/downloader.go | 20 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 100 | ||||
-rw-r--r-- | eth/sync.go | 23 |
3 files changed, 112 insertions, 31 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c6eecfe2f..f33aa334a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -37,6 +37,7 @@ var ( errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") + ErrUnknownParent = errors.New("block has unknown parent") ) type hashCheckFn func(common.Hash) bool @@ -142,16 +143,19 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler -// it's possible it yields no blocks -func (d *Downloader) TakeBlocks() types.Blocks { - // Check that there are blocks available and its parents are known +// TakeBlocks takes blocks from the queue and yields them to the caller. +func (d *Downloader) TakeBlocks() (types.Blocks, error) { + // If the head block is missing, no blocks are ready head := d.queue.GetHeadBlock() - if head == nil || !d.hasBlock(head.ParentHash()) { - return nil + if head == nil { + return nil, nil } - // Retrieve a full batch of blocks - return d.queue.TakeBlocks(head) + // If the parent hash of the head is unknown, notify the caller + if !d.hasBlock(head.ParentHash()) { + return nil, ErrUnknownParent + } + // Otherwise retrieve a full batch of blocks + return d.queue.TakeBlocks(head), nil } func (d *Downloader) Has(hash common.Hash) bool { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 78eff011a..c3d1b2e00 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -10,7 +10,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} +var ( + knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} +) func createHashes(start, amount int) (hashes []common.Hash) { hashes = make([]common.Hash, amount+1) @@ -27,7 +30,7 @@ func createBlock(i int, prevHash, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) block.HeaderHash = hash - block.ParentHeaderHash = knownHash + block.ParentHeaderHash = prevHash return block } @@ -42,9 +45,12 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { } type downloadTester struct { - downloader *Downloader - hashes []common.Hash - blocks map[common.Hash]*types.Block + downloader *Downloader + + hashes []common.Hash // Chain of hashes simulating + blocks map[common.Hash]*types.Block // Blocks associated with the hashes + chain []common.Hash // Block-chain being constructed + t *testing.T pcount int done chan bool @@ -52,7 +58,15 @@ type downloadTester struct { } func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester { - tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)} + tester := &downloadTester{ + t: t, + + hashes: hashes, + blocks: blocks, + chain: []common.Hash{knownHash}, + + done: make(chan bool), + } downloader := New(tester.hasBlock, tester.getBlock) tester.downloader = downloader @@ -64,9 +78,17 @@ func (dl *downloadTester) sync(peerId string, hash common.Hash) error { return dl.downloader.Synchronise(peerId, hash) } +func (dl *downloadTester) insertBlocks(blocks types.Blocks) { + for _, block := range blocks { + dl.chain = append(dl.chain, block.Hash()) + } +} + func (dl *downloadTester) hasBlock(hash common.Hash) bool { - if knownHash == hash { - return true + for _, h := range dl.chain { + if h == hash { + return true + } } return false } @@ -175,10 +197,12 @@ func TestTaking(t *testing.T) { if err != nil { t.Error("download error", err) } - - bs1 := tester.downloader.TakeBlocks() - if len(bs1) != 1000 { - t.Error("expected to take 1000, got", len(bs1)) + bs, err := tester.downloader.TakeBlocks() + if err != nil { + t.Fatalf("failed to take blocks: %v", err) + } + if len(bs) != targetBlocks { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) } } @@ -248,17 +272,18 @@ func TestThrottling(t *testing.T) { done := make(chan struct{}) took := []*types.Block{} go func() { - for { + for running := true; running; { select { case <-done: - took = append(took, tester.downloader.TakeBlocks()...) - done <- struct{}{} - return + running = false default: - took = append(took, tester.downloader.TakeBlocks()...) time.Sleep(time.Millisecond) } + // Take a batch of blocks and accumulate + blocks, _ := tester.downloader.TakeBlocks() + took = append(took, blocks...) } + done <- struct{}{} }() // Synchronise the two threads and verify @@ -273,3 +298,44 @@ func TestThrottling(t *testing.T) { t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) } } + +// Tests that if a peer returns an invalid chain with a block pointing to a non- +// existing parent, it is correctly detected and handled. +func TestNonExistingParentAttack(t *testing.T) { + // Forge a single-link chain with a forged header + hashes := createHashes(0, 1) + blocks := createBlocksFromHashes(hashes) + + forged := blocks[hashes[0]] + forged.ParentHeaderHash = unknownHash + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if err := tester.sync("attack", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs, err := tester.downloader.TakeBlocks() + if err != ErrUnknownParent { + t.Fatalf("take error mismatch: have %v, want %v", err, ErrUnknownParent) + } + if len(bs) != 0 { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), 0) + } + // Cancel the download due to the parent attack + tester.downloader.Cancel() + + // Reconstruct a valid chain, and try to synchronize with it + forged.ParentHeaderHash = knownHash + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs, err = tester.downloader.TakeBlocks() + if err != nil { + t.Fatalf("failed to retrieve blocks: %v", err) + } + if len(bs) != 1 { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), 1) + } +} diff --git a/eth/sync.go b/eth/sync.go index 00b571782..b51fb7c10 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/eth/downloader" @@ -14,6 +15,7 @@ import ( func (pm *ProtocolManager) update() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) + blockProcPend := int32(0) for { select { @@ -36,7 +38,14 @@ func (pm *ProtocolManager) update() { } case <-blockProc: // Try to pull some blocks from the downloaded - go pm.processBlocks() + if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { + go func() { + if err := pm.processBlocks(); err != nil { + pm.downloader.Cancel() + } + atomic.StoreInt32(&blockProcPend, 0) + }() + } case <-pm.quitSync: return @@ -52,8 +61,12 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() - // Take a batch of blocks (will return nil if a previous batch has not reached the chain yet) - blocks := pm.downloader.TakeBlocks() + // Take a batch of blocks, but abort if there's an invalid head or if the chain's empty + blocks, err := pm.downloader.TakeBlocks() + if err != nil { + glog.V(logger.Warn).Infof("Block processing failed: %v", err) + return err + } if len(blocks) == 0 { return nil } @@ -63,9 +76,7 @@ func (pm *ProtocolManager) processBlocks() error { max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { - // cancel download process - pm.downloader.Cancel() - + glog.V(logger.Warn).Infof("Block insertion failed: %v", err) return err } blocks = blocks[max:] |