From d2d5dbc6fbdb613a0c5b1967ee82a74cd94739a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 13:13:43 +0300 Subject: eth/downloader: fix active peer shadowing, polish func names --- eth/downloader/downloader.go | 28 ++++++++++------------------ eth/downloader/downloader_test.go | 8 ++++---- eth/handler.go | 4 ++-- 3 files changed, 16 insertions(+), 24 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 577152a21..c6eecfe2f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -55,10 +55,9 @@ type hashPack struct { } type Downloader struct { - mu sync.RWMutex - queue *queue - peers *peerSet - activePeer string + mu sync.RWMutex + queue *queue + peers *peerSet // Callbacks hasBlock hashCheckFn @@ -162,7 +161,6 @@ func (d *Downloader) Has(hash common.Hash) bool { // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { - d.activePeer = p.id defer func() { // reset on error if err != nil { @@ -416,32 +414,26 @@ out: return nil } -// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by -// the protocol handler. -func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error { +// DeliverBlocks injects a new batch of blocks received from a remote node. +// This is usually invoked through the BlocksMsg by the protocol handler. +func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - d.blockCh <- blockPack{id, blocks} return nil } -func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { +// DeliverHashes injects a new batch of hashes received from a remote node into +// the download schedule. This is usually invoked through the BlockHashesMsg by +// the protocol handler. +func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - - // make sure that the hashes that are being added are actually from the peer - // that's the current active peer. hashes that have been received from other - // peers are dropped and ignored. - if d.activePeer != id { - return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer) - } - if glog.V(logger.Debug) && len(hashes) != 0 { from, to := hashes[0], hashes[len(hashes)-1] glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 385ad2909..78eff011a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -76,7 +76,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { } func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.AddHashes(dl.activePeerId, dl.hashes) + dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes) return nil } @@ -87,7 +87,7 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { blocks[i] = dl.blocks[hash] } - go dl.downloader.DeliverChunk(id, blocks) + go dl.downloader.DeliverBlocks(id, blocks) return nil } @@ -188,12 +188,12 @@ func TestInactiveDownloader(t *testing.T) { blocks := createBlocksFromHashSet(createHashSet(hashes)) tester := newTester(t, hashes, nil) - err := tester.downloader.AddHashes("bad peer 001", hashes) + err := tester.downloader.DeliverHashes("bad peer 001", hashes) if err != errNoSyncActive { t.Error("expected no sync error, got", err) } - err = tester.downloader.DeliverChunk("bad peer 001", blocks) + err = tester.downloader.DeliverBlocks("bad peer 001", blocks) if err != errNoSyncActive { t.Error("expected no sync error, got", err) } diff --git a/eth/handler.go b/eth/handler.go index 88394543e..b2d741295 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -224,7 +224,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } - err := self.downloader.AddHashes(p.id, hashes) + err := self.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -264,7 +264,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - self.downloader.DeliverChunk(p.id, blocks) + self.downloader.DeliverBlocks(p.id, blocks) case NewBlockMsg: var request newBlockMsgData -- cgit v1.2.3 From ee0c8923035f44a44cfc9120255b807e90baada9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 13:47:21 +0300 Subject: eth/downloader: fix deliveries to check for sync cancels --- eth/downloader/downloader.go | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c6eecfe2f..04e9c3a21 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -70,7 +70,9 @@ type Downloader struct { newPeerCh chan *peer hashCh chan hashPack blockCh chan blockPack - cancelCh chan struct{} + + cancelCh chan struct{} // Channel to cancel mid-flight syncs + cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers } func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { @@ -83,6 +85,9 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), } + // Set the initial downloader state as canceled (sanity check) + downloader.cancelCh = make(chan struct{}) + close(downloader.cancelCh) return downloader } @@ -123,8 +128,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) - // Create cancel channel for aborting midflight + // Create cancel channel for aborting mid-flight + d.cancelLock.Lock() d.cancelCh = make(chan struct{}) + d.cancelLock.Unlock() // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { @@ -421,9 +428,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - d.blockCh <- blockPack{id, blocks} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.blockCh <- blockPack{id, blocks}: + return nil + + case <-cancel: + return errNoSyncActive + } } // DeliverHashes injects a new batch of hashes received from a remote node into @@ -434,11 +450,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - if glog.V(logger.Debug) && len(hashes) != 0 { - from, to := hashes[0], hashes[len(hashes)-1] - glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) - } - d.hashCh <- hashPack{id, hashes} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.hashCh <- hashPack{id, hashes}: + return nil + + case <-cancel: + return errNoSyncActive + } } -- cgit v1.2.3 From ec57aa64cda2e525687641971a54df15a04362d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 14:01:08 +0300 Subject: eth/downloader: sync the cancel channel during cancel too --- eth/downloader/downloader.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 04e9c3a21..0cbf42d30 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -190,32 +190,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. func (d *Downloader) Cancel() bool { - hs, bs := d.queue.Size() // If we're not syncing just return. + hs, bs := d.queue.Size() if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { return false } - + // Close the current cancel channel + d.cancelLock.RLock() close(d.cancelCh) - - // clean up -hashDone: - for { - select { - case <-d.hashCh: - default: - break hashDone - } - } - -blockDone: - for { - select { - case <-d.blockCh: - default: - break blockDone - } - } + d.cancelLock.RUnlock() // reset the queue d.queue.Reset() -- cgit v1.2.3 From 48ee0777a5acbf59aab691866eae5e9adf172f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 16:03:05 +0300 Subject: eth/downloader: add a user sync notificaton --- eth/downloader/downloader.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0cbf42d30..ba380eca3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -65,6 +65,7 @@ type Downloader struct { // Status synchronising int32 + notified int32 // Channels newPeerCh chan *peer @@ -128,6 +129,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) + // Post a user notification of the sync (only once per session) + if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { + glog.V(logger.Info).Infoln("Block synchronisation started") + } // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) -- cgit v1.2.3 From de3a71cafdbf20ad5887033aa991131bfb6e0ed2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 16:56:01 +0300 Subject: eth/downloader: remove a redundant sync progress check --- eth/downloader/downloader.go | 4 ---- 1 file changed, 4 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ba380eca3..5850ea13a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -86,10 +86,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), } - // Set the initial downloader state as canceled (sanity check) - downloader.cancelCh = make(chan struct{}) - close(downloader.cancelCh) - return downloader } -- cgit v1.2.3 From 910a6d4e46dfb06a671ef68c8d90c66cab107d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 17:25:01 +0300 Subject: eth/downloader: fix nil panic caused by wrong variable use --- eth/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c6eecfe2f..55455262a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -289,7 +289,7 @@ out: // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. for _, peer := range d.peers.AllPeers() { - if d.queue.Has(peer.head) && !attemptedPeers[p.id] { + if d.queue.Has(peer.head) && !attemptedPeers[peer.id] { p = peer break } -- cgit v1.2.3 From a4246c2da658d9b5b02a4caba511688748a88b19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 14 May 2015 15:24:18 +0300 Subject: eth, eth/downloader: handle a potential unknown parent attack --- eth/downloader/downloader.go | 20 +++++--- eth/downloader/downloader_test.go | 100 +++++++++++++++++++++++++++++++------- eth/sync.go | 23 ++++++--- 3 files changed, 112 insertions(+), 31 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c6eecfe2f..f33aa334a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -37,6 +37,7 @@ var ( errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") + ErrUnknownParent = errors.New("block has unknown parent") ) type hashCheckFn func(common.Hash) bool @@ -142,16 +143,19 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler -// it's possible it yields no blocks -func (d *Downloader) TakeBlocks() types.Blocks { - // Check that there are blocks available and its parents are known +// TakeBlocks takes blocks from the queue and yields them to the caller. +func (d *Downloader) TakeBlocks() (types.Blocks, error) { + // If the head block is missing, no blocks are ready head := d.queue.GetHeadBlock() - if head == nil || !d.hasBlock(head.ParentHash()) { - return nil + if head == nil { + return nil, nil } - // Retrieve a full batch of blocks - return d.queue.TakeBlocks(head) + // If the parent hash of the head is unknown, notify the caller + if !d.hasBlock(head.ParentHash()) { + return nil, ErrUnknownParent + } + // Otherwise retrieve a full batch of blocks + return d.queue.TakeBlocks(head), nil } func (d *Downloader) Has(hash common.Hash) bool { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 78eff011a..c3d1b2e00 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -10,7 +10,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} +var ( + knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} +) func createHashes(start, amount int) (hashes []common.Hash) { hashes = make([]common.Hash, amount+1) @@ -27,7 +30,7 @@ func createBlock(i int, prevHash, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) block.HeaderHash = hash - block.ParentHeaderHash = knownHash + block.ParentHeaderHash = prevHash return block } @@ -42,9 +45,12 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { } type downloadTester struct { - downloader *Downloader - hashes []common.Hash - blocks map[common.Hash]*types.Block + downloader *Downloader + + hashes []common.Hash // Chain of hashes simulating + blocks map[common.Hash]*types.Block // Blocks associated with the hashes + chain []common.Hash // Block-chain being constructed + t *testing.T pcount int done chan bool @@ -52,7 +58,15 @@ type downloadTester struct { } func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester { - tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)} + tester := &downloadTester{ + t: t, + + hashes: hashes, + blocks: blocks, + chain: []common.Hash{knownHash}, + + done: make(chan bool), + } downloader := New(tester.hasBlock, tester.getBlock) tester.downloader = downloader @@ -64,9 +78,17 @@ func (dl *downloadTester) sync(peerId string, hash common.Hash) error { return dl.downloader.Synchronise(peerId, hash) } +func (dl *downloadTester) insertBlocks(blocks types.Blocks) { + for _, block := range blocks { + dl.chain = append(dl.chain, block.Hash()) + } +} + func (dl *downloadTester) hasBlock(hash common.Hash) bool { - if knownHash == hash { - return true + for _, h := range dl.chain { + if h == hash { + return true + } } return false } @@ -175,10 +197,12 @@ func TestTaking(t *testing.T) { if err != nil { t.Error("download error", err) } - - bs1 := tester.downloader.TakeBlocks() - if len(bs1) != 1000 { - t.Error("expected to take 1000, got", len(bs1)) + bs, err := tester.downloader.TakeBlocks() + if err != nil { + t.Fatalf("failed to take blocks: %v", err) + } + if len(bs) != targetBlocks { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) } } @@ -248,17 +272,18 @@ func TestThrottling(t *testing.T) { done := make(chan struct{}) took := []*types.Block{} go func() { - for { + for running := true; running; { select { case <-done: - took = append(took, tester.downloader.TakeBlocks()...) - done <- struct{}{} - return + running = false default: - took = append(took, tester.downloader.TakeBlocks()...) time.Sleep(time.Millisecond) } + // Take a batch of blocks and accumulate + blocks, _ := tester.downloader.TakeBlocks() + took = append(took, blocks...) } + done <- struct{}{} }() // Synchronise the two threads and verify @@ -273,3 +298,44 @@ func TestThrottling(t *testing.T) { t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) } } + +// Tests that if a peer returns an invalid chain with a block pointing to a non- +// existing parent, it is correctly detected and handled. +func TestNonExistingParentAttack(t *testing.T) { + // Forge a single-link chain with a forged header + hashes := createHashes(0, 1) + blocks := createBlocksFromHashes(hashes) + + forged := blocks[hashes[0]] + forged.ParentHeaderHash = unknownHash + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if err := tester.sync("attack", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs, err := tester.downloader.TakeBlocks() + if err != ErrUnknownParent { + t.Fatalf("take error mismatch: have %v, want %v", err, ErrUnknownParent) + } + if len(bs) != 0 { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), 0) + } + // Cancel the download due to the parent attack + tester.downloader.Cancel() + + // Reconstruct a valid chain, and try to synchronize with it + forged.ParentHeaderHash = knownHash + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs, err = tester.downloader.TakeBlocks() + if err != nil { + t.Fatalf("failed to retrieve blocks: %v", err) + } + if len(bs) != 1 { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), 1) + } +} diff --git a/eth/sync.go b/eth/sync.go index 00b571782..b51fb7c10 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/eth/downloader" @@ -14,6 +15,7 @@ import ( func (pm *ProtocolManager) update() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) + blockProcPend := int32(0) for { select { @@ -36,7 +38,14 @@ func (pm *ProtocolManager) update() { } case <-blockProc: // Try to pull some blocks from the downloaded - go pm.processBlocks() + if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { + go func() { + if err := pm.processBlocks(); err != nil { + pm.downloader.Cancel() + } + atomic.StoreInt32(&blockProcPend, 0) + }() + } case <-pm.quitSync: return @@ -52,8 +61,12 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() - // Take a batch of blocks (will return nil if a previous batch has not reached the chain yet) - blocks := pm.downloader.TakeBlocks() + // Take a batch of blocks, but abort if there's an invalid head or if the chain's empty + blocks, err := pm.downloader.TakeBlocks() + if err != nil { + glog.V(logger.Warn).Infof("Block processing failed: %v", err) + return err + } if len(blocks) == 0 { return nil } @@ -63,9 +76,7 @@ func (pm *ProtocolManager) processBlocks() error { max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { - // cancel download process - pm.downloader.Cancel() - + glog.V(logger.Warn).Infof("Block insertion failed: %v", err) return err } blocks = blocks[max:] -- cgit v1.2.3 From 3eda70c64c3b790573751227f8ac0fe42bdc0307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 14 May 2015 15:38:49 +0300 Subject: eth, eth/downloader: remove parent verification from the downlaoder --- eth/downloader/downloader.go | 14 ++------------ eth/downloader/downloader_test.go | 26 +++++++++----------------- eth/downloader/queue.go | 10 ++-------- eth/sync.go | 13 ++++--------- 4 files changed, 17 insertions(+), 46 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f33aa334a..fb023c7dd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -144,18 +144,8 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } // TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() (types.Blocks, error) { - // If the head block is missing, no blocks are ready - head := d.queue.GetHeadBlock() - if head == nil { - return nil, nil - } - // If the parent hash of the head is unknown, notify the caller - if !d.hasBlock(head.ParentHash()) { - return nil, ErrUnknownParent - } - // Otherwise retrieve a full batch of blocks - return d.queue.TakeBlocks(head), nil +func (d *Downloader) TakeBlocks() types.Blocks { + return d.queue.TakeBlocks() } func (d *Downloader) Has(hash common.Hash) bool { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c3d1b2e00..2a95b3d8e 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -197,10 +197,7 @@ func TestTaking(t *testing.T) { if err != nil { t.Error("download error", err) } - bs, err := tester.downloader.TakeBlocks() - if err != nil { - t.Fatalf("failed to take blocks: %v", err) - } + bs := tester.downloader.TakeBlocks() if len(bs) != targetBlocks { t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) } @@ -280,8 +277,7 @@ func TestThrottling(t *testing.T) { time.Sleep(time.Millisecond) } // Take a batch of blocks and accumulate - blocks, _ := tester.downloader.TakeBlocks() - took = append(took, blocks...) + took = append(took, tester.downloader.TakeBlocks()...) } done <- struct{}{} }() @@ -315,14 +311,13 @@ func TestNonExistingParentAttack(t *testing.T) { if err := tester.sync("attack", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs, err := tester.downloader.TakeBlocks() - if err != ErrUnknownParent { - t.Fatalf("take error mismatch: have %v, want %v", err, ErrUnknownParent) + bs := tester.downloader.TakeBlocks() + if len(bs) != 1 { + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } - if len(bs) != 0 { - t.Error("retrieved block mismatch: have %v, want %v", len(bs), 0) + if tester.hasBlock(bs[0].ParentHash()) { + t.Fatalf("tester knows about the unknown hash") } - // Cancel the download due to the parent attack tester.downloader.Cancel() // Reconstruct a valid chain, and try to synchronize with it @@ -331,11 +326,8 @@ func TestNonExistingParentAttack(t *testing.T) { if err := tester.sync("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs, err = tester.downloader.TakeBlocks() - if err != nil { - t.Fatalf("failed to retrieve blocks: %v", err) - } + bs = tester.downloader.TakeBlocks() if len(bs) != 1 { - t.Error("retrieved block mismatch: have %v, want %v", len(bs), 1) + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 40749698c..6ad915757 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -172,17 +172,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block { } // TakeBlocks retrieves and permanently removes a batch of blocks from the cache. -// The head parameter is required to prevent a race condition where concurrent -// takes may fail parent verifications. -func (q *queue) TakeBlocks(head *types.Block) types.Blocks { +func (q *queue) TakeBlocks() types.Blocks { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the head block's different - if len(q.blockCache) == 0 || q.blockCache[0] != head { - return nil - } - // Otherwise accumulate all available blocks + // Accumulate all available blocks var blocks types.Blocks for _, block := range q.blockCache { if block == nil { diff --git a/eth/sync.go b/eth/sync.go index b51fb7c10..c89f34596 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -40,9 +40,7 @@ func (pm *ProtocolManager) update() { // Try to pull some blocks from the downloaded if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { go func() { - if err := pm.processBlocks(); err != nil { - pm.downloader.Cancel() - } + pm.processBlocks() atomic.StoreInt32(&blockProcPend, 0) }() } @@ -61,12 +59,8 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() - // Take a batch of blocks, but abort if there's an invalid head or if the chain's empty - blocks, err := pm.downloader.TakeBlocks() - if err != nil { - glog.V(logger.Warn).Infof("Block processing failed: %v", err) - return err - } + // Short circuit if no blocks are available for insertion + blocks := pm.downloader.TakeBlocks() if len(blocks) == 0 { return nil } @@ -77,6 +71,7 @@ func (pm *ProtocolManager) processBlocks() error { _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { glog.V(logger.Warn).Infof("Block insertion failed: %v", err) + pm.downloader.Cancel() return err } blocks = blocks[max:] -- cgit v1.2.3 From ebf1eb9359617468103d05764f74796278dfa0d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 14 May 2015 15:40:28 +0300 Subject: eth/downloader: remove a previous leftover --- eth/downloader/downloader.go | 1 - 1 file changed, 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index fb023c7dd..bfda3050b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -37,7 +37,6 @@ var ( errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") - ErrUnknownParent = errors.New("block has unknown parent") ) type hashCheckFn func(common.Hash) bool -- cgit v1.2.3 From fe87feccb157b2426075523a592cabcb4c6d1cf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 14 May 2015 15:44:54 +0300 Subject: eth/downloader: add a small additional check to the test --- eth/downloader/downloader_test.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 2a95b3d8e..cfa6257a3 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -330,4 +330,7 @@ func TestNonExistingParentAttack(t *testing.T) { if len(bs) != 1 { t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } + if !tester.hasBlock(bs[0].ParentHash()) { + t.Fatalf("tester doesn't know about the origin hash") + } } -- cgit v1.2.3 From b71091e337fef7e3cfad56c61c97a42094e87531 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 15 May 2015 00:43:00 +0200 Subject: eth, eth/downloader, miner: use download events to check miner start --- eth/backend.go | 4 ++-- eth/downloader/downloader.go | 17 ++++++++++++++++- eth/downloader/events.go | 5 +++++ 3 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 eth/downloader/events.go (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 362a7eab7..064018955 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -265,12 +265,12 @@ func New(config *Config) (*Ethereum, error) { } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) - eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.GetBlock) + eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.pow = ethash.New() eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.miner = miner.New(eth, eth.pow) + eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index cc75c3014..616971f73 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) @@ -55,6 +56,8 @@ type hashPack struct { } type Downloader struct { + mux *event.TypeMux + mu sync.RWMutex queue *queue peers *peerSet @@ -76,8 +79,9 @@ type Downloader struct { cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers } -func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { downloader := &Downloader{ + mux: mux, queue: newQueue(), peers: newPeerSet(), hasBlock: hasBlock, @@ -93,6 +97,11 @@ func (d *Downloader) Stats() (current int, max int) { return d.queue.Size() } +// Synchronising returns the state of the downloader +func (d *Downloader) Synchronising() bool { + return atomic.LoadInt32(&d.synchronising) > 0 +} + // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { @@ -129,6 +138,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } + + d.mux.Post(StartEvent{}) + // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -166,6 +178,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // reset on error if err != nil { d.queue.Reset() + d.mux.Post(FailedEvent{err}) + } else { + d.mux.Post(DoneEvent{}) } }() diff --git a/eth/downloader/events.go b/eth/downloader/events.go new file mode 100644 index 000000000..333feb976 --- /dev/null +++ b/eth/downloader/events.go @@ -0,0 +1,5 @@ +package downloader + +type DoneEvent struct{} +type StartEvent struct{} +type FailedEvent struct{ Err error } -- cgit v1.2.3 From cfb2b51b749a5897eb47bee15e67a0f9d850bfbd Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 15 May 2015 12:26:34 +0200 Subject: eth/downloader: fixed test --- eth/downloader/downloader_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index cfa6257a3..50fe00d42 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" ) var ( @@ -67,7 +68,8 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types done: make(chan bool), } - downloader := New(tester.hasBlock, tester.getBlock) + var mux event.TypeMux + downloader := New(&mux, tester.hasBlock, tester.getBlock) tester.downloader = downloader return tester -- cgit v1.2.3 From cd2fb0905109828028172c84f9c10f1343647ca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 01:40:16 +0300 Subject: eth, eth/downloader: prevent hash repeater attack --- eth/downloader/downloader.go | 10 ++++++---- eth/downloader/downloader_test.go | 29 +++++++++++++++++++++++++++++ eth/downloader/queue.go | 20 +++++++++++--------- eth/sync.go | 6 ++++-- 4 files changed, 50 insertions(+), 15 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 616971f73..3da606aef 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -27,7 +27,7 @@ var ( errLowTd = errors.New("peer's TD is too low") ErrBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") + ErrBadPeer = errors.New("action from bad peer ignored") errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") @@ -266,9 +266,11 @@ out: break } } - d.queue.Insert(hashPack.hashes) - - if !done { + // Insert all the new hashes, but only continue if got something useful + inserts := d.queue.Insert(hashPack.hashes) + if inserts == 0 && !done { + return ErrBadPeer + } else if !done { activePeer.getHashes(hash) continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 50fe00d42..1bba3ca51 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -336,3 +336,32 @@ func TestNonExistingParentAttack(t *testing.T) { t.Fatalf("tester doesn't know about the origin hash") } } + +// Tests that if a malicious peers keeps sending us repeating hashes, we don't +// loop indefinitely. +func TestRepeatingHashAttack(t *testing.T) { + // Create a valid chain, but drop the last link + hashes := createHashes(1000, 1) + blocks := createBlocksFromHashes(hashes) + + hashes = hashes[:len(hashes)-1] + + // Try and sync with the malicious node + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + + errc := make(chan error) + go func() { + errc <- tester.sync("attack", hashes[0]) + }() + + // Make sure that syncing returns and does so with a failure + select { + case <-time.After(100 * time.Millisecond): + t.Fatalf("synchronisation blocked") + case err := <-errc: + if err == nil { + t.Fatalf("synchronisation succeeded") + } + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 6ad915757..fdea1f63f 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -122,24 +122,26 @@ func (q *queue) Has(hash common.Hash) bool { return false } -// Insert adds a set of hashes for the download queue for scheduling. -func (q *queue) Insert(hashes []common.Hash) { +// Insert adds a set of hashes for the download queue for scheduling, returning +// the number of new hashes encountered. +func (q *queue) Insert(hashes []common.Hash) int { q.lock.Lock() defer q.lock.Unlock() // Insert all the hashes prioritized in the arrival order - for i, hash := range hashes { - index := q.hashCounter + i - + inserts := 0 + for _, hash := range hashes { + // Skip anything we already have if old, ok := q.hashPool[hash]; ok { glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old) continue } - q.hashPool[hash] = index - q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first + // Update the counters and insert the hash + q.hashCounter, inserts = q.hashCounter+1, inserts+1 + q.hashPool[hash] = q.hashCounter + q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first } - // Update the hash counter for the next batch of inserts - q.hashCounter += len(hashes) + return inserts } // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't diff --git a/eth/sync.go b/eth/sync.go index c89f34596..73f6253be 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -101,11 +101,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrBusy: glog.V(logger.Debug).Infof("Synchronisation already in progress") - case downloader.ErrTimeout: - glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id) + case downloader.ErrTimeout, downloader.ErrBadPeer: + glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) pm.removePeer(peer) + case downloader.ErrPendingQueue: glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) + default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } -- cgit v1.2.3 From 366e9627e8cdddd8111789dc13050b1dd8ec6fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 01:56:52 +0300 Subject: eth/downloader: add a test for detecting missing blocks --- eth/downloader/downloader.go | 4 +--- eth/downloader/downloader_test.go | 28 +++++++++++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 3da606aef..c90d289df 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -2,7 +2,6 @@ package downloader import ( "errors" - "fmt" "sync" "sync/atomic" "time" @@ -398,8 +397,7 @@ out: // and all failed throw an error if d.queue.InFlight() == 0 { d.queue.Reset() - - return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending()) + return errPeersUnavailable } } else if d.queue.InFlight() == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1bba3ca51..46538d6b4 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -106,11 +106,12 @@ func (dl *downloadTester) getHashes(hash common.Hash) error { func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { return func(hashes []common.Hash) error { - blocks := make([]*types.Block, len(hashes)) - for i, hash := range hashes { - blocks[i] = dl.blocks[hash] + blocks := make([]*types.Block, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := dl.blocks[hash]; ok { + blocks = append(blocks, block) + } } - go dl.downloader.DeliverBlocks(id, blocks) return nil @@ -341,7 +342,7 @@ func TestNonExistingParentAttack(t *testing.T) { // loop indefinitely. func TestRepeatingHashAttack(t *testing.T) { // Create a valid chain, but drop the last link - hashes := createHashes(1000, 1) + hashes := createHashes(0, 1000) blocks := createBlocksFromHashes(hashes) hashes = hashes[:len(hashes)-1] @@ -365,3 +366,20 @@ func TestRepeatingHashAttack(t *testing.T) { } } } + +// Tests that if a malicious peers returns a non-existent block hash, it should +// eventually time out and the sync reattempted. +func TestNonExistingBlockAttack(t *testing.T) { + // Create a valid chain, but forge the last link + hashes := createHashes(0, 10) + blocks := createBlocksFromHashes(hashes) + + hashes[len(hashes)/2] = unknownHash + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) + } +} -- cgit v1.2.3 From bcc2980179f39eea4825df72ad882274086d912e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 02:14:21 +0300 Subject: eth/downloader: check sync after failed attacks --- eth/downloader/downloader.go | 6 +++++- eth/downloader/downloader_test.go | 24 ++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c90d289df..30be0dde5 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -268,8 +268,12 @@ out: // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) if inserts == 0 && !done { + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", activePeer.id) + d.queue.Reset() + return ErrBadPeer - } else if !done { + } + if !done { activePeer.getHashes(hash) continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 46538d6b4..7888bd1e0 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -342,14 +342,13 @@ func TestNonExistingParentAttack(t *testing.T) { // loop indefinitely. func TestRepeatingHashAttack(t *testing.T) { // Create a valid chain, but drop the last link - hashes := createHashes(0, 1000) + hashes := createHashes(0, blockCacheLimit) blocks := createBlocksFromHashes(hashes) - - hashes = hashes[:len(hashes)-1] + forged := hashes[:len(hashes)-1] // Try and sync with the malicious node - tester := newTester(t, hashes, blocks) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester := newTester(t, forged, blocks) + tester.newPeer("attack", big.NewInt(10000), forged[0]) errc := make(chan error) go func() { @@ -365,14 +364,21 @@ func TestRepeatingHashAttack(t *testing.T) { t.Fatalf("synchronisation succeeded") } } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } // Tests that if a malicious peers returns a non-existent block hash, it should // eventually time out and the sync reattempted. func TestNonExistingBlockAttack(t *testing.T) { // Create a valid chain, but forge the last link - hashes := createHashes(0, 10) + hashes := createHashes(0, blockCacheLimit) blocks := createBlocksFromHashes(hashes) + origin := hashes[len(hashes)/2] hashes[len(hashes)/2] = unknownHash @@ -382,4 +388,10 @@ func TestNonExistingBlockAttack(t *testing.T) { if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) } + // Ensure that a valid chain can still pass sync + hashes[len(hashes)/2] = origin + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } -- cgit v1.2.3 From 72411eb24c47a6b41d8530e6057a88c60491f0e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 11:58:37 +0300 Subject: eth/downloader: circumvent hash reordering attacks --- eth/downloader/downloader.go | 36 +++++++++------- eth/downloader/downloader_test.go | 90 +++++++++++++++++++++++++++------------ eth/downloader/queue.go | 11 +++-- 3 files changed, 88 insertions(+), 49 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 30be0dde5..426da9beb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -23,20 +23,20 @@ var ( minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out - errLowTd = errors.New("peer's TD is too low") - ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") - errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errBlockNumberOverflow = errors.New("received block which overflows") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + errLowTd = errors.New("peer's TD is too low") + ErrBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") + ErrBadPeer = errors.New("action from bad peer ignored") + errNoPeers = errors.New("no peers to keep download active") + ErrPendingQueue = errors.New("pending items in queue") + ErrTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + ErrInvalidChain = errors.New("retrieved hash chain is invalid") + errCancelHashFetch = errors.New("hash fetching cancelled (requested)") + errCancelBlockFetch = errors.New("block downloading cancelled (requested)") + errNoSyncActive = errors.New("no sync active") ) type hashCheckFn func(common.Hash) bool @@ -334,8 +334,14 @@ out: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks, but drop the peer if invalid + // Deliver the received chunk of blocks if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { + if err == ErrInvalidChain { + // The hash chain is invalid (blocks are not ordered properly), abort + d.queue.Reset() + return err + } + // Peer did deliver, but some blocks were off, penalize glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) peer.Demote() break diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7888bd1e0..4b8ee93d2 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,9 +75,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types return tester } -func (dl *downloadTester) sync(peerId string, hash common.Hash) error { +// sync is a simple wrapper around the downloader to start synchronisation and +// block until it returns +func (dl *downloadTester) sync(peerId string, head common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, hash) + return dl.downloader.Synchronise(peerId, head) +} + +// syncTake is starts synchronising with a remote peer, but concurrently it also +// starts fetching blocks that the downloader retrieved. IT blocks until both go +// routines terminate. +func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) { + // Start a block collector to take blocks as they become available + done := make(chan struct{}) + took := []*types.Block{} + go func() { + for running := true; running; { + select { + case <-done: + running = false + default: + time.Sleep(time.Millisecond) + } + // Take a batch of blocks and accumulate + took = append(took, dl.downloader.TakeBlocks()...) + } + done <- struct{}{} + }() + // Start the downloading, sync the taker and return + err := dl.sync(peerId, head) + + done <- struct{}{} + <-done + + return took, err } func (dl *downloadTester) insertBlocks(blocks types.Blocks) { @@ -264,32 +295,7 @@ func TestThrottling(t *testing.T) { tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) // Concurrently download and take the blocks - errc := make(chan error, 1) - go func() { - errc <- tester.sync("peer1", hashes[0]) - }() - - done := make(chan struct{}) - took := []*types.Block{} - go func() { - for running := true; running; { - select { - case <-done: - running = false - default: - time.Sleep(time.Millisecond) - } - // Take a batch of blocks and accumulate - took = append(took, tester.downloader.TakeBlocks()...) - } - done <- struct{}{} - }() - - // Synchronise the two threads and verify - err := <-errc - done <- struct{}{} - <-done - + took, err := tester.syncTake("peer1", hashes[0]) if err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -395,3 +401,31 @@ func TestNonExistingBlockAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Tests that if a malicious peer is returning hashes in a weird order, that the +// sync throttler doesn't choke on them waiting for the valid blocks. +func TestInvalidHashOrderAttack(t *testing.T) { + // Create a valid long chain, but reverse some hashes within + hashes := createHashes(0, 4*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + + reverse := make([]common.Hash, len(hashes)) + copy(reverse, hashes) + + for i := len(hashes) / 4; i < 2*len(hashes)/4; i++ { + reverse[i], reverse[len(hashes)-i-1] = reverse[len(hashes)-i-1], reverse[i] + } + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, reverse, blocks) + tester.newPeer("attack", big.NewInt(10000), reverse[0]) + if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index fdea1f63f..aa48c521a 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -298,18 +298,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // Iterate over the downloaded blocks and add each of them errs := make([]error, 0) for _, block := range blocks { - // Skip any blocks that fall outside the cache range - index := int(block.NumberU64()) - q.blockOffset - if index >= len(q.blockCache) || index < 0 { - //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache)) - continue - } // Skip any blocks that were not requested hash := block.Hash() if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested block %v", hash)) continue } + // If a requested block falls out of the range, the hash chain is invalid + index := int(block.NumberU64()) - q.blockOffset + if index >= len(q.blockCache) || index < 0 { + return ErrInvalidChain + } // Otherwise merge the block and mark the hash block q.blockCache[index] = block -- cgit v1.2.3 From 9ad515d2dc62c5d1ce1099efa89bd0a0b3f06a67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 12:01:05 +0300 Subject: eth: drop a sync peer if it sends an invalid hash chain --- eth/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/sync.go b/eth/sync.go index 73f6253be..cc1fe80ea 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -101,7 +101,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrBusy: glog.V(logger.Debug).Infof("Synchronisation already in progress") - case downloader.ErrTimeout, downloader.ErrBadPeer: + case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) pm.removePeer(peer) -- cgit v1.2.3 From 83226762c20dbf48939d76046ad32422a44feda0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 13:14:46 +0300 Subject: eth, eth/downloader: detect and handle madeup hash attacks --- eth/downloader/downloader.go | 96 ++++++++++++++++++++++++++------------- eth/downloader/downloader_test.go | 45 +++++++++++++++--- eth/downloader/queue.go | 10 ++-- eth/sync.go | 2 +- 4 files changed, 110 insertions(+), 43 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 426da9beb..95dd37fd7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -2,6 +2,7 @@ package downloader import ( "errors" + "math/rand" "sync" "sync/atomic" "time" @@ -14,15 +15,19 @@ import ( ) const ( - maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk + maxHashFetch = 512 // Amount of hashes to be fetched per chunk + maxBlockFetch = 128 // Amount of blocks to be fetched per chunk peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out + hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) var ( - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out + blockTTL = 5 * time.Second // Time it takes for a block request to time out + crossCheckCycle = time.Second // Period after which to check for expired cross checks + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) +var ( errLowTd = errors.New("peer's TD is too low") ErrBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") @@ -34,6 +39,7 @@ var ( errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") ErrInvalidChain = errors.New("retrieved hash chain is invalid") + ErrCrossCheckFailed = errors.New("block cross-check failed") errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") @@ -220,46 +226,47 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { start := time.Now() - // Add the hash to the queue first + // Add the hash to the queue first, and start hash retrieval d.queue.Insert([]common.Hash{h}) - - // Get the first batch of hashes p.getHashes(h) var ( - failureResponseTimer = time.NewTimer(hashTtl) - attemptedPeers = make(map[string]bool) // attempted peers will help with retries - activePeer = p // active peer will help determine the current active peer - hash common.Hash // common and last hash + active = p // active peer will help determine the current active peer + head = common.Hash{} // common and last hash + + timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer + attempted = make(map[string]bool) // attempted peers will help with retries + crossChecks = make(map[common.Hash]time.Time) // running cross checks and their deadline + crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) - attemptedPeers[p.id] = true + defer crossTicker.Stop() -out: - for { + attempted[p.id] = true + for finished := false; !finished; { select { case <-d.cancelCh: return errCancelHashFetch + case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != activePeer.id { + if hashPack.peerId != active.id { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) break } - - failureResponseTimer.Reset(hashTtl) + timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) d.queue.Reset() return errEmptyHashSet } // Determine if we're done fetching hashes (queue up all pending), and continue if not done done, index := false, 0 - for index, hash = range hashPack.hashes { - if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) + for index, head = range hashPack.hashes { + if d.hasBlock(head) || d.queue.GetBlock(head) != nil { + glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) hashPack.hashes = hashPack.hashes[:index] done = true break @@ -267,25 +274,50 @@ out: } // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) - if inserts == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", activePeer.id) + if len(inserts) == 0 && !done { + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) d.queue.Reset() return ErrBadPeer } if !done { - activePeer.getHashes(hash) + // Try and fetch a random block to verify the hash batch + cross := inserts[rand.Intn(len(inserts))] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + + crossChecks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) + + // Also fetch a fresh + active.getHashes(head) continue } // We're done, allocate the download cache and proceed pulling the blocks offset := 0 - if block := d.getBlock(hash); block != nil { + if block := d.getBlock(head); block != nil { offset = int(block.NumberU64() + 1) } d.queue.Alloc(offset) - break out + finished = true + + case blockPack := <-d.blockCh: + // Cross check the block with the random verifications + if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { + continue + } + hash := blockPack.blocks[0].Hash() + delete(crossChecks, hash) + + case <-crossTicker.C: + // Iterate over all the cross checks and fail the hash chain if they're not verified + for hash, deadline := range crossChecks { + if time.Now().After(deadline) { + glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) + return ErrCrossCheckFailed + } + } - case <-failureResponseTimer.C: + case <-timeout.C: glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) var p *peer // p will be set if a peer can be found @@ -293,21 +325,21 @@ out: // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. for _, peer := range d.peers.AllPeers() { - if d.queue.Has(peer.head) && !attemptedPeers[peer.id] { + if d.queue.Has(peer.head) && !attempted[peer.id] { p = peer break } } // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. - if p == nil || (hash == common.Hash{}) { + if p == nil || (head == common.Hash{}) { d.queue.Reset() return ErrTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. - activePeer = p - p.getHashes(hash) + active = p + p.getHashes(head) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) } } @@ -359,7 +391,7 @@ out: // that badly or poorly behave are removed from the peer set (not banned). // Bad peers are excluded from the available peer set and therefor won't be // reused. XXX We could re-introduce peers after X time. - badPeers := d.queue.Expire(blockTtl) + badPeers := d.queue.Expire(blockTTL) for _, pid := range badPeers { // XXX We could make use of a reputation system here ranking peers // in their performance diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4b8ee93d2..60dcc06cd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -130,8 +130,23 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.blocks[knownHash] } -func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes) +// getHashes retrieves a batch of hashes for reconstructing the chain. +func (dl *downloadTester) getHashes(head common.Hash) error { + // Gather the next batch of hashes + hashes := make([]common.Hash, 0, maxHashFetch) + for i, hash := range dl.hashes { + if hash == head { + for len(hashes) < cap(hashes) && i < len(dl.hashes) { + hashes = append(hashes, dl.hashes[i]) + i++ + } + break + } + } + // Delay delivery a bit to allow attacks to unfold + time.Sleep(time.Millisecond) + + dl.downloader.DeliverHashes(dl.activePeerId, hashes) return nil } @@ -166,7 +181,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash func TestDownload(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -215,7 +230,7 @@ func TestMissing(t *testing.T) { func TestTaking(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -256,7 +271,7 @@ func TestInactiveDownloader(t *testing.T) { func TestCancel(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -282,7 +297,7 @@ func TestCancel(t *testing.T) { func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 16 * blockCacheLimit hashes := createHashes(0, targetBlocks) @@ -429,3 +444,21 @@ func TestInvalidHashOrderAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Tests that if a malicious peer makes up a random hash chain and tries to push +// indefinitely, it actually gets caught with it. +func TestMadeupHashChainAttack(t *testing.T) { + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of hashes without backing blocks + hashes := createHashes(0, 1024*blockCacheLimit) + hashes = hashes[:len(hashes)-1] + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, nil) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index aa48c521a..13ec9a520 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -123,13 +123,13 @@ func (q *queue) Has(hash common.Hash) bool { } // Insert adds a set of hashes for the download queue for scheduling, returning -// the number of new hashes encountered. -func (q *queue) Insert(hashes []common.Hash) int { +// the new hashes encountered. +func (q *queue) Insert(hashes []common.Hash) []common.Hash { q.lock.Lock() defer q.lock.Unlock() // Insert all the hashes prioritized in the arrival order - inserts := 0 + inserts := make([]common.Hash, 0, len(hashes)) for _, hash := range hashes { // Skip anything we already have if old, ok := q.hashPool[hash]; ok { @@ -137,7 +137,9 @@ func (q *queue) Insert(hashes []common.Hash) int { continue } // Update the counters and insert the hash - q.hashCounter, inserts = q.hashCounter+1, inserts+1 + q.hashCounter = q.hashCounter + 1 + inserts = append(inserts, hash) + q.hashPool[hash] = q.hashCounter q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first } diff --git a/eth/sync.go b/eth/sync.go index cc1fe80ea..aa7ebc77b 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -101,7 +101,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrBusy: glog.V(logger.Debug).Infof("Synchronisation already in progress") - case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain: + case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) pm.removePeer(peer) -- cgit v1.2.3 From b517967f8626d5a06772c488360f895c4186712f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 13:32:21 +0300 Subject: eth/downloader: don't penalize for stale cross checks --- eth/downloader/downloader.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95dd37fd7..2807a4bcf 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -63,9 +63,10 @@ type hashPack struct { type Downloader struct { mux *event.TypeMux - mu sync.RWMutex - queue *queue - peers *peerSet + mu sync.RWMutex + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain // Callbacks hasBlock hashCheckFn @@ -89,6 +90,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa mux: mux, queue: newQueue(), peers: newPeerSet(), + checks: make(map[common.Hash]time.Time), hasBlock: hasBlock, getBlock: getBlock, newPeerCh: make(chan *peer, 1), @@ -236,7 +238,6 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer attempted = make(map[string]bool) // attempted peers will help with retries - crossChecks = make(map[common.Hash]time.Time) // running cross checks and their deadline crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) defer crossTicker.Stop() @@ -285,7 +286,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { cross := inserts[rand.Intn(len(inserts))] glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) - crossChecks[cross] = time.Now().Add(blockTTL) + d.checks[cross] = time.Now().Add(blockTTL) active.getBlocks([]common.Hash{cross}) // Also fetch a fresh @@ -306,11 +307,11 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } hash := blockPack.blocks[0].Hash() - delete(crossChecks, hash) + delete(d.checks, hash) case <-crossTicker.C: // Iterate over all the cross checks and fail the hash chain if they're not verified - for hash, deadline := range crossChecks { + for hash, deadline := range d.checks { if time.Now().After(deadline) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) return ErrCrossCheckFailed @@ -362,7 +363,16 @@ out: select { case <-d.cancelCh: return errCancelBlockFetch + case blockPack := <-d.blockCh: + // Short circuit if it's a stale cross check + if len(blockPack.blocks) == 1 { + block := blockPack.blocks[0] + if _, ok := d.checks[block.Hash()]; ok { + delete(d.checks, block.Hash()) + continue + } + } // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { -- cgit v1.2.3 From 5c1a7b965ca7901d3b185d75205419b87163a4fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 14:48:57 +0300 Subject: eth/downloader: circumvent a fake blockchain attack --- eth/downloader/downloader.go | 21 ++++++------- eth/downloader/downloader_test.go | 64 ++++++++++++++++++++++++++++++--------- 2 files changed, 58 insertions(+), 27 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 2807a4bcf..f9bd5a635 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -90,7 +90,6 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa mux: mux, queue: newQueue(), peers: newPeerSet(), - checks: make(map[common.Hash]time.Time), hasBlock: hasBlock, getBlock: getBlock, newPeerCh: make(chan *peer, 1), @@ -160,6 +159,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() + d.checks = make(map[common.Hash]time.Time) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -184,7 +184,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { defer func() { // reset on error if err != nil { - d.queue.Reset() + d.Cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -259,8 +259,6 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) - d.queue.Reset() - return errEmptyHashSet } // Determine if we're done fetching hashes (queue up all pending), and continue if not done @@ -277,8 +275,6 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) - d.queue.Reset() - return ErrBadPeer } if !done { @@ -306,8 +302,13 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { continue } - hash := blockPack.blocks[0].Hash() - delete(d.checks, hash) + block := blockPack.blocks[0] + if _, ok := d.checks[block.Hash()]; ok { + if !d.queue.Has(block.ParentHash()) { + return ErrCrossCheckFailed + } + delete(d.checks, block.Hash()) + } case <-crossTicker.C: // Iterate over all the cross checks and fail the hash chain if they're not verified @@ -334,7 +335,6 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (head == common.Hash{}) { - d.queue.Reset() return ErrTimeout } // set p to the active peer. this will invalidate any hashes that may be returned @@ -380,7 +380,6 @@ out: if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { if err == ErrInvalidChain { // The hash chain is invalid (blocks are not ordered properly), abort - d.queue.Reset() return err } // Peer did deliver, but some blocks were off, penalize @@ -414,7 +413,6 @@ out: } // After removing bad peers make sure we actually have sufficient peer left to keep downloading if d.peers.Len() == 0 { - d.queue.Reset() return errNoPeers } // If there are unrequested hashes left start fetching @@ -448,7 +446,6 @@ out: // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error if d.queue.InFlight() == 0 { - d.queue.Reset() return errPeersUnavailable } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 60dcc06cd..d55664314 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -23,25 +23,26 @@ func createHashes(start, amount int) (hashes []common.Hash) { for i := range hashes[:len(hashes)-1] { binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2)) } - return } -func createBlock(i int, prevHash, hash common.Hash) *types.Block { +func createBlock(i int, parent, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) block.HeaderHash = hash - block.ParentHeaderHash = prevHash + block.ParentHeaderHash = parent return block } func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { blocks := make(map[common.Hash]*types.Block) - - for i, hash := range hashes { - blocks[hash] = createBlock(len(hashes)-i, knownHash, hash) + for i := 0; i < len(hashes); i++ { + parent := knownHash + if i < len(hashes)-1 { + parent = hashes[i+1] + } + blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i]) } - return blocks } @@ -136,6 +137,7 @@ func (dl *downloadTester) getHashes(head common.Hash) error { hashes := make([]common.Hash, 0, maxHashFetch) for i, hash := range dl.hashes { if hash == head { + i++ for len(hashes) < cap(hashes) && i < len(dl.hashes) { hashes = append(hashes, dl.hashes[i]) i++ @@ -144,9 +146,11 @@ func (dl *downloadTester) getHashes(head common.Hash) error { } } // Delay delivery a bit to allow attacks to unfold - time.Sleep(time.Millisecond) - - dl.downloader.DeliverHashes(dl.activePeerId, hashes) + id := dl.activePeerId + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, hashes) + }() return nil } @@ -424,12 +428,15 @@ func TestInvalidHashOrderAttack(t *testing.T) { hashes := createHashes(0, 4*blockCacheLimit) blocks := createBlocksFromHashes(hashes) + chunk1 := make([]common.Hash, blockCacheLimit) + chunk2 := make([]common.Hash, blockCacheLimit) + copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit]) + copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit]) + reverse := make([]common.Hash, len(hashes)) copy(reverse, hashes) - - for i := len(hashes) / 4; i < 2*len(hashes)/4; i++ { - reverse[i], reverse[len(hashes)-i-1] = reverse[len(hashes)-i-1], reverse[i] - } + copy(reverse[2*blockCacheLimit:], chunk1) + copy(reverse[blockCacheLimit:], chunk2) // Try and sync with the malicious node and check that it fails tester := newTester(t, reverse, blocks) @@ -453,7 +460,6 @@ func TestMadeupHashChainAttack(t *testing.T) { // Create a long chain of hashes without backing blocks hashes := createHashes(0, 1024*blockCacheLimit) - hashes = hashes[:len(hashes)-1] // Try and sync with the malicious node and check that it fails tester := newTester(t, hashes, nil) @@ -462,3 +468,31 @@ func TestMadeupHashChainAttack(t *testing.T) { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) } } + +// Tests that if a malicious peer makes up a random block chain, and tried to +// push indefinitely, it actually gets caught with it. +func TestMadeupBlockChainAttack(t *testing.T) { + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of blocks and simulate an invalid chain by dropping every second + hashes := createHashes(0, 32*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + + gapped := make([]common.Hash, len(hashes)/2) + for i := 0; i < len(gapped); i++ { + gapped[i] = hashes[2*i] + } + // Try and sync with the malicious node and check that it fails + tester := newTester(t, gapped, blocks) + tester.newPeer("attack", big.NewInt(10000), gapped[0]) + if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} -- cgit v1.2.3 From 4f0d88cb023d51bdc5f38cdb54dd11c51765c98b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 19:43:42 +0300 Subject: eth/downloader: fix cancel channel double close --- eth/downloader/downloader.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f9bd5a635..a0a5b20a2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -212,9 +212,14 @@ func (d *Downloader) Cancel() bool { return false } // Close the current cancel channel - d.cancelLock.RLock() - close(d.cancelCh) - d.cancelLock.RUnlock() + d.cancelLock.Lock() + select { + case <-d.cancelCh: + // Channel was already closed + default: + close(d.cancelCh) + } + d.cancelLock.Unlock() // reset the queue d.queue.Reset() -- cgit v1.2.3 From f3ae8f50a5dfafa667fbe5e6a6574e69162d5ee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 20:54:10 +0300 Subject: eth/downloader: circumvent download race between crosscheck and hashes --- eth/downloader/downloader.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a0a5b20a2..1bc81406c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -284,12 +284,14 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } if !done { // Try and fetch a random block to verify the hash batch - cross := inserts[rand.Intn(len(inserts))] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) - - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) + // Skip the last hash as the cross check races with the next hash fetch + if len(inserts) > 1 { + cross := inserts[rand.Intn(len(inserts)-1)] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + d.checks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) + } // Also fetch a fresh active.getHashes(head) continue -- cgit v1.2.3 From 412cf98bbcb7387de588e4e949380787af000a4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 15 May 2015 21:16:42 +0300 Subject: eth/downloader: fix #992, where tests may time out on a slow machine --- eth/downloader/downloader_test.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index d55664314..19d64ac67 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -472,6 +472,9 @@ func TestMadeupHashChainAttack(t *testing.T) { // Tests that if a malicious peer makes up a random block chain, and tried to // push indefinitely, it actually gets caught with it. func TestMadeupBlockChainAttack(t *testing.T) { + defaultBlockTTL := blockTTL + defaultCrossCheckCycle := crossCheckCycle + blockTTL = 100 * time.Millisecond crossCheckCycle = 25 * time.Millisecond @@ -490,6 +493,9 @@ func TestMadeupBlockChainAttack(t *testing.T) { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) } // Ensure that a valid chain can still pass sync + blockTTL = defaultBlockTTL + crossCheckCycle = defaultCrossCheckCycle + tester.hashes = hashes tester.newPeer("valid", big.NewInt(20000), hashes[0]) if _, err := tester.syncTake("valid", hashes[0]); err != nil { -- cgit v1.2.3 From 55d85d60fdad2cbd7947d87b2a81bd8df6a18025 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 15 May 2015 16:47:44 +0200 Subject: eth, cmd/geth: start mining from console respects CLI flag --- eth/backend.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 064018955..a7107f8d8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -190,6 +190,7 @@ type Ethereum struct { // logger logger.LogSystem Mining bool + MinerThreads int NatSpec bool DataDir string etherbase common.Address @@ -262,6 +263,7 @@ func New(config *Config) (*Ethereum, error) { ethVersionId: config.ProtocolVersion, netVersionId: config.NetworkId, NatSpec: config.NatSpec, + MinerThreads: config.MinerThreads, } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) -- cgit v1.2.3