diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-09-15 18:33:45 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-09-15 19:45:53 +0800 |
commit | 99b62f36b6443e8ed8ff4e8c09ee9267eeaea162 (patch) | |
tree | 1839f62c31897a62f9e190a4fbda27310a060825 /eth/downloader | |
parent | 0a7d059b6a4365c671386855289bfdc3178e2d60 (diff) | |
download | dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar.gz dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar.bz2 dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar.lz dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar.xz dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.tar.zst dexon-99b62f36b6443e8ed8ff4e8c09ee9267eeaea162.zip |
eth/downloader: header-chain order and ancestry check
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 2 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 65 | ||||
-rw-r--r-- | eth/downloader/queue.go | 17 |
3 files changed, 77 insertions, 7 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5a6bcdff0..f038e24e4 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1078,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // Otherwise insert all the new headers, aborting in case of junk glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) - inserts := d.queue.Insert(headerPack.headers) + inserts := d.queue.Insert(headerPack.headers, from) if len(inserts) != len(headerPack.headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6ce19480f..885fab8bd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -139,10 +139,6 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 { break } - // If there are queued blocks, but the head is missing, it's a stale leftover - if hashes+blocks > 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 && dl.downloader.queue.GetHeadBlock() == nil { - break - } // Otherwise sleep a bit and retry time.Sleep(time.Millisecond) } @@ -660,6 +656,67 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { } } +// Tests that headers are enqueued continuously, preventing malicious nodes from +// stalling the downloader by feeding gapped header chains. +func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62) } +func TestMissingHeaderAttack63(t *testing.T) { testMissingHeaderAttack(t, 63) } +func TestMissingHeaderAttack64(t *testing.T) { testMissingHeaderAttack(t, 64) } + +func testMissingHeaderAttack(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + + // Attempt a full sync with an attacker feeding gapped headers + tester.newPeer("attack", protocol, hashes, blocks) + missing := targetBlocks / 2 + delete(tester.peerBlocks["attack"], hashes[missing]) + + if err := tester.sync("attack", nil); err == nil { + t.Fatalf("succeeded attacker synchronisation") + } + // Synchronise with the valid peer and make sure sync succeeds + tester.newPeer("valid", protocol, hashes, blocks) + if err := tester.sync("valid", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != len(hashes) { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) + } +} + +// Tests that if requested headers are shifted (i.e. first is missing), the queue +// detects the invalid numbering. +func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62) } +func TestShiftedHeaderAttack63(t *testing.T) { testShiftedHeaderAttack(t, 63) } +func TestShiftedHeaderAttack64(t *testing.T) { testShiftedHeaderAttack(t, 64) } + +func testShiftedHeaderAttack(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + + // Attempt a full sync with an attacker feeding shifted headers + tester.newPeer("attack", protocol, hashes, blocks) + delete(tester.peerBlocks["attack"], hashes[len(hashes)-2]) + + if err := tester.sync("attack", nil); err == nil { + t.Fatalf("succeeded attacker synchronisation") + } + // Synchronise with the valid peer and make sure sync succeeds + tester.newPeer("valid", protocol, hashes, blocks) + if err := tester.sync("valid", nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != len(hashes) { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes)) + } +} + // Tests that if a peer sends an invalid body for a requested block, it gets // dropped immediately by the downloader. func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 7db78327b..49d1046fb 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -57,6 +57,7 @@ type queue struct { headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order pendPool map[string]*fetchRequest // Currently pending block retrieval operations @@ -91,6 +92,7 @@ func (q *queue) Reset() { q.headerPool = make(map[common.Hash]*types.Header) q.headerQueue.Reset() + q.headerHead = common.Hash{} q.pendPool = make(map[string]*fetchRequest) @@ -186,7 +188,7 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { // Insert adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Insert(headers []*types.Header) []*types.Header { +func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -196,13 +198,24 @@ func (q *queue) Insert(headers []*types.Header) []*types.Header { // Make sure no duplicate requests are executed hash := header.Hash() if _, ok := q.headerPool[hash]; ok { - glog.V(logger.Warn).Infof("Header %x already scheduled", hash) + glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4]) continue } + // Make sure chain order is honored and preserved throughout + if header.Number == nil || header.Number.Uint64() != from { + glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from) + break + } + if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { + glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4]) + break + } // Queue the header for body retrieval inserts = append(inserts, header) q.headerPool[hash] = header q.headerQueue.Push(header, -float32(header.Number.Uint64())) + q.headerHead = hash + from++ } return inserts } |