diff options
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 96 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 45 | ||||
-rw-r--r-- | eth/downloader/queue.go | 10 |
3 files changed, 109 insertions, 42 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 426da9beb..95dd37fd7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -2,6 +2,7 @@ package downloader import ( "errors" + "math/rand" "sync" "sync/atomic" "time" @@ -14,15 +15,19 @@ import ( ) const ( - maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk + maxHashFetch = 512 // Amount of hashes to be fetched per chunk + maxBlockFetch = 128 // Amount of blocks to be fetched per chunk peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out + hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) var ( - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out + blockTTL = 5 * time.Second // Time it takes for a block request to time out + crossCheckCycle = time.Second // Period after which to check for expired cross checks + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) +var ( errLowTd = errors.New("peer's TD is too low") ErrBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") @@ -34,6 +39,7 @@ var ( errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") ErrInvalidChain = errors.New("retrieved hash chain is invalid") + ErrCrossCheckFailed = errors.New("block cross-check failed") errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") @@ -220,46 +226,47 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { start := time.Now() - // Add the hash to the queue first + // Add the hash to the queue first, and start hash retrieval d.queue.Insert([]common.Hash{h}) - - // Get the first batch of hashes p.getHashes(h) var ( - failureResponseTimer = time.NewTimer(hashTtl) - attemptedPeers = make(map[string]bool) // attempted peers will help with retries - activePeer = p // active peer will help determine the current active peer - hash common.Hash // common and last hash + active = p // active peer will help determine the current active peer + head = common.Hash{} // common and last hash + + timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer + attempted = make(map[string]bool) // attempted peers will help with retries + crossChecks = make(map[common.Hash]time.Time) // running cross checks and their deadline + crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) - attemptedPeers[p.id] = true + defer crossTicker.Stop() -out: - for { + attempted[p.id] = true + for finished := false; !finished; { select { case <-d.cancelCh: return errCancelHashFetch + case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != activePeer.id { + if hashPack.peerId != active.id { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) break } - - failureResponseTimer.Reset(hashTtl) + timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) d.queue.Reset() return errEmptyHashSet } // Determine if we're done fetching hashes (queue up all pending), and continue if not done done, index := false, 0 - for index, hash = range hashPack.hashes { - if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) + for index, head = range hashPack.hashes { + if d.hasBlock(head) || d.queue.GetBlock(head) != nil { + glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) hashPack.hashes = hashPack.hashes[:index] done = true break @@ -267,25 +274,50 @@ out: } // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) - if inserts == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", activePeer.id) + if len(inserts) == 0 && !done { + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) d.queue.Reset() return ErrBadPeer } if !done { - activePeer.getHashes(hash) + // Try and fetch a random block to verify the hash batch + cross := inserts[rand.Intn(len(inserts))] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + + crossChecks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) + + // Also fetch a fresh + active.getHashes(head) continue } // We're done, allocate the download cache and proceed pulling the blocks offset := 0 - if block := d.getBlock(hash); block != nil { + if block := d.getBlock(head); block != nil { offset = int(block.NumberU64() + 1) } d.queue.Alloc(offset) - break out + finished = true + + case blockPack := <-d.blockCh: + // Cross check the block with the random verifications + if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { + continue + } + hash := blockPack.blocks[0].Hash() + delete(crossChecks, hash) + + case <-crossTicker.C: + // Iterate over all the cross checks and fail the hash chain if they're not verified + for hash, deadline := range crossChecks { + if time.Now().After(deadline) { + glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) + return ErrCrossCheckFailed + } + } - case <-failureResponseTimer.C: + case <-timeout.C: glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) var p *peer // p will be set if a peer can be found @@ -293,21 +325,21 @@ out: // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. for _, peer := range d.peers.AllPeers() { - if d.queue.Has(peer.head) && !attemptedPeers[peer.id] { + if d.queue.Has(peer.head) && !attempted[peer.id] { p = peer break } } // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. - if p == nil || (hash == common.Hash{}) { + if p == nil || (head == common.Hash{}) { d.queue.Reset() return ErrTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. - activePeer = p - p.getHashes(hash) + active = p + p.getHashes(head) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) } } @@ -359,7 +391,7 @@ out: // that badly or poorly behave are removed from the peer set (not banned). // Bad peers are excluded from the available peer set and therefor won't be // reused. XXX We could re-introduce peers after X time. - badPeers := d.queue.Expire(blockTtl) + badPeers := d.queue.Expire(blockTTL) for _, pid := range badPeers { // XXX We could make use of a reputation system here ranking peers // in their performance diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4b8ee93d2..60dcc06cd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -130,8 +130,23 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.blocks[knownHash] } -func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes) +// getHashes retrieves a batch of hashes for reconstructing the chain. +func (dl *downloadTester) getHashes(head common.Hash) error { + // Gather the next batch of hashes + hashes := make([]common.Hash, 0, maxHashFetch) + for i, hash := range dl.hashes { + if hash == head { + for len(hashes) < cap(hashes) && i < len(dl.hashes) { + hashes = append(hashes, dl.hashes[i]) + i++ + } + break + } + } + // Delay delivery a bit to allow attacks to unfold + time.Sleep(time.Millisecond) + + dl.downloader.DeliverHashes(dl.activePeerId, hashes) return nil } @@ -166,7 +181,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash func TestDownload(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -215,7 +230,7 @@ func TestMissing(t *testing.T) { func TestTaking(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -256,7 +271,7 @@ func TestInactiveDownloader(t *testing.T) { func TestCancel(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -282,7 +297,7 @@ func TestCancel(t *testing.T) { func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 16 * blockCacheLimit hashes := createHashes(0, targetBlocks) @@ -429,3 +444,21 @@ func TestInvalidHashOrderAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Tests that if a malicious peer makes up a random hash chain and tries to push +// indefinitely, it actually gets caught with it. +func TestMadeupHashChainAttack(t *testing.T) { + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of hashes without backing blocks + hashes := createHashes(0, 1024*blockCacheLimit) + hashes = hashes[:len(hashes)-1] + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, nil) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index aa48c521a..13ec9a520 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -123,13 +123,13 @@ func (q *queue) Has(hash common.Hash) bool { } // Insert adds a set of hashes for the download queue for scheduling, returning -// the number of new hashes encountered. -func (q *queue) Insert(hashes []common.Hash) int { +// the new hashes encountered. +func (q *queue) Insert(hashes []common.Hash) []common.Hash { q.lock.Lock() defer q.lock.Unlock() // Insert all the hashes prioritized in the arrival order - inserts := 0 + inserts := make([]common.Hash, 0, len(hashes)) for _, hash := range hashes { // Skip anything we already have if old, ok := q.hashPool[hash]; ok { @@ -137,7 +137,9 @@ func (q *queue) Insert(hashes []common.Hash) int { continue } // Update the counters and insert the hash - q.hashCounter, inserts = q.hashCounter+1, inserts+1 + q.hashCounter = q.hashCounter + 1 + inserts = append(inserts, hash) + q.hashPool[hash] = q.hashCounter q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first } |