From 057d36b0499a9f4684c2789448ccf6c5640b1b6c Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Sun, 31 May 2015 13:48:27 -0500 Subject: Update bootnode --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 18e214d44..98939b1fa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -45,7 +45,7 @@ var ( defaultBootNodes = []*discover.Node{ // ETH/DEV Go Bootnodes discover.MustParseNode("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"), - discover.MustParseNode("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"), + discover.MustParseNode("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"), // ETH/DEV cpp-ethereum (poc-9.ethdev.com) discover.MustParseNode("enode://487611428e6c99a11a9795a6abe7b529e81315ca6aad66e2a2fc76e3adf263faba0d35466c2f8f68d561dbefa8878d4df5f1f2ddb1fbeab7f42ffb8cd328bd4a@5.1.83.226:30303"), } -- cgit v1.2.3 From c9a546c310d82eb00e0e76a5e73d2ff7d601f8f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 3 Jun 2015 14:39:21 +0300 Subject: eth/downloader: add a basic block download congestion control --- eth/downloader/downloader.go | 14 ++++++------ eth/downloader/downloader_test.go | 22 +++++++++---------- eth/downloader/peer.go | 45 ++++++++++++++++++++++++++++++++++----- eth/downloader/queue.go | 10 +++++++-- eth/downloader/queue_test.go | 31 --------------------------- 5 files changed, 67 insertions(+), 55 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 85531ce15..f52a97610 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -27,9 +27,10 @@ const ( ) var ( - blockTTL = 5 * time.Second // Time it takes for a block request to time out - crossCheckCycle = time.Second // Period after which to check for expired cross checks - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth + blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired + crossCheckCycle = time.Second // Period after which to check for expired cross checks + minDesiredPeerCount = 5 // Amount of peers desired to start syncing ) var ( @@ -324,7 +325,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent) d.checks[origin] = &crossCheck{ - expire: time.Now().Add(blockTTL), + expire: time.Now().Add(blockSoftTTL), parent: parent, } active.getBlocks([]common.Hash{origin}) @@ -429,6 +430,7 @@ out: // Peer did deliver, but some blocks were off, penalize glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) peer.Demote() + peer.SetIdle() break } if glog.V(logger.Debug) && len(blockPack.blocks) > 0 { @@ -444,7 +446,7 @@ out: // that badly or poorly behave are removed from the peer set (not banned). // Bad peers are excluded from the available peer set and therefor won't be // reused. XXX We could re-introduce peers after X time. 
- badPeers := d.queue.Expire(blockTTL) + badPeers := d.queue.Expire(blockHardTTL) for _, pid := range badPeers { // XXX We could make use of a reputation system here ranking peers // in their performance @@ -475,7 +477,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer, MaxBlockFetch) + request := d.queue.Reserve(peer) if request == nil { continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 66be1ca18..ef94ddbab 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -191,7 +191,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash func TestDownload(t *testing.T) { minDesiredPeerCount = 4 - blockTTL = 1 * time.Second + blockHardTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -240,7 +240,7 @@ func TestMissing(t *testing.T) { func TestTaking(t *testing.T) { minDesiredPeerCount = 4 - blockTTL = 1 * time.Second + blockHardTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -281,7 +281,7 @@ func TestInactiveDownloader(t *testing.T) { func TestCancel(t *testing.T) { minDesiredPeerCount = 4 - blockTTL = 1 * time.Second + blockHardTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -307,7 +307,7 @@ func TestCancel(t *testing.T) { func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 - blockTTL = 1 * time.Second + blockHardTTL = 1 * time.Second targetBlocks := 16 * blockCacheLimit hashes := createHashes(0, targetBlocks) @@ -461,7 +461,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Tests that if a malicious peer makes up a random hash chain and tries to push // indefinitely, it actually gets caught with it. func TestMadeupHashChainAttack(t *testing.T) { - blockTTL = 100 * time.Millisecond + blockSoftTTL = 100 * time.Millisecond crossCheckCycle = 25 * time.Millisecond // Create a long chain of hashes without backing blocks @@ -495,10 +495,10 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Tests that if a malicious peer makes up a random block chain, and tried to // push indefinitely, it actually gets caught with it. func TestMadeupBlockChainAttack(t *testing.T) { - defaultBlockTTL := blockTTL + defaultBlockTTL := blockSoftTTL defaultCrossCheckCycle := crossCheckCycle - blockTTL = 100 * time.Millisecond + blockSoftTTL = 100 * time.Millisecond crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second @@ -516,7 +516,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) } // Ensure that a valid chain can still pass sync - blockTTL = defaultBlockTTL + blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle tester.hashes = hashes @@ -530,10 +530,10 @@ func TestMadeupBlockChainAttack(t *testing.T) { // attacker make up a valid hashes for random blocks, but also forges the block // parents to point to existing hashes. 
func TestMadeupParentBlockChainAttack(t *testing.T) { - defaultBlockTTL := blockTTL + defaultBlockTTL := blockSoftTTL defaultCrossCheckCycle := crossCheckCycle - blockTTL = 100 * time.Millisecond + blockSoftTTL = 100 * time.Millisecond crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second @@ -550,7 +550,7 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) } // Ensure that a valid chain can still pass sync - blockTTL = defaultBlockTTL + blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle tester.blocks = blocks diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4abae8d5e..df54eecbd 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -5,10 +5,14 @@ package downloader import ( "errors" + "math" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) @@ -27,14 +31,15 @@ type peer struct { head common.Hash // Hash of the peers latest known block idle int32 // Current activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation (not used currently) + rep int32 // Simple peer reputation - mu sync.RWMutex + capacity int32 // Number of blocks allowed to fetch per request + started time.Time // Time instance when the last fetch was started - ignored *set.Set + ignored *set.Set // Set of hashes not to request (didn't have previously) - getHashes hashFetcherFn - getBlocks blockFetcherFn + getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) + getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) } // newPeer create a new downloader peer, with specific hash and block retrieval @@ -43,6 +48,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo return &peer{ id: id, head: head, + capacity: 1, getHashes: getHashes, getBlocks: getBlocks, ignored: set.New(), @@ -52,6 +58,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo // Reset clears the internal state of a peer entity. func (p *peer) Reset() { atomic.StoreInt32(&p.idle, 0) + atomic.StoreInt32(&p.capacity, 1) p.ignored.Clear() } @@ -61,6 +68,8 @@ func (p *peer) Fetch(request *fetchRequest) error { if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { return errAlreadyFetching } + p.started = time.Now() + // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) for hash, _ := range request.Hashes { @@ -72,10 +81,36 @@ func (p *peer) Fetch(request *fetchRequest) error { } // SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. 
func (p *peer) SetIdle() { + // Update the peer's download allowance based on previous performance + scale := 2.0 + if time.Since(p.started) > blockSoftTTL { + scale = 0.5 + } + for { + // Calculate the new download bandwidth allowance + prev := atomic.LoadInt32(&p.capacity) + next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) + if scale < 1 { + glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next) + } + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + break + } + } + // Set the peer to idle to allow further block requests atomic.StoreInt32(&p.idle, 0) } +// Capacity retrieves the peers block download allowance based on its previously +// discovered bandwidth capacity. +func (p *peer) Capacity() int { + return int(atomic.LoadInt32(&p.capacity)) +} + // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 7ea400dc4..69d91512a 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -203,7 +203,7 @@ func (q *queue) TakeBlocks() []*Block { // Reserve reserves a set of hashes for the given peer, skipping any previously // failed download. -func (q *queue) Reserve(p *peer, max int) *fetchRequest { +func (q *queue) Reserve(p *peer) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -215,11 +215,17 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest { if _, ok := q.pendPool[p.id]; ok { return nil } + // Calculate an upper limit on the hashes we might fetch (i.e. throttling) + space := len(q.blockCache) - len(q.blockPool) + for _, request := range q.pendPool { + space -= len(request.Hashes) + } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) - for len(send) < max && !q.hashQueue.Empty() { + capacity := p.Capacity() + for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() { hash, priority := q.hashQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index b1f3591f3..ee6141f71 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -1,8 +1,6 @@ package downloader import ( - "testing" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "gopkg.in/fatih/set.v0" @@ -30,32 +28,3 @@ func createBlocksFromHashSet(hashes *set.Set) []*types.Block { return blocks } - -func TestChunking(t *testing.T) { - queue := newQueue() - peer1 := newPeer("peer1", common.Hash{}, nil, nil) - peer2 := newPeer("peer2", common.Hash{}, nil, nil) - - // 99 + 1 (1 == known genesis hash) - hashes := createHashes(0, 99) - queue.Insert(hashes) - - chunk1 := queue.Reserve(peer1, 99) - if chunk1 == nil { - t.Errorf("chunk1 is nil") - t.FailNow() - } - chunk2 := queue.Reserve(peer2, 99) - if chunk2 == nil { - t.Errorf("chunk2 is nil") - t.FailNow() - } - - if len(chunk1.Hashes) != 99 { - t.Error("expected chunk1 hashes to be 99, got", len(chunk1.Hashes)) - } - - if len(chunk2.Hashes) != 1 { - t.Error("expected chunk1 hashes to be 1, got", len(chunk2.Hashes)) - } -} -- cgit v1.2.3 From 3ec159ab6be4dfcc51e339da562466eea38ce8b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 3 Jun 2015 15:43:12 +0300 Subject: eth/downloader: demote peers if they exceed the soft limits at 1 blocks already --- eth/downloader/peer.go | 11 
++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'eth') diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index df54eecbd..8ef017df7 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -93,11 +93,16 @@ func (p *peer) SetIdle() { // Calculate the new download bandwidth allowance prev := atomic.LoadInt32(&p.capacity) next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) - if scale < 1 { - glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next) - } + // Try to update the old value if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + // If we're having problems at 1 capacity, try to find better peers + if next == 1 { + p.Demote() + } + if prev != next { + glog.V(logger.Detail).Infof("%s: changing block download capacity from %d to %d", p.id, prev, next) + } break } } -- cgit v1.2.3 From 5197aed7dbba2ac19d99221efe33fede82007f5d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Jun 2015 15:56:25 +0200 Subject: cmd/utils, eth: core.NewBlockProcessor no longer needs TxPool --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 98939b1fa..724763a52 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -288,7 +288,7 @@ func New(config *Config) (*Ethereum, error) { eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux()) eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) - eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) + eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) -- cgit v1.2.3 From d09a6e54215bef8b1ac16a99f0b1d75a8a92a6a8 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 3 Jun 2015 22:22:20 +0200 Subject: core, eth, miner: moved nonce management to tx pool. Removed the managed tx state from the chain manager to the transaction pool where it's much easier to keep track of nonces (and manage them). The transaction pool now also uses the queue and pending txs differently where queued txs are now moved over to the pending queue (i.e. txs ready for processing and propagation). 
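The transaction-pool changes themselves fall outside the 'eth' filter of this log (only the eth/backend.go cleanup appears below), so here is a minimal, hypothetical Go sketch of the idea the message describes: the pool, rather than the chain manager, tracks a managed nonce per account and promotes queued transactions into the pending set once their nonces line up. All names in it (pool, tx, promote) are invented for illustration and are not the real core.TxPool API.

package main

import "fmt"

// tx is a stripped-down stand-in for a transaction: just a sender and a nonce.
type tx struct {
	from  string
	nonce uint64
}

// pool keeps a managed nonce per account plus queued and pending transactions.
type pool struct {
	nonces  map[string]uint64 // next expected nonce per account
	queued  map[string][]tx   // future transactions waiting for a nonce gap to close
	pending []tx              // transactions ready for processing and propagation
}

func newPool() *pool {
	return &pool{nonces: make(map[string]uint64), queued: make(map[string][]tx)}
}

// add places a transaction into the account's queue and tries to promote it.
func (p *pool) add(t tx) {
	p.queued[t.from] = append(p.queued[t.from], t)
	p.promote(t.from)
}

// promote repeatedly moves queued transactions whose nonce matches the managed
// account nonce into the pending set, bumping the nonce as it goes, until no
// further transaction is ready.
func (p *pool) promote(from string) {
	for {
		moved := false
		rest := p.queued[from][:0]
		for _, t := range p.queued[from] {
			if t.nonce == p.nonces[from] {
				p.pending = append(p.pending, t)
				p.nonces[from]++
				moved = true
			} else {
				rest = append(rest, t)
			}
		}
		p.queued[from] = rest
		if !moved {
			return
		}
	}
}

func main() {
	p := newPool()
	p.add(tx{"alice", 1}) // arrives out of order, stays queued
	p.add(tx{"alice", 0}) // closes the gap, both become pending
	fmt.Println("pending:", p.pending, "queued:", p.queued["alice"])
}

The real pool is considerably more involved (price checks, limits, resets on new chain heads), but the promotion of queued txs into the pending set keyed on a pool-managed nonce is the behaviour the commit message describes.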
--- eth/backend.go | 29 ----------------------------- 1 file changed, 29 deletions(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 724763a52..3956dfcaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -198,7 +198,6 @@ type Ethereum struct { net *p2p.Server eventMux *event.TypeMux - txSub event.Subscription miner *miner.Miner // logger logger.LogSystem @@ -470,10 +469,6 @@ func (s *Ethereum) Start() error { s.whisper.Start() } - // broadcast transactions - s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) - go s.txBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } @@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.net.Stop() s.protocolManager.Stop() s.chainManager.Stop() @@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() { <-s.shutdownChan } -func (self *Ethereum) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.(core.TxPreEvent) - self.syncAccounts(event.Tx) - } -} - -// keep accounts synced up -func (self *Ethereum) syncAccounts(tx *types.Transaction) { - from, err := tx.From() - if err != nil { - return - } - - if self.accountManager.HasAccount(from) { - if self.chainManager.TxState().GetNonce(from) < tx.Nonce() { - self.chainManager.TxState().SetNonce(from, tx.Nonce()) - } - } -} - // StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval // by default that is 10 times per epoch // in epoch n, if we past autoDAGepochHeight within-epoch blocks, -- cgit v1.2.3 From 28c32d1b1bc9ed687713aa4de4eaab38d2a4cb08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 14:51:14 +0300 Subject: eth/downloader: fix #1178, don't request blocks beyond the cache bounds --- eth/downloader/downloader.go | 38 ++++++++++++++++++++++++-------------- eth/downloader/peer.go | 10 ++++++++++ eth/downloader/queue.go | 2 +- 3 files changed, 35 insertions(+), 15 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f52a97610..806f60f1b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -281,19 +281,19 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != active.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) return errEmptyHashSet } for _, hash := range hashPack.hashes { if d.banned.Has(hash) { - glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) return ErrInvalidChain } } @@ -301,7 +301,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { done, index := false, 0 for index, head = range hashPack.hashes { if d.hasBlock(head) || d.queue.GetBlock(head) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) + glog.V(logger.Debug).Infof("Found common hash %x", head[:4]) 
hashPack.hashes = hashPack.hashes[:index] done = true break @@ -310,7 +310,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) return ErrBadPeer } if !done { @@ -365,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } case <-timeout.C: - glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) + glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id) var p *peer // p will be set if a peer can be found // Attempt to find a new peer by checking inclusion of peers best hash in our @@ -386,10 +386,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // by our previous (delayed) peer. active = p p.getHashes(head) - glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) + glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) } } - glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start)) + glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start)) return nil } @@ -421,22 +421,29 @@ out: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks + // Deliver the received chunk of blocks, and demote in case of errors if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { if err == ErrInvalidChain { // The hash chain is invalid (blocks are not ordered properly), abort return err } // Peer did deliver, but some blocks were off, penalize - glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) + glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) peer.Demote() peer.SetIdle() break } - if glog.V(logger.Debug) && len(blockPack.blocks) > 0 { - glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) + // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) + if len(blockPack.blocks) == 0 { + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + peer.Demote() + peer.SetIdle() + break + } + // All was successful, promote the peer + if glog.V(logger.Detail) && len(blockPack.blocks) > 0 { + glog.Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } - // Promote the peer and update it's idle state peer.Promote() peer.SetIdle() } @@ -481,11 +488,14 @@ out: if request == nil { continue } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) + } // Fetch the chunk and check for error. 
If the peer was somehow // already fetching a chunk due to a bug, it will be returned to // the queue if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) + glog.V(logger.Error).Infof("Peer %s received double work", peer.id) d.queue.Cancel(request) } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 8ef017df7..2b3f8798e 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -5,6 +5,7 @@ package downloader import ( "errors" + "fmt" "math" "sync" "sync/atomic" @@ -135,6 +136,15 @@ func (p *peer) Demote() { } } +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ + fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("ignored %4d", p.ignored.Size()), + ) +} + // peerSet represents the collection of active peer participating in the block // download procedure. type peerSet struct { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 69d91512a..02fa667f1 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -225,7 +225,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest { skip := make(map[common.Hash]int) capacity := p.Capacity() - for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() { + for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ { hash, priority := q.hashQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) -- cgit v1.2.3 From 24cca2f18d4a7abb103179a289db09c2d11bd2a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 15:10:43 +0300 Subject: eth/downloader: log after state updates, easier to debug --- eth/downloader/downloader.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 806f60f1b..d8dbef726 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -428,24 +428,22 @@ out: return err } // Peer did deliver, but some blocks were off, penalize - glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) peer.Demote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) break } // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) if len(blockPack.blocks) == 0 { - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) peer.Demote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } // All was successful, promote the peer - if glog.V(logger.Detail) && len(blockPack.blocks) > 0 { - glog.Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - } peer.Promote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } case <-ticker.C: // Check for bad peers. 
Bad peers may indicate a peer not responding -- cgit v1.2.3 From d754c25cc87172dfafeea71116da2260544a3f09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 16:22:55 +0300 Subject: eth/downloader: drop log entry from peer, it's covered already --- eth/downloader/peer.go | 5 ----- 1 file changed, 5 deletions(-) (limited to 'eth') diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 2b3f8798e..43b50079b 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -12,8 +12,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) @@ -101,9 +99,6 @@ func (p *peer) SetIdle() { if next == 1 { p.Demote() } - if prev != next { - glog.V(logger.Detail).Infof("%s: changing block download capacity from %d to %d", p.id, prev, next) - } break } } -- cgit v1.2.3 From 94e4aa6ea9aabb5bf6244d9b38607b336703af98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 5 Jun 2015 11:53:46 +0300 Subject: eth/downloader: log hard timeouts and reset capacity --- eth/downloader/downloader.go | 1 + eth/downloader/peer.go | 3 +++ 2 files changed, 4 insertions(+) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d8dbef726..626f47b7b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -460,6 +460,7 @@ out: // 3) Amount and availability. if peer := d.peers.Peer(pid); peer != nil { peer.Demote() + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) } } // After removing bad peers make sure we actually have sufficient peer left to keep downloading diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 43b50079b..5fbc64648 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -87,6 +87,9 @@ func (p *peer) SetIdle() { scale := 2.0 if time.Since(p.started) > blockSoftTTL { scale = 0.5 + if time.Since(p.started) > blockHardTTL { + scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1 + } } for { // Calculate the new download bandwidth allowance -- cgit v1.2.3 From 328ef60b856f75bf664fb103bc54674d962bef2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 5 Jun 2015 12:37:48 +0300 Subject: eth/downloader: differentiate stale and nonexistent deliveries --- eth/downloader/downloader.go | 50 ++++++++++++++++++++++++++++++-------------- eth/downloader/queue.go | 15 ++++++++++--- 2 files changed, 46 insertions(+), 19 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 626f47b7b..cc1ceb637 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -422,28 +422,46 @@ out: // in a reasonable time frame, ignore it's message. 
if peer := d.peers.Peer(blockPack.peerId); peer != nil { // Deliver the received chunk of blocks, and demote in case of errors - if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { - if err == ErrInvalidChain { - // The hash chain is invalid (blocks are not ordered properly), abort - return err + err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) + switch err { + case nil: + // If no blocks were delivered, demote the peer (need the delivery above) + if len(blockPack.blocks) == 0 { + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + break } - // Peer did deliver, but some blocks were off, penalize + // All was successful, promote the peer + peer.Promote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + + case ErrInvalidChain: + // The hash chain is invalid (blocks are not ordered properly), abort + return err + + case errNoFetchesPending: + // Peer probably timed out with its delivery but came through + // in the end, demote, but allow to to pull from this peer. peer.Demote() peer.SetIdle() - glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) - break - } - // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) - if len(blockPack.blocks) == 0 { + glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + + case errStaleDelivery: + // Delivered something completely else than requested, usually + // caused by a timeout and delivery during a new sync cycle. + // Don't set it to idle as the original request should still be + // in flight. + peer.Demote() + glog.V(logger.Detail).Infof("%s: stale delivery", peer) + + default: + // Peer did something semi-useful, demote but keep it around peer.Demote() peer.SetIdle() - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) - break + glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) } - // All was successful, promote the peer - peer.Promote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } case <-ticker.C: // Check for bad peers. Bad peers may indicate a peer not responding diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 02fa667f1..671ffe51b 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -20,6 +20,11 @@ const ( blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download ) +var ( + errNoFetchesPending = errors.New("no fetches pending") + errStaleDelivery = errors.New("stale delivery") +) + // fetchRequest is a currently running block retrieval operation. 
type fetchRequest struct { Peer *peer // Peer to which the request was sent @@ -293,7 +298,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // Short circuit if the blocks were never requested request := q.pendPool[id] if request == nil { - return errors.New("no fetches pending") + return errNoFetchesPending } delete(q.pendPool, id) @@ -309,7 +314,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // Skip any blocks that were not requested hash := block.Hash() if _, ok := request.Hashes[hash]; !ok { - errs = append(errs, fmt.Errorf("non-requested block %v", hash)) + errs = append(errs, fmt.Errorf("non-requested block %x", hash)) continue } // If a requested block falls out of the range, the hash chain is invalid @@ -326,11 +331,15 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { delete(q.hashPool, hash) q.blockPool[hash] = int(block.NumberU64()) } - // Return all failed fetches to the queue + // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // If none of the blocks were good, it's a stale delivery if len(errs) != 0 { + if len(errs) == len(blocks) { + return errStaleDelivery + } return fmt.Errorf("multiple failures: %v", errs) } return nil -- cgit v1.2.3 From 94e525ae12bf3455ae434feb83ad834c8dcfa1ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 26 May 2015 12:44:09 +0300 Subject: eth, eth/downloader: fix #1098, elevate empty hash errors to peer drops --- eth/downloader/downloader.go | 4 ++-- eth/sync.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d8dbef726..6f1c94a91 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -42,7 +42,7 @@ var ( errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") + ErrEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") ErrInvalidChain = errors.New("retrieved hash chain is invalid") @@ -289,7 +289,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) - return errEmptyHashSet + return ErrEmptyHashSet } for _, hash := range hashPack.hashes { if d.banned.Has(hash) { diff --git a/eth/sync.go b/eth/sync.go index 76e137630..56084f2f0 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -109,7 +109,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrBusy: glog.V(logger.Debug).Infof("Synchronisation already in progress") - case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: + case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) pm.removePeer(peer.id) -- cgit v1.2.3 From eedb25b22ac466d8165153589b0e4a7c7de69128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 29 May 2015 19:47:00 +0300 Subject: 
eth/downloader: clean up tests and unused variables --- eth/downloader/downloader.go | 10 +- eth/downloader/downloader_test.go | 189 +++++++++++++++----------------------- 2 files changed, 79 insertions(+), 120 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9f7f34559..de6ee1734 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -22,15 +22,13 @@ const ( MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request - peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - hashTTL = 5 * time.Second // Time it takes for a hash request to time out + hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) var ( - blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired - crossCheckCycle = time.Second // Period after which to check for expired cross checks - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth + blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired + crossCheckCycle = time.Second // Period after which to check for expired cross checks ) var ( diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ef94ddbab..434861c61 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -21,7 +21,7 @@ func createHashes(start, amount int) (hashes []common.Hash) { hashes[len(hashes)-1] = knownHash for i := range hashes[:len(hashes)-1] { - binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2)) + binary.BigEndian.PutUint64(hashes[i][:8], uint64(start+i+2)) } return } @@ -56,7 +56,6 @@ type downloadTester struct { maxHashFetch int // Overrides the maximum number of retrieved hashes t *testing.T - pcount int done chan bool activePeerId string } @@ -114,12 +113,6 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e return took, err } -func (dl *downloadTester) insertBlocks(blocks types.Blocks) { - for _, block := range blocks { - dl.chain = append(dl.chain, block.Hash()) - } -} - func (dl *downloadTester) hasBlock(hash common.Hash) bool { for _, h := range dl.chain { if h == hash { @@ -175,157 +168,125 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { } func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) { - dl.pcount++ - dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id)) } -func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash) { - dl.pcount++ - - // This bad peer never returns any blocks - dl.downloader.RegisterPeer(id, hash, dl.getHashes, func([]common.Hash) error { - return nil - }) -} - -func TestDownload(t *testing.T) { - minDesiredPeerCount = 4 - blockHardTTL = 1 * time.Second - - targetBlocks := 1000 +// Tests that simple synchronization, without throttling from a good peer works. 
+func TestSynchronisation(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - - tester.newPeer("peer1", big.NewInt(10000), hashes[0]) - tester.newPeer("peer2", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) - tester.activePeerId = "peer1" - err := tester.sync("peer1", hashes[0]) - if err != nil { - t.Error("download error", err) - } - - inqueue := len(tester.downloader.queue.blockCache) - if inqueue != targetBlocks { - t.Error("expected", targetBlocks, "have", inqueue) - } -} - -func TestMissing(t *testing.T) { - targetBlocks := 1000 - hashes := createHashes(0, 1000) - extraHashes := createHashes(1001, 1003) - blocks := createBlocksFromHashes(append(extraHashes, hashes...)) tester := newTester(t, hashes, blocks) + tester.newPeer("peer", big.NewInt(10000), hashes[0]) - tester.newPeer("peer1", big.NewInt(10000), hashes[len(hashes)-1]) - - hashes = append(extraHashes, hashes[:len(hashes)-1]...) - tester.newPeer("peer2", big.NewInt(0), common.Hash{}) - - err := tester.sync("peer1", hashes[0]) - if err != nil { - t.Error("download error", err) + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - - inqueue := len(tester.downloader.queue.blockCache) - if inqueue != targetBlocks { - t.Error("expected", targetBlocks, "have", inqueue) + if queued := len(tester.downloader.queue.blockCache); queued != targetBlocks { + t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks) } } -func TestTaking(t *testing.T) { - minDesiredPeerCount = 4 - blockHardTTL = 1 * time.Second - - targetBlocks := 1000 +// Tests that the synchronized blocks can be correctly retrieved. +func TestBlockTaking(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer1", big.NewInt(10000), hashes[0]) - tester.newPeer("peer2", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) + tester := newTester(t, hashes, blocks) + tester.newPeer("peer", big.NewInt(10000), hashes[0]) - err := tester.sync("peer1", hashes[0]) - if err != nil { - t.Error("download error", err) + // Synchronise with the peer and test block retrieval + if err := tester.sync("peer", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - bs := tester.downloader.TakeBlocks() - if len(bs) != targetBlocks { - t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) + if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { + t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks) } } +// Tests that an inactive downloader will not accept incoming hashes and blocks. 
func TestInactiveDownloader(t *testing.T) { - targetBlocks := 1000 + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashSet(createHashSet(hashes)) - tester := newTester(t, hashes, nil) - err := tester.downloader.DeliverHashes("bad peer 001", hashes) - if err != errNoSyncActive { - t.Error("expected no sync error, got", err) - } + tester := newTester(t, nil, nil) - err = tester.downloader.DeliverBlocks("bad peer 001", blocks) - if err != errNoSyncActive { - t.Error("expected no sync error, got", err) + // Check that neither hashes nor blocks are accepted + if err := tester.downloader.DeliverHashes("bad peer", hashes); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBlocks("bad peer", blocks); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } +// Tests that a canceled download wipes all previously accumulated state. func TestCancel(t *testing.T) { - minDesiredPeerCount = 4 - blockHardTTL = 1 * time.Second - - targetBlocks := 1000 + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer1", big.NewInt(10000), hashes[0]) + tester := newTester(t, hashes, blocks) + tester.newPeer("peer", big.NewInt(10000), hashes[0]) - err := tester.sync("peer1", hashes[0]) - if err != nil { - t.Error("download error", err) + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - if !tester.downloader.Cancel() { - t.Error("cancel operation unsuccessfull") + t.Fatalf("cancel operation failed") } - - hashSize, blockSize := tester.downloader.queue.Size() - if hashSize > 0 || blockSize > 0 { - t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0") + // Make sure the queue reports empty and no blocks can be taken + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + if took := tester.downloader.TakeBlocks(); len(took) != 0 { + t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0) } } +// Tests that if a large batch of blocks are being downloaded, it is throttled +// until the cached blocks are retrieved. 
func TestThrottling(t *testing.T) { - minDesiredPeerCount = 4 - blockHardTTL = 1 * time.Second - - targetBlocks := 16 * blockCacheLimit + // Create a long block chain to download and the tester + targetBlocks := 8 * blockCacheLimit hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer1", big.NewInt(10000), hashes[0]) - tester.newPeer("peer2", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{}) - tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) + tester := newTester(t, hashes, blocks) + tester.newPeer("peer", big.NewInt(10000), hashes[0]) - // Concurrently download and take the blocks - took, err := tester.syncTake("peer1", hashes[0]) - if err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + // Start a synchronisation concurrently + errc := make(chan error) + go func() { + errc <- tester.sync("peer", hashes[0]) + }() + // Iteratively take some blocks, always checking the retrieval count + for total := 0; total < targetBlocks; { + // Sleep a bit for sync to complete + time.Sleep(250 * time.Millisecond) + + // Fetch the next batch of blocks + took := tester.downloader.TakeBlocks() + if len(took) != blockCacheLimit { + t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit) + } + total += len(took) + if total > targetBlocks { + t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks) + } } - if len(took) != targetBlocks { - t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) + if err := <-errc; err != nil { + t.Fatalf("block synchronization failed: %v", err) } } -- cgit v1.2.3 From 84bc93d8cb3ca2beab96b567507873e53ce80056 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 29 May 2015 21:04:20 +0300 Subject: eth/downloader: accumulating hash bans for reconnecting attackers --- eth/downloader/downloader.go | 96 +++++++++++++++++++++++++++++++++++++-- eth/downloader/downloader_test.go | 35 ++++++++++++++ 2 files changed, 126 insertions(+), 5 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index de6ee1734..b0d55bc44 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,7 +1,9 @@ package downloader import ( + "bytes" "errors" + "fmt" "math/rand" "sync" "sync/atomic" @@ -289,9 +291,15 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) return ErrEmptyHashSet } - for _, hash := range hashPack.hashes { + for index, hash := range hashPack.hashes { if d.banned.Has(hash) { glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) + + d.queue.Insert(hashPack.hashes[:index+1]) + if err := d.banBlocks(active.id, hash); err != nil { + fmt.Println("ban err", err) + glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) + } return ErrInvalidChain } } @@ -399,8 +407,10 @@ func (d *Downloader) fetchBlocks() error { glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") start := time.Now() - // default ticker for re-fetching blocks every now and then + // Start a ticker to continue throttled downloads and check for bad peers ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + out: for { select { @@ -413,7 +423,7 @@ out: block := blockPack.blocks[0] if _, ok := d.checks[block.Hash()]; ok { delete(d.checks, 
block.Hash()) - continue + break } } // If the peer was previously banned and failed to deliver it's pack @@ -488,7 +498,7 @@ out: if d.queue.Pending() > 0 { // Throttle the download if block cache is full and waiting processing if d.queue.Throttle() { - continue + break } // Send a download request to all idle peers, until throttled idlePeers := d.peers.IdlePeers() @@ -529,10 +539,86 @@ out: } } glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start)) - return nil } +// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes, +// and bans the head of the retrieved batch. +// +// This method only fetches one single batch as the goal is not ban an entire +// (potentially long) invalid chain - wasting a lot of time in the meanwhile -, +// but rather to gradually build up a blacklist if the peer keeps reconnecting. +func (d *Downloader) banBlocks(peerId string, head common.Hash) error { + glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId) + + // Ask the peer being banned for a batch of blocks from the banning point + peer := d.peers.Peer(peerId) + if peer == nil { + return nil + } + request := d.queue.Reserve(peer, MaxBlockFetch) + if request == nil { + return nil + } + if err := peer.Fetch(request); err != nil { + return err + } + // Wait a bit for the reply to arrive, and ban if done so + timeout := time.After(blockTTL) + for { + select { + case <-d.cancelCh: + return errCancelBlockFetch + + case <-timeout: + return ErrTimeout + + case blockPack := <-d.blockCh: + blocks := blockPack.blocks + + // Short circuit if it's a stale cross check + if len(blocks) == 1 { + block := blocks[0] + if _, ok := d.checks[block.Hash()]; ok { + delete(d.checks, block.Hash()) + break + } + } + // Short circuit if it's not from the peer being banned + if blockPack.peerId != peerId { + break + } + // Short circuit if no blocks were returned + if len(blocks) == 0 { + return errors.New("no blocks returned to ban") + } + // Got the batch of invalid blocks, reconstruct their chain order + for i := 0; i < len(blocks); i++ { + for j := i + 1; j < len(blocks); j++ { + if blocks[i].NumberU64() > blocks[j].NumberU64() { + blocks[i], blocks[j] = blocks[j], blocks[i] + } + } + } + // Ensure we're really banning the correct blocks + if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 { + return errors.New("head block not the banned one") + } + index := 0 + for _, block := range blocks[1:] { + if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 { + break + } + index++ + } + d.banned.Add(blocks[index].Hash()) + + glog.V(logger.Debug).Infof("Banned %d blocks from: %s\n", index+1, peerId) + return nil + } + } +} + // DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. 
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 434861c61..288072164 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -14,6 +14,7 @@ import ( var ( knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} + bannedHash = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5} ) func createHashes(start, amount int) (hashes []common.Hash) { @@ -520,3 +521,37 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Tests that if one/multiple malicious peers try to feed a banned blockchain to +// the downloader, it will not keep refetching the same chain indefinitely, but +// gradually block pieces of it, until it's head is also blocked. +func TestBannedChainStarvationAttack(t *testing.T) { + // Construct a valid chain, but ban one of the hashes in it + hashes := createHashes(0, 8*blockCacheLimit) + hashes[len(hashes)/2+23] = bannedHash // weird index to have non multiple of ban chunk size + + blocks := createBlocksFromHashes(hashes) + + // Create the tester and ban the selected hash + tester := newTester(t, hashes, blocks) + tester.downloader.banned.Add(bannedHash) + + // Iteratively try to sync, and verify that the banned hash list grows until + // the head of the invalid chain is blocked too. + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + for banned := tester.downloader.banned.Size(); ; { + // Try to sync with the attacker, check hash chain failure + if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + } + // Check that the ban list grew with at least 1 new item, or all banned + bans := tester.downloader.banned.Size() + if bans < banned+1 { + if tester.downloader.banned.Has(hashes[0]) { + break + } + t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1) + } + banned = bans + } +} -- cgit v1.2.3 From abdfcda4dd223f2a2a932628da1e9388d2670856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 29 May 2015 21:15:28 +0300 Subject: eth/downloader: short circuit sync if head hash is banned --- eth/downloader/downloader.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b0d55bc44..af9b6b2b1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -165,6 +165,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) + // If the head hash is banned, terminate immediately + if d.banned.Has(hash) { + return ErrInvalidChain + } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") -- cgit v1.2.3 From 0275fcb3d3979d14e01b4e0b9980be6d0cafbc8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Sat, 30 May 2015 00:34:23 +0300 Subject: eth/downloader: clean up and simplify the code a bit --- eth/downloader/downloader.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 
deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index af9b6b2b1..159a06bd4 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -3,20 +3,18 @@ package downloader import ( "bytes" "errors" - "fmt" "math/rand" "sync" "sync/atomic" "time" - "gopkg.in/fatih/set.v0" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "gopkg.in/fatih/set.v0" ) const ( @@ -301,7 +299,6 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { d.queue.Insert(hashPack.hashes[:index+1]) if err := d.banBlocks(active.id, hash); err != nil { - fmt.Println("ban err", err) glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } return ErrInvalidChain @@ -596,15 +593,8 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { if len(blocks) == 0 { return errors.New("no blocks returned to ban") } - // Got the batch of invalid blocks, reconstruct their chain order - for i := 0; i < len(blocks); i++ { - for j := i + 1; j < len(blocks); j++ { - if blocks[i].NumberU64() > blocks[j].NumberU64() { - blocks[i], blocks[j] = blocks[j], blocks[i] - } - } - } - // Ensure we're really banning the correct blocks + // Reconstruct the original chain order and ensure we're banning the correct blocks + types.BlockBy(types.Number).Sort(blocks) if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 { return errors.New("head block not the banned one") } -- cgit v1.2.3 From 9da0232eef7e7abd9f036fccb231220e272e6049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Sat, 30 May 2015 00:45:22 +0300 Subject: eth/downloader: update test for shitty travis --- eth/downloader/downloader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 288072164..cab213499 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -274,7 +274,7 @@ func TestThrottling(t *testing.T) { // Iteratively take some blocks, always checking the retrieval count for total := 0; total < targetBlocks; { // Sleep a bit for sync to complete - time.Sleep(250 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // Fetch the next batch of blocks took := tester.downloader.TakeBlocks() -- cgit v1.2.3 From 6d497f61c65b1c02c4cfb5f4e0673574a0cd17d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 2 Jun 2015 15:57:07 +0300 Subject: eth/downloader: don't block hash deliveries while pulling blocks --- eth/downloader/downloader.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 159a06bd4..4b837eed5 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -418,6 +418,9 @@ out: case <-d.cancelCh: return errCancelBlockFetch + case <-d.hashCh: + // Out of bounds hashes received, ignore them + case blockPack := <-d.blockCh: // Short circuit if it's a stale cross check if len(blockPack.blocks) == 1 { @@ -472,30 +475,21 @@ out: glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) } } + case <-ticker.C: - // Check for bad peers. 
Bad peers may indicate a peer not responding - // to a `getBlocks` message. A timeout of 5 seconds is set. Peers - // that badly or poorly behave are removed from the peer set (not banned). - // Bad peers are excluded from the available peer set and therefor won't be - // reused. XXX We could re-introduce peers after X time. + // Short circuit if we lost all our peers + if d.peers.Len() == 0 { + return errNoPeers + } + // Check for block request timeouts and demote the responsible peers badPeers := d.queue.Expire(blockHardTTL) for _, pid := range badPeers { - // XXX We could make use of a reputation system here ranking peers - // in their performance - // 1) Time for them to respond; - // 2) Measure their speed; - // 3) Amount and availability. if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) } } - // After removing bad peers make sure we actually have sufficient peer left to keep downloading - if d.peers.Len() == 0 { - return errNoPeers - } - // If there are unrequested hashes left start fetching - // from the available peers. + // If there are unrequested hashes left start fetching from the available peers if d.queue.Pending() > 0 { // Throttle the download if block cache is full and waiting processing if d.queue.Throttle() { @@ -565,7 +559,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { return err } // Wait a bit for the reply to arrive, and ban if done so - timeout := time.After(blockTTL) + timeout := time.After(blockHardTTL) for { select { case <-d.cancelCh: @@ -574,6 +568,9 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { case <-timeout: return ErrTimeout + case <-d.hashCh: + // Out of bounds hashes received, ignore them + case blockPack := <-d.blockCh: blocks := blockPack.blocks -- cgit v1.2.3 From 1d7bf3d39fbd6b1a53913bb309bc07500b220ded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 3 Jun 2015 18:39:56 +0300 Subject: eth/downloader: fix merge compile error --- eth/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4b837eed5..d113aa33f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -551,7 +551,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { if peer == nil { return nil } - request := d.queue.Reserve(peer, MaxBlockFetch) + request := d.queue.Reserve(peer) if request == nil { return nil } -- cgit v1.2.3 From b40c796ff726d54efc8c7933e1586869c2a0985a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 3 Jun 2015 19:00:54 +0300 Subject: eth/downloader: preallocate the block cache --- eth/downloader/downloader.go | 8 ++++---- eth/downloader/downloader_test.go | 2 +- eth/downloader/queue.go | 25 +++++++++---------------- 3 files changed, 14 insertions(+), 21 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d113aa33f..cd2fd81f1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -341,12 +341,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { active.getHashes(head) continue } - // We're done, allocate the download cache and proceed pulling the blocks + // We're done, prepare the download cache and proceed pulling the blocks offset := 0 if block := d.getBlock(head); block != nil { offset = int(block.NumberU64() + 
1) } - d.queue.Alloc(offset) + d.queue.Prepare(offset) finished = true case blockPack := <-d.blockCh: @@ -504,7 +504,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer) + request := d.queue.Reserve(peer, peer.Capacity()) if request == nil { continue } @@ -551,7 +551,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { if peer == nil { return nil } - request := d.queue.Reserve(peer) + request := d.queue.Reserve(peer, MaxBlockFetch) if request == nil { return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index cab213499..4e2d527b9 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -186,7 +186,7 @@ func TestSynchronisation(t *testing.T) { if err := tester.sync("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if queued := len(tester.downloader.queue.blockCache); queued != targetBlocks { + if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks) } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 671ffe51b..79ddbb857 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -50,10 +50,11 @@ type queue struct { // newQueue creates a new download queue for scheduling block retrieval. func newQueue() *queue { return &queue{ - hashPool: make(map[common.Hash]int), - hashQueue: prque.New(), - pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]int), + hashPool: make(map[common.Hash]int), + hashQueue: prque.New(), + pendPool: make(map[string]*fetchRequest), + blockPool: make(map[common.Hash]int), + blockCache: make([]*Block, blockCacheLimit), } } @@ -70,7 +71,7 @@ func (q *queue) Reset() { q.blockPool = make(map[common.Hash]int) q.blockOffset = 0 - q.blockCache = nil + q.blockCache = make([]*Block, blockCacheLimit) } // Size retrieves the number of hashes in the queue, returning separately for @@ -208,7 +209,7 @@ func (q *queue) TakeBlocks() []*Block { // Reserve reserves a set of hashes for the given peer, skipping any previously // failed download. -func (q *queue) Reserve(p *peer) *fetchRequest { +func (q *queue) Reserve(p *peer, count int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -345,20 +346,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { return nil } -// Alloc ensures that the block cache is the correct size, given a starting -// offset, and a memory cap. -func (q *queue) Alloc(offset int) { +// Prepare configures the block cache offset to allow accepting inbound blocks. +func (q *queue) Prepare(offset int) { q.lock.Lock() defer q.lock.Unlock() if q.blockOffset < offset { q.blockOffset = offset } - size := len(q.hashPool) - if size > blockCacheLimit { - size = blockCacheLimit - } - if len(q.blockCache) < size { - q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...) 
- } } -- cgit v1.2.3 From 2d627995cf22b9a1187e4b22d430f84541904d2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Sun, 7 Jun 2015 18:41:05 +0300 Subject: eth/downloader: fix another rebase error --- eth/downloader/queue.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 79ddbb857..3c99efb81 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -230,8 +230,7 @@ func (q *queue) Reserve(p *peer, count int) *fetchRequest { send := make(map[common.Hash]int) skip := make(map[common.Hash]int) - capacity := p.Capacity() - for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ { + for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ { hash, priority := q.hashQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) -- cgit v1.2.3 From 4b2dd44711a04c639ecde68806455ccf7244acce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Sun, 7 Jun 2015 18:46:32 +0300 Subject: eth/downloader: fix throttling test to be less timing dependent --- eth/downloader/downloader_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4e2d527b9..4219c2788 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -273,9 +273,13 @@ func TestThrottling(t *testing.T) { }() // Iteratively take some blocks, always checking the retrieval count for total := 0; total < targetBlocks; { - // Sleep a bit for sync to complete - time.Sleep(500 * time.Millisecond) - + // Wait a bit for sync to complete + for start := time.Now(); time.Since(start) < 3*time.Second; { + time.Sleep(25 * time.Millisecond) + if len(tester.downloader.queue.blockPool) == blockCacheLimit { + break + } + } // Fetch the next batch of blocks took := tester.downloader.TakeBlocks() if len(took) != blockCacheLimit { -- cgit v1.2.3 From 63c6cedb14cbd461733e33ffac016dc7d8e431ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 14:06:36 +0300 Subject: eth/downloader: cap the hash ban set, add test for it --- eth/downloader/downloader.go | 23 ++++++++++++++------- eth/downloader/downloader_test.go | 43 +++++++++++++++++++++++++++++++++++++++ eth/downloader/peer.go | 2 +- eth/downloader/queue.go | 2 +- eth/handler.go | 4 ++-- eth/peer.go | 2 +- 6 files changed, 64 insertions(+), 12 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index cd2fd81f1..7eda49186 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -17,18 +17,17 @@ import ( "gopkg.in/fatih/set.v0" ) -const ( +var ( MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request - hashTTL = 5 * time.Second // Time it takes for a hash request to time out -) - -var ( + hashTTL = 5 * time.Second // Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired crossCheckCycle = time.Second // Period after which to check for expired cross checks + + 
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out ) var ( @@ -602,9 +601,19 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { } index++ } + // Ban the head hash and phase out any excess d.banned.Add(blocks[index].Hash()) - - glog.V(logger.Debug).Infof("Banned %d blocks from: %s\n", index+1, peerId) + for d.banned.Size() > maxBannedHashes { + d.banned.Each(func(item interface{}) bool { + // Skip any hard coded bans + if core.BadHashes[item.(common.Hash)] { + return true + } + d.banned.Remove(item) + return false + }) + } + glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId) return nil } } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4219c2788..c9e84371c 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" ) @@ -559,3 +560,45 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } } + +// Tests that if a peer sends excessively many/large invalid chains that are +// gradually banned, it will have an upper limit on the consumed memory and also +// the origin bad hashes will not be evacuated. +func TestBannedChainMemoryExhaustionAttack(t *testing.T) { + // Reduce the test size a bit + MaxBlockFetch = 4 + maxBannedHashes = 256 + + // Construct a banned chain with more chunks than the ban limit + hashes := createHashes(0, maxBannedHashes*MaxBlockFetch) + hashes[len(hashes)-1] = bannedHash // weird index to have non multiple of ban chunk size + + blocks := createBlocksFromHashes(hashes) + + // Create the tester and ban the selected hash + tester := newTester(t, hashes, blocks) + tester.downloader.banned.Add(bannedHash) + + // Iteratively try to sync, and verify that the banned hash list grows until + // the head of the invalid chain is blocked too. 
+ tester.newPeer("attack", big.NewInt(10000), hashes[0]) + for { + // Try to sync with the attacker, check hash chain failure + if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + } + // Short circuit if the entire chain was banned + if tester.downloader.banned.Has(hashes[0]) { + break + } + // Otherwise ensure we never exceed the memory allowance and the hard coded bans are untouched + if bans := tester.downloader.banned.Size(); bans > maxBannedHashes { + t.Fatalf("ban cap exceeded: have %v, want max %v", bans, maxBannedHashes) + } + for hash, _ := range core.BadHashes { + if !tester.downloader.banned.Has(hash) { + t.Fatalf("hard coded ban evacuated: %x", hash) + } + } + } +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 5fbc64648..9614a6951 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -94,7 +94,7 @@ func (p *peer) SetIdle() { for { // Calculate the new download bandwidth allowance prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) + next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale))) // Try to update the old value if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 3c99efb81..7abbd42fd 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -16,7 +16,7 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -const ( +var ( blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download ) diff --git a/eth/handler.go b/eth/handler.go index aea33452c..37bbd3691 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -213,8 +213,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "->msg %v: %v", msg, err) } - if request.Amount > downloader.MaxHashFetch { - request.Amount = downloader.MaxHashFetch + if request.Amount > uint64(downloader.MaxHashFetch) { + request.Amount = uint64(downloader.MaxHashFetch) } hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) diff --git a/eth/peer.go b/eth/peer.go index bb6a20349..b2fa20ebb 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -102,7 +102,7 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { func (p *peer) requestHashes(from common.Hash) error { glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, uint64(downloader.MaxHashFetch)}) } func (p *peer) requestBlocks(hashes []common.Hash) error { -- cgit v1.2.3 From c4f224932f69099f595211755ebc0a9933616315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 14:46:31 +0300 Subject: eth/downloader: reject peer registration if head is banned --- eth/downloader/downloader.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 7eda49186..92cb1a650 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -36,6 +36,7 @@ var ( errUnknownPeer = errors.New("peer is unknown or unhealthy") ErrBadPeer = errors.New("action from bad peer ignored") ErrStallingPeer = errors.New("peer is 
stalling") + errBannedHead = errors.New("peer head hash already banned") errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") @@ -72,11 +73,10 @@ type crossCheck struct { type Downloader struct { mux *event.TypeMux - mu sync.RWMutex queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain - banned *set.SetNonTS // Set of hashes we've received and banned + banned *set.Set // Set of hashes we've received and banned // Callbacks hasBlock hashCheckFn @@ -114,7 +114,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa blockCh: make(chan blockPack, 1), } // Inject all the known bad hashes - downloader.banned = set.NewNonTS() + downloader.banned = set.New() for hash, _ := range core.BadHashes { downloader.banned.Add(hash) } @@ -133,6 +133,12 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { + // If the peer wants to send a banned hash, reject + if d.banned.Has(head) { + glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) + return errBannedHead + } + // Otherwise try to construct and register the peer glog.V(logger.Detail).Infoln("Registering peer", id) if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) @@ -199,6 +205,8 @@ func (d *Downloader) TakeBlocks() []*Block { return d.queue.TakeBlocks() } +// Has checks if the downloader knows about a particular hash, meaning that its +// either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { return d.queue.Has(hash) } @@ -604,14 +612,17 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { // Ban the head hash and phase out any excess d.banned.Add(blocks[index].Hash()) for d.banned.Size() > maxBannedHashes { + var evacuate common.Hash + d.banned.Each(func(item interface{}) bool { // Skip any hard coded bans if core.BadHashes[item.(common.Hash)] { return true } - d.banned.Remove(item) + evacuate = item.(common.Hash) return false }) + d.banned.Remove(evacuate) } glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId) return nil -- cgit v1.2.3 From 4ed3509a02c7f5a09036e6e9cb615c6def6d25f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 15:02:52 +0300 Subject: eth/downloader: test registration rejection on head ban --- eth/downloader/downloader_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c9e84371c..5f10fb41f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -169,8 +169,9 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { } } -func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) { - dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id)) +// newPeer registers a new block download source into the syncer. 
+func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) error { + return dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id)) } // Tests that simple synchronization, without throttling from a good peer works. @@ -559,6 +560,13 @@ func TestBannedChainStarvationAttack(t *testing.T) { } banned = bans } + // Check that after banning an entire chain, bad peers get dropped + if err := tester.newPeer("new attacker", big.NewInt(10000), hashes[0]); err != errBannedHead { + t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) + } + if peer := tester.downloader.peers.Peer("net attacker"); peer != nil { + t.Fatalf("banned attacker registered: %v", peer) + } } // Tests that if a peer sends excessively many/large invalid chains that are -- cgit v1.2.3 From 6f415b96b3b8581e810a8f40f596d2d213681e54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 18:46:07 +0300 Subject: eth: implement the NewBlockHashes protocol proposal --- eth/handler.go | 180 +++++++++++++++++++++++++++++++++++++------------------- eth/peer.go | 4 ++ eth/protocol.go | 2 +- 3 files changed, 123 insertions(+), 63 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index aea33452c..63ebc4bdd 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -2,6 +2,7 @@ package eth import ( "fmt" + "math" "math/big" "sync" "time" @@ -20,6 +21,7 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 ) @@ -186,7 +188,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() switch msg.Code { - case GetTxMsg: // ignore case StatusMsg: return errResp(ErrExtraStatusMsg, "uncontrolled status message") @@ -227,6 +228,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // returns either requested hashes or nothing (i.e. 
not found) return p.sendBlockHashes(hashes) + case BlockHashesMsg: msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -266,6 +268,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } return p.sendBlocks(blocks) + case BlocksMsg: var blocks []*types.Block @@ -274,7 +277,57 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - self.downloader.DeliverBlocks(p.id, blocks) + + // Either deliver to the downloader or the importer + if self.downloader.Synchronising() { + self.downloader.DeliverBlocks(p.id, blocks) + } else { + for _, block := range blocks { + if err := self.importBlock(p, block, nil); err != nil { + return err + } + } + } + + case NewBlockHashesMsg: + // Retrieve and deseralize the remote new block hashes notification + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var hashes []common.Hash + if err := msgStream.Decode(&hashes); err != nil { + break + } + // Mark the hashes as present at the remote node + for _, hash := range hashes { + p.blockHashes.Add(hash) + p.recentHash = hash + } + // Wait a bit for potentially receiving the blocks, fetch if not + go func() { + time.Sleep(blockArrivalTimeout) + + // Drop all the hashes that are already known + unknown := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !self.chainman.HasBlock(hash) { + unknown = append(unknown, hash) + } + } + if len(unknown) == 0 { + return + } + // Retrieve all the unknown hashes + if err := p.requestBlocks(unknown); err != nil { + glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err) + } + if glog.V(logger.Detail) { + hashes := make([]string, len(unknown)) + for i, hash := range unknown { + hashes[i] = fmt.Sprintf("%x", hash[:4]) + } + glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes) + } + }() case NewBlockMsg: var request newBlockMsgData @@ -286,83 +339,86 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } request.Block.ReceivedAt = msg.ReceivedAt - hash := request.Block.Hash() - // Add the block hash as a known hash to the peer. This will later be used to determine - // who should receive this. - p.blockHashes.Add(hash) - // update the peer info - p.recentHash = hash - p.td = request.TD - - _, chainHead, _ := self.chainman.Status() - - jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ - BlockHash: hash.Hex(), - BlockNumber: request.Block.Number(), // this surely must be zero - ChainHeadHash: chainHead.Hex(), - BlockPrevHash: request.Block.ParentHash().Hex(), - RemoteId: p.ID().String(), - }) - - // Make sure the block isn't already known. If this is the case simply drop - // the message and move on. If the TD is < currentTd; drop it as well. If this - // chain at some point becomes canonical, the downloader will fetch it. - if self.chainman.HasBlock(hash) { - break - } - if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 { - glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD) - break + if err := self.importBlock(p, request.Block, request.TD); err != nil { + return err } - // Attempt to insert the newly received by checking if the parent exists. 
- // if the parent exists we process the block and propagate to our peers - // otherwise synchronize with the peer - if self.chainman.HasBlock(request.Block.ParentHash()) { - if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { - glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") - - self.removePeer(p.id) - - return nil - } - - if err := self.verifyTd(p, request); err != nil { - glog.V(logger.Error).Infoln(err) - // XXX for now return nil so it won't disconnect (we should in the future) - return nil - } - self.BroadcastBlock(hash, request.Block) - } else { - go self.synchronise(p) - } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } return nil } -func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { - if request.Block.Td.Cmp(request.TD) != 0 { - glog.V(logger.Detail).Infoln(peer) +// importBlocks injects a new block retrieved from the given peer into the chain +// manager. +func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) error { + hash := block.Hash() - return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD) + // Mark the block as present at the remote node (don't duplicate already held data) + p.blockHashes.Add(hash) + p.recentHash = hash + if td != nil { + p.td = td + } + // Log the block's arrival + _, chainHead, _ := pm.chainman.Status() + jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ + BlockHash: hash.Hex(), + BlockNumber: block.Number(), // this surely must be zero + ChainHeadHash: chainHead.Hex(), + BlockPrevHash: block.ParentHash().Hex(), + RemoteId: p.ID().String(), + }) + // If the block's already known or its difficulty is lower than ours, drop + if pm.chainman.HasBlock(hash) { + p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value + return nil + } + if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 { + glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, block.Number(), td) + return nil + } + // Attempt to insert the newly received block and propagate to our peers + if pm.chainman.HasBlock(block.ParentHash()) { + if _, err := pm.chainman.InsertChain(types.Blocks{block}); err != nil { + glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error", err) + return err + } + if td != nil && block.Td.Cmp(td) != 0 { + err := fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", block.Number(), p.id, block.Td, td) + glog.V(logger.Error).Infoln(err) + return err + } + pm.BroadcastBlock(hash, block) + return nil + } + // Parent of the block is unknown, try to sync with this peer if it seems to be good + if td != nil { + go pm.synchronise(p) } - return nil } -// BroadcastBlock will propagate the block to its connected peers. It will sort -// out which peers do not contain the block in their block set and will do a -// sqrt(peers) to determine the amount of peers we broadcast to. +// BroadcastBlock will propagate the block to a subset of its connected peers, +// only notifying the rest of the block's appearance. 
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { - // Broadcast block to a batch of peers not knowing about it + // Retrieve all the target peers and split between full broadcast or only notification peers := pm.peers.PeersWithoutBlock(hash) - //peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { + split := int(math.Sqrt(float64(len(peers)))) + + transfer := peers[:split] + nofity := peers[split:] + + // Send out the data transfers and the notifications + for _, peer := range nofity { + peer.sendNewBlockHashes([]common.Hash{hash}) + } + glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.") + + for _, peer := range transfer { peer.sendNewBlock(block) } - glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt)) + glog.V(logger.Detail).Infoln("broadcast block to", len(transfer), "peers. Total processing time:", time.Since(block.ReceivedAt)) } // BroadcastTx will propagate the block to its connected peers. It will sort diff --git a/eth/peer.go b/eth/peer.go index bb6a20349..1146ebde3 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -88,6 +88,10 @@ func (p *peer) sendBlocks(blocks []*types.Block) error { return p2p.Send(p.rw, BlocksMsg, blocks) } +func (p *peer) sendNewBlockHashes(hashes []common.Hash) error { + return p2p.Send(p.rw, NewBlockHashesMsg, hashes) +} + func (p *peer) sendNewBlock(block *types.Block) error { p.blockHashes.Add(block.Hash()) diff --git a/eth/protocol.go b/eth/protocol.go index 948051ed1..9ccf2cb60 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -17,7 +17,7 @@ const ( // eth protocol message codes const ( StatusMsg = iota - GetTxMsg // unused + NewBlockHashesMsg TxMsg GetBlockHashesMsg BlockHashesMsg -- cgit v1.2.3 From 8c012e103faf2283e9be3fab26dbcf4cc63c09da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 19:11:06 +0300 Subject: eth: mark blocks as known when broadcasting hashes too --- eth/peer.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'eth') diff --git a/eth/peer.go b/eth/peer.go index 1146ebde3..5a59c1a40 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -89,6 +89,9 @@ func (p *peer) sendBlocks(blocks []*types.Block) error { } func (p *peer) sendNewBlockHashes(hashes []common.Hash) error { + for _, hash := range hashes { + p.blockHashes.Add(hash) + } return p2p.Send(p.rw, NewBlockHashesMsg, hashes) } -- cgit v1.2.3 From fdccce781e94819ec9dc13ef6540a33efd3b26c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 19:24:56 +0300 Subject: eth: fetch announced hashes from origin, periodically --- eth/handler.go | 54 +++++++++++++++++++++++++----------------------------- eth/sync.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 32 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index 63ebc4bdd..7e9ec593a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -21,7 +21,8 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond 
// Time allowance before an announced block is explicitly requested minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 ) @@ -57,6 +58,7 @@ type ProtocolManager struct { minedBlockSub event.Subscription newPeerCh chan *peer + newHashCh chan []*blockAnnounce quitSync chan struct{} // wait group is used for graceful shutdowns during downloading // and processing @@ -74,6 +76,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo downloader: downloader, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), + newHashCh: make(chan []*blockAnnounce, 1), quitSync: make(chan struct{}), } @@ -121,7 +124,8 @@ func (pm *ProtocolManager) Start() { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - go pm.update() + go pm.syncer() + go pm.fetcher() } func (pm *ProtocolManager) Stop() { @@ -302,32 +306,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error { p.blockHashes.Add(hash) p.recentHash = hash } - // Wait a bit for potentially receiving the blocks, fetch if not - go func() { - time.Sleep(blockArrivalTimeout) - - // Drop all the hashes that are already known - unknown := make([]common.Hash, 0, len(hashes)) - for _, hash := range hashes { - if !self.chainman.HasBlock(hash) { - unknown = append(unknown, hash) - } - } - if len(unknown) == 0 { - return - } - // Retrieve all the unknown hashes - if err := p.requestBlocks(unknown); err != nil { - glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err) + // Schedule all the unknown hashes for retrieval + unknown := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !self.chainman.HasBlock(hash) { + unknown = append(unknown, hash) } - if glog.V(logger.Detail) { - hashes := make([]string, len(unknown)) - for i, hash := range unknown { - hashes[i] = fmt.Sprintf("%x", hash[:4]) - } - glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes) + } + announces := make([]*blockAnnounce, len(unknown)) + for i, hash := range unknown { + announces[i] = &blockAnnounce{ + hash: hash, + peer: p, + time: time.Now(), } - }() + } + if len(announces) > 0 { + self.newHashCh <- announces + } case NewBlockMsg: var request newBlockMsgData @@ -407,13 +403,13 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) split := int(math.Sqrt(float64(len(peers)))) transfer := peers[:split] - nofity := peers[split:] + notify := peers[split:] // Send out the data transfers and the notifications - for _, peer := range nofity { + for _, peer := range notify { peer.sendNewBlockHashes([]common.Hash{hash}) } - glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.") + glog.V(logger.Detail).Infoln("broadcast hash to", len(notify), "peers.") for _, peer := range transfer { peer.sendNewBlock(block) diff --git a/eth/sync.go b/eth/sync.go index 56084f2f0..1a1cbdb47 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -5,15 +5,67 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// update periodically tries to synchronise with the network, both downloading -// hashes and blocks as well as retrieving cached ones. -func (pm *ProtocolManager) update() { +// blockAnnounce is the hash notification of the availability of a new block in +// the network. 
+type blockAnnounce struct { + hash common.Hash + peer *peer + time time.Time +} + +// fetcher is responsible for collecting hash notifications, and periodically +// checking all unknown ones and individually fetching them. +func (pm *ProtocolManager) fetcher() { + announces := make(map[common.Hash]*blockAnnounce) + request := make(map[*peer][]common.Hash) + cycle := time.Tick(notifyCheckCycle) + + // Iterate the block fetching until a quit is requested + for { + select { + case notifications := <-pm.newHashCh: + // A batch of hashes the notified, schedule them for retrieval + glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id) + for _, announce := range notifications { + announces[announce.hash] = announce + } + + case <-cycle: + // Check if any notified blocks failed to arrive + for hash, announce := range announces { + if time.Since(announce.time) > notifyArriveTimeout { + if !pm.chainman.HasBlock(hash) { + request[announce.peer] = append(request[announce.peer], hash) + } + delete(announces, hash) + } + } + if len(request) == 0 { + break + } + // Send out all block requests + for peer, hashes := range request { + glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id) + peer.requestBlocks(hashes) + } + request = make(map[*peer][]common.Hash) + + case <-pm.quitSync: + return + } + } +} + +// syncer is responsible for periodically synchronising with the network, both +// downloading hashes and blocks as well as retrieving cached ones. +func (pm *ProtocolManager) syncer() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) blockProcPend := int32(0) -- cgit v1.2.3 From 6244b10a8f74d92addf977994e5a9c0e457229bb Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Jun 2015 12:12:13 +0200 Subject: core: settable genesis nonce You can set the nonce of the block with `--genesisnonce`. When the genesis nonce changes and it doesn't match with the first block in your database it will fail. A new `datadir` must be given if the nonce of the genesis block changes. --- eth/backend.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 3956dfcaa..06627416d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -58,6 +58,7 @@ type Config struct { Name string ProtocolVersion int NetworkId int + GenesisNonce int BlockChainVersion int SkipBcVersionCheck bool // e.g. blockchain export @@ -284,7 +285,11 @@ func New(config *Config) (*Ethereum, error) { } eth.pow = ethash.New() - eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux()) + genesis := core.GenesisBlock(uint64(config.GenesisNonce), blockDb) + eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, eth.pow, eth.EventMux()) + if err != nil { + return nil, err + } eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) -- cgit v1.2.3 From 9ed166c196b07047299579e5ea2b6ece26aec5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 20:38:39 +0300 Subject: eth: split and handle explicitly vs. 
download requested blocks --- eth/handler.go | 48 +++++++++++++++++++++++++----------------------- eth/sync.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 23 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index 7e9ec593a..acc16812a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,15 +18,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockProcAmount = 256 -) - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -57,9 +48,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - quitSync chan struct{} + newPeerCh chan *peer + newHashCh chan []*blockAnnounce + newBlockCh chan chan []*types.Block + quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), + newBlockCh: make(chan chan []*types.Block), quitSync: make(chan struct{}), } @@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return p.sendBlocks(blocks) case BlocksMsg: - var blocks []*types.Block - + // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - - // Either deliver to the downloader or the importer - if self.downloader.Synchronising() { - self.downloader.DeliverBlocks(p.id, blocks) - } else { - for _, block := range blocks { - if err := self.importBlock(p, block, nil); err != nil { - return err + // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) + filter := make(chan []*types.Block) + select { + case <-self.quitSync: + case self.newBlockCh <- filter: + select { + case <-self.quitSync: + case filter <- blocks: + select { + case <-self.quitSync: + case blocks := <-filter: + self.downloader.DeliverBlocks(p.id, blocks) } } } @@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } if len(announces) > 0 { - self.newHashCh <- announces + select { + case self.newHashCh <- announces: + case <-self.quitSync: + } } case NewBlockMsg: diff --git a/eth/sync.go b/eth/sync.go index 1a1cbdb47..f761f3cd1 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,6 +12,16 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + 
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 +) + // blockAnnounce is the hash notification of the availability of a new block in // the network. type blockAnnounce struct { @@ -25,6 +35,7 @@ type blockAnnounce struct { func (pm *ProtocolManager) fetcher() { announces := make(map[common.Hash]*blockAnnounce) request := make(map[*peer][]common.Hash) + pending := make(map[common.Hash]*blockAnnounce) cycle := time.Tick(notifyCheckCycle) // Iterate the block fetching until a quit is requested @@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() { } case <-cycle: + // Clean up any expired block fetches + for hash, announce := range pending { + if time.Since(announce.time) > notifyFetchTimeout { + delete(pending, hash) + } + } // Check if any notified blocks failed to arrive for hash, announce := range announces { if time.Since(announce.time) > notifyArriveTimeout { if !pm.chainman.HasBlock(hash) { request[announce.peer] = append(request[announce.peer], hash) + pending[hash] = announce } delete(announces, hash) } @@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() { } request = make(map[*peer][]common.Hash) + case filter := <-pm.newBlockCh: + // Blocks arrived, extract any explicit requests, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-pm.quitSync: + return + } + + fetch, sync := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + if _, ok := pending[hash]; ok { + fetch = append(fetch, block) + } else { + sync = append(sync, block) + } + } + + select { + case filter <- sync: + case <-pm.quitSync: + return + } + // If any explicit fetches were replied to, import them + if len(fetch) > 0 { + go func() { + for _, block := range fetch { + if announce := pending[block.Hash()]; announce != nil { + if err := pm.importBlock(announce.peer, block, nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + } + }() + } + case <-pm.quitSync: return } -- cgit v1.2.3 From 8216bb901c9fbdcde427cc42ca7e82ec3ee2e8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 00:37:10 +0300 Subject: eth: clean up pending announce download map, polish logs --- eth/handler.go | 2 +- eth/sync.go | 38 +++++++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 14 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index acc16812a..15381b447 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -362,7 +362,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) _, chainHead, _ := pm.chainman.Status() jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ BlockHash: hash.Hex(), - BlockNumber: block.Number(), // this surely must be zero + BlockNumber: block.Number(), ChainHeadHash: chainHead.Hex(), BlockPrevHash: block.ParentHash().Hex(), RemoteId: p.ID().String(), diff --git a/eth/sync.go b/eth/sync.go index f761f3cd1..dd7414da8 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -43,7 +43,7 @@ func (pm *ProtocolManager) fetcher() { select { case notifications := <-pm.newHashCh: // A batch of hashes the notified, schedule them for retrieval - glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", 
len(notifications), notifications[0].peer.id) + glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) for _, announce := range notifications { announces[announce.hash] = announce } @@ -70,13 +70,13 @@ func (pm *ProtocolManager) fetcher() { } // Send out all block requests for peer, hashes := range request { - glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id) + glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) peer.requestBlocks(hashes) } request = make(map[*peer][]common.Hash) case filter := <-pm.newBlockCh: - // Blocks arrived, extract any explicit requests, return all else + // Blocks arrived, extract any explicit fetches, return all else var blocks types.Blocks select { case blocks = <-filter: @@ -84,26 +84,38 @@ func (pm *ProtocolManager) fetcher() { return } - fetch, sync := []*types.Block{}, []*types.Block{} + explicit, download := []*types.Block{}, []*types.Block{} for _, block := range blocks { hash := block.Hash() + + // Filter explicitly requested blocks from hash announcements if _, ok := pending[hash]; ok { - fetch = append(fetch, block) + // Discard if already imported by other means + if !pm.chainman.HasBlock(hash) { + explicit = append(explicit, block) + } else { + delete(pending, hash) + } } else { - sync = append(sync, block) + download = append(download, block) } } select { - case filter <- sync: + case filter <- download: case <-pm.quitSync: return } // If any explicit fetches were replied to, import them - if len(fetch) > 0 { + if count := len(explicit); count > 0 { + glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count) go func() { - for _, block := range fetch { - if announce := pending[block.Hash()]; announce != nil { + for _, block := range explicit { + hash := block.Hash() + + // Make sure there's still something pending to import + if announce := pending[hash]; announce != nil { + delete(pending, hash) if err := pm.importBlock(announce.peer, block, nil); err != nil { glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) return @@ -207,15 +219,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Get the hashes from the peer (synchronously) - glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) err := pm.downloader.Synchronise(peer.id, peer.recentHash) switch err { case nil: - glog.V(logger.Debug).Infof("Synchronisation completed") + glog.V(logger.Detail).Infof("Synchronisation completed") case downloader.ErrBusy: - glog.V(logger.Debug).Infof("Synchronisation already in progress") + glog.V(logger.Detail).Infof("Synchronisation already in progress") case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) -- cgit v1.2.3 From a5b977aa9042b8b5199067d435c5cbd3f6459834 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 9 Jun 2015 11:36:23 +0200 Subject: core: write accounts to statedb. 
Closes #1210 --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 06627416d..fcbea04a2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -285,7 +285,7 @@ func New(config *Config) (*Ethereum, error) { } eth.pow = ethash.New() - genesis := core.GenesisBlock(uint64(config.GenesisNonce), blockDb) + genesis := core.GenesisBlock(uint64(config.GenesisNonce), stateDb) eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, eth.pow, eth.EventMux()) if err != nil { return nil, err -- cgit v1.2.3 From 44147d057dd91d8b35dd6f4ed025bdb4baf225eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:27:44 +0300 Subject: eth: fix data race accessing peer.recentHash --- eth/handler.go | 6 +++--- eth/peer.go | 31 +++++++++++++++++++++++++------ eth/sync.go | 7 ++++--- 3 files changed, 32 insertions(+), 12 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index 64f89b273..847e7a0e8 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -157,7 +157,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) - if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } // propagate existing transactions. new transactions appearing @@ -303,7 +303,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Mark the hashes as present at the remote node for _, hash := range hashes { p.blockHashes.Add(hash) - p.recentHash = hash + p.SetHead(hash) } // Schedule all the unknown hashes for retrieval unknown := make([]common.Hash, 0, len(hashes)) @@ -354,7 +354,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) // Mark the block as present at the remote node (don't duplicate already held data) p.blockHashes.Add(hash) - p.recentHash = hash + p.SetHead(hash) if td != nil { p.td = td } diff --git a/eth/peer.go b/eth/peer.go index cf2c58ec9..5f5007891 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -40,9 +40,11 @@ type peer struct { protv, netid int - recentHash common.Hash - id string - td *big.Int + id string + td *big.Int + + head common.Hash + headLock sync.RWMutex genesis, ourHash common.Hash ourTd *big.Int @@ -51,14 +53,14 @@ type peer struct { blockHashes *set.Set } -func newPeer(protv, netid int, genesis, recentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(protv, netid int, genesis, head common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() return &peer{ Peer: p, rw: rw, genesis: genesis, - ourHash: recentHash, + ourHash: head, ourTd: td, protv: protv, netid: netid, @@ -68,6 +70,23 @@ func newPeer(protv, netid int, genesis, recentHash common.Hash, td *big.Int, p * } } +// Head retrieves a copy of the current head (most recent) hash of the peer. +func (p *peer) Head() (hash common.Hash) { + p.headLock.RLock() + defer p.headLock.RUnlock() + + copy(hash[:], p.head[:]) + return hash +} + +// SetHead updates the head (most recent) hash of the peer. +func (p *peer) SetHead(hash common.Hash) { + p.headLock.Lock() + defer p.headLock.Unlock() + + copy(p.head[:], hash[:]) +} + // sendTransactions sends transactions to the peer and includes the hashes // in it's tx hash set for future reference. 
The tx hash will allow the // manager to check whether the peer has already received this particular @@ -160,7 +179,7 @@ func (p *peer) handleStatus() error { // Set the total difficulty of the peer p.td = status.TD // set the best hash of the peer - p.recentHash = status.CurrentBlock + p.head = status.CurrentBlock return <-errc } diff --git a/eth/sync.go b/eth/sync.go index dd7414da8..b3184364f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -214,14 +214,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // FIXME if we have the hash in our chain and the TD of the peer is // much higher than ours, something is wrong with us or the peer. // Check if the hash is on our own chain - if pm.chainman.HasBlock(peer.recentHash) { + head := peer.Head() + if pm.chainman.HasBlock(head) { glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") return } // Get the hashes from the peer (synchronously) - glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head) - err := pm.downloader.Synchronise(peer.id, peer.recentHash) + err := pm.downloader.Synchronise(peer.id, head) switch err { case nil: glog.V(logger.Detail).Infof("Synchronisation completed") -- cgit v1.2.3 From f86707713c42da02b70a3a389684e19e902d8759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:56:27 +0300 Subject: eth: fix data race accessing peer.td --- eth/handler.go | 4 ++-- eth/peer.go | 41 ++++++++++++++++++++++++++++++----------- eth/sync.go | 2 +- 3 files changed, 33 insertions(+), 14 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index 847e7a0e8..f2027c3c6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -356,7 +356,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) p.blockHashes.Add(hash) p.SetHead(hash) if td != nil { - p.td = td + p.SetTd(td) } // Log the block's arrival _, chainHead, _ := pm.chainman.Status() @@ -369,7 +369,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) }) // If the block's already known or its difficulty is lower than ours, drop if pm.chainman.HasBlock(hash) { - p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value + p.SetTd(pm.chainman.GetBlock(hash).Td) // update the peer's TD to the real value return nil } if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 { diff --git a/eth/peer.go b/eth/peer.go index 5f5007891..c7045282b 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -41,10 +41,10 @@ type peer struct { protv, netid int id string - td *big.Int - head common.Hash - headLock sync.RWMutex + head common.Hash + td *big.Int + lock sync.RWMutex genesis, ourHash common.Hash ourTd *big.Int @@ -72,8 +72,8 @@ func newPeer(protv, netid int, genesis, head common.Hash, td *big.Int, p *p2p.Pe // Head retrieves a copy of the current head (most recent) hash of the peer. func (p *peer) Head() (hash common.Hash) { - p.headLock.RLock() - defer p.headLock.RUnlock() + p.lock.RLock() + defer p.lock.RUnlock() copy(hash[:], p.head[:]) return hash @@ -81,12 +81,28 @@ func (p *peer) Head() (hash common.Hash) { // SetHead updates the head (most recent) hash of the peer. 
func (p *peer) SetHead(hash common.Hash) { - p.headLock.Lock() - defer p.headLock.Unlock() + p.lock.Lock() + defer p.lock.Unlock() copy(p.head[:], hash[:]) } +// Td retrieves the current total difficulty of a peer. +func (p *peer) Td() *big.Int { + p.lock.RLock() + defer p.lock.RUnlock() + + return new(big.Int).Set(p.td) +} + +// SetTd updates the current total difficulty of a peer. +func (p *peer) SetTd(td *big.Int) { + p.lock.Lock() + defer p.lock.Unlock() + + p.td.Set(td) +} + // sendTransactions sends transactions to the peer and includes the hashes // in it's tx hash set for future reference. The tx hash will allow the // manager to check whether the peer has already received this particular @@ -275,11 +291,14 @@ func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() defer ps.lock.RUnlock() - var best *peer + var ( + bestPeer *peer + bestTd *big.Int + ) for _, p := range ps.peers { - if best == nil || p.td.Cmp(best.td) > 0 { - best = p + if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 { + bestPeer, bestTd = p, td } } - return best + return bestPeer } diff --git a/eth/sync.go b/eth/sync.go index b3184364f..3a33fe149 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -208,7 +208,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Make sure the peer's TD is higher than our own. If not drop. - if peer.td.Cmp(pm.chainman.Td()) <= 0 { + if peer.Td().Cmp(pm.chainman.Td()) <= 0 { return } // FIXME if we have the hash in our chain and the TD of the peer is -- cgit v1.2.3 From d09ead546cbdf8e4659e65581f23715101f5b686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 15:09:15 +0300 Subject: eth: fix a data race in the hash announcement processing --- eth/sync.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) (limited to 'eth') diff --git a/eth/sync.go b/eth/sync.go index 3a33fe149..8e4e3cfbe 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -109,17 +109,25 @@ func (pm *ProtocolManager) fetcher() { // If any explicit fetches were replied to, import them if count := len(explicit); count > 0 { glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count) + + // Create a closure with the retrieved blocks and origin peers + peers := make([]*peer, 0, count) + blocks := make([]*types.Block, 0, count) + for _, block := range explicit { + hash := block.Hash() + if announce := pending[hash]; announce != nil { + peers = append(peers, announce.peer) + blocks = append(blocks, block) + + delete(pending, hash) + } + } + // Run the importer on a new thread go func() { - for _, block := range explicit { - hash := block.Hash() - - // Make sure there's still something pending to import - if announce := pending[hash]; announce != nil { - delete(pending, hash) - if err := pm.importBlock(announce.peer, block, nil); err != nil { - glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) - return - } + for i := 0; i < len(blocks); i++ { + if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return } } }() -- cgit v1.2.3 From 41b2008a669a8454ae19f783eb2dcd967e8752cf Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:00:41 +0200 Subject: eth: limit number of sent blocks based on message size If blocks get larger, sending 256 at once can make messages large enough to exceed the low-level write timeout. 
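The essential change is that the reply loop now tracks the cumulative encoded size of the collected blocks and stops once either the per-request block count or a ~2MB byte budget is exhausted. A minimal, self-contained Go sketch of that idea follows (the block type and the driver in main are illustrative stand-ins, not the handler's real types; the two limits mirror downloader.MaxBlockFetch and the new maxBlockRespSize constant from the diff below):

package main

import "fmt"

// block is a stand-in for types.Block; size mimics Block.Size().
type block struct {
	number int
	size   int // encoded size in bytes
}

const (
	maxBlockFetch    = 128             // cap on blocks per reply
	maxBlockRespSize = 2 * 1024 * 1024 // target cap on the encoded reply size
)

// buildReply collects blocks until either limit is hit. Because the size
// check runs after appending, a single oversized block may still push the
// reply past the byte budget, matching the behaviour described above.
func buildReply(requested []block) []block {
	var (
		reply []block
		total int
	)
	for _, b := range requested {
		reply = append(reply, b)
		total += b.size
		if len(reply) == maxBlockFetch || total > maxBlockRespSize {
			break
		}
	}
	return reply
}

func main() {
	var requested []block
	for i := 0; i < 256; i++ {
		requested = append(requested, block{number: i, size: 32 * 1024})
	}
	// With 32KB blocks the ~2MB budget cuts the reply off at 65 blocks,
	// well short of the 256 requested.
	fmt.Println("blocks in reply:", len(buildReply(requested)))
}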
--- eth/handler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index f2027c3c6..a67d956fb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +// This is the target maximum size of returned blocks for the +// getBlocks message. The reply message may exceed it +// if a single block is larger than the limit. +const maxBlockRespSize = 2 * 1024 * 1024 + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -246,7 +251,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := msgStream.List(); err != nil { return err } - var i int + var ( + i int + totalsize common.StorageSize + ) for { i++ var hash common.Hash @@ -260,8 +268,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { block := self.chainman.GetBlock(hash) if block != nil { blocks = append(blocks, block) + totalsize += block.Size() } - if i == downloader.MaxBlockFetch { + if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize { break } } -- cgit v1.2.3 From 6c73a5980640581903d8f56b3912b22641d5195c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:03:14 +0200 Subject: eth: limit number of sent transactions based on message size Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth. 
--- eth/handler.go | 24 +++++++++------ eth/sync.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 9 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index a67d956fb..f002727f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -53,9 +53,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer newHashCh chan []*blockAnnounce newBlockCh chan chan []*types.Block + txsyncCh chan *txsync quitSync chan struct{} // wait group is used for graceful shutdowns during downloading @@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), newBlockCh: make(chan chan []*types.Block), + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), @@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // start sync handlers go pm.syncer() go pm.fetcher() + go pm.txsyncLoop() } func (pm *ProtocolManager) Stop() { @@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() { pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits the sync handler + close(pm.quitSync) // quits syncer, fetcher, txsyncLoop // Wait for any process action pm.wg.Wait() @@ -150,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake, short circuit if fails + // Execute the Ethereum handshake. if err := p.handleStatus(); err != nil { return err } - // Register the peer locally and in the downloader too + + // Register the peer locally. glog.V(logger.Detail).Infoln("Adding peer", p.id) if err := pm.peers.Register(p); err != nil { glog.V(logger.Error).Infoln("Addition failed:", err) @@ -162,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) + // Register the peer in the downloader. If the downloader + // considers it banned, we disconnect. if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // propagate existing transactions. new transactions appearing + + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. - if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { - return err - } + pm.syncTransactions(p) + // main loop. handle incoming messages. 
for { if err := pm.handleMsg(p); err != nil { diff --git a/eth/sync.go b/eth/sync.go index 8e4e3cfbe..a25d4d4fd 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "math/rand" "sync/atomic" "time" @@ -10,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -20,6 +22,10 @@ const ( notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 + + // This is the target size for the packs of transactions sent by txsyncLoop. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 ) // blockAnnounce is the hash notification of the availability of a new block in @@ -30,6 +36,94 @@ type blockAnnounce struct { time time.Time } +type txsync struct { + p *peer + txs []*types.Transaction +} + +// syncTransactions starts sending all currently pending transactions to the given peer. +func (pm *ProtocolManager) syncTransactions(p *peer) { + txs := pm.txpool.GetTransactions() + if len(txs) == 0 { + return + } + select { + case pm.txsyncCh <- &txsync{p, txs}: + case <-pm.quitSync: + } +} + +// txsyncLoop takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (pm *ProtocolManager) txsyncLoop() { + var ( + pending = make(map[discover.NodeID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size) + sending = true + go func() { done <- pack.p.sendTransactions(pack.txs) }() + } + + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.txsyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // fetcher is responsible for collecting hash notifications, and periodically // checking all unknown ones and individually fetching them. 
func (pm *ProtocolManager) fetcher() { -- cgit v1.2.3 From 2c24a73e253a0b0c205406c7dc9fc4b8a7d97e86 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:10:19 +0200 Subject: eth: add protocol tests The protocol tests were commented out when eth/downloader was introduced. --- eth/protocol_test.go | 526 +++++++++++++++++++-------------------------------- 1 file changed, 190 insertions(+), 336 deletions(-) (limited to 'eth') diff --git a/eth/protocol_test.go b/eth/protocol_test.go index d44f66b89..bbea9fc11 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -1,388 +1,242 @@ package eth -/* -TODO All of these tests need to be re-written - -var logsys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel)) - -var ini = false - -func logInit() { - if !ini { - ethlogger.AddLogSystem(logsys) - ini = true - } -} - -type testTxPool struct { - getTransactions func() []*types.Transaction - addTransactions func(txs []*types.Transaction) -} - -type testChainManager struct { - getBlockHashes func(hash common.Hash, amount uint64) (hashes []common.Hash) - getBlock func(hash common.Hash) *types.Block - status func() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) -} - -type testBlockPool struct { - addBlockHashes func(next func() (common.Hash, bool), peerId string) - addBlock func(block *types.Block, peerId string) (err error) - addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) - removePeer func(peerId string) -} - -func (self *testTxPool) AddTransactions(txs []*types.Transaction) { - if self.addTransactions != nil { - self.addTransactions(txs) - } -} - -func (self *testTxPool) GetTransactions() types.Transactions { return nil } - -func (self *testChainManager) GetBlockHashesFromHash(hash common.Hash, amount uint64) (hashes []common.Hash) { - if self.getBlockHashes != nil { - hashes = self.getBlockHashes(hash, amount) - } - return -} - -func (self *testChainManager) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) { - if self.status != nil { - td, currentBlock, genesisBlock = self.status() - } else { - td = common.Big1 - currentBlock = common.Hash{1} - genesisBlock = common.Hash{2} - } - return -} - -func (self *testChainManager) GetBlock(hash common.Hash) (block *types.Block) { - if self.getBlock != nil { - block = self.getBlock(hash) - } - return -} - -func (self *testBlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) { - if self.addBlockHashes != nil { - self.addBlockHashes(next, peerId) - } -} - -func (self *testBlockPool) AddBlock(block *types.Block, peerId string) { - if self.addBlock != nil { - self.addBlock(block, peerId) - } -} - -func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) { - if self.addPeer != nil { - best, suspended = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError) - } - return -} - -func (self *testBlockPool) RemovePeer(peerId string) { - if self.removePeer != nil { - self.removePeer(peerId) - } -} - -func testPeer() *p2p.Peer { - var id discover.NodeID - pk := crypto.GenerateNewKeyPair().PublicKey - copy(id[:], pk) - return p2p.NewPeer(id, "test peer", []p2p.Cap{}) -} - -type 
ethProtocolTester struct { - p2p.MsgReadWriter // writing to the tester feeds the protocol - - quit chan error - pipe *p2p.MsgPipeRW // the protocol read/writes on this end - txPool *testTxPool // txPool - chainManager *testChainManager // chainManager - blockPool *testBlockPool // blockPool - t *testing.T -} - -func newEth(t *testing.T) *ethProtocolTester { - p1, p2 := p2p.MsgPipe() - return ðProtocolTester{ - MsgReadWriter: p1, - quit: make(chan error, 1), - pipe: p2, - txPool: &testTxPool{}, - chainManager: &testChainManager{}, - blockPool: &testBlockPool{}, - t: t, - } -} - -func (self *ethProtocolTester) reset() { - self.pipe.Close() - - p1, p2 := p2p.MsgPipe() - self.MsgReadWriter = p1 - self.pipe = p2 - self.quit = make(chan error, 1) -} - -func (self *ethProtocolTester) checkError(expCode int, delay time.Duration) (err error) { - var timer = time.After(delay) - select { - case err = <-self.quit: - case <-timer: - self.t.Errorf("no error after %v, expected %v", delay, expCode) - return - } - perr, ok := err.(*errs.Error) - if ok && perr != nil { - if code := perr.Code; code != expCode { - self.t.Errorf("expected protocol error (code %v), got %v (%v)", expCode, code, err) - } - } else { - self.t.Errorf("expected protocol error (code %v), got %v", expCode, err) - } - return -} - -func (self *ethProtocolTester) run() { - err := runEthProtocol(ProtocolVersion, NetworkId, self.txPool, self.chainManager, self.blockPool, testPeer(), self.pipe) - self.quit <- err -} - -func (self *ethProtocolTester) handshake(t *testing.T, mock bool) { - td, currentBlock, genesis := self.chainManager.Status() - // first outgoing msg should be StatusMsg. - err := p2p.ExpectMsg(self, StatusMsg, &statusMsgData{ - ProtocolVersion: ProtocolVersion, - NetworkId: NetworkId, - TD: td, - CurrentBlock: currentBlock, - GenesisBlock: genesis, - }) - if err != nil { - t.Fatalf("incorrect outgoing status: %v", err) - } - if mock { - go p2p.Send(self, StatusMsg, &statusMsgData{ProtocolVersion, NetworkId, td, currentBlock, genesis}) - } -} +import ( + "crypto/rand" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +func init() { + // glog.SetToStderr(true) + // glog.SetV(6) +} + +var testAccount = crypto.NewKey(rand.Reader) func TestStatusMsgErrors(t *testing.T) { - logInit() - eth := newEth(t) - go eth.run() - td, currentBlock, genesis := eth.chainManager.Status() + pm := newProtocolManagerForTesting(nil) + td, currentBlock, genesis := pm.chainman.Status() + defer pm.Stop() tests := []struct { - code uint64 - data interface{} - wantErrorCode int + code uint64 + data interface{} + wantError error }{ { code: TxMsg, data: []interface{}{}, - wantErrorCode: ErrNoStatusMsg, + wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), }, { code: StatusMsg, data: statusMsgData{10, NetworkId, td, currentBlock, genesis}, - wantErrorCode: ErrProtocolVersionMismatch, + wantError: errResp(ErrProtocolVersionMismatch, "10 (!= 0)"), }, { code: StatusMsg, data: statusMsgData{ProtocolVersion, 999, td, currentBlock, genesis}, - wantErrorCode: ErrNetworkIdMismatch, + wantError: errResp(ErrNetworkIdMismatch, "999 (!= 0)"), }, { code: StatusMsg, 
data: statusMsgData{ProtocolVersion, NetworkId, td, currentBlock, common.Hash{3}}, - wantErrorCode: ErrGenesisBlockMismatch, + wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000000000000000000000000000000000000000000000000000 (!= %x)", genesis), }, } - for _, test := range tests { - eth.handshake(t, false) - // the send call might hang until reset because + + for i, test := range tests { + p, errc := newTestPeer(pm) + // The send call might hang until reset because // the protocol might not read the payload. - go p2p.Send(eth, test.code, test.data) - eth.checkError(test.wantErrorCode, 1*time.Second) + go p2p.Send(p, test.code, test.data) - eth.reset() - go eth.run() + select { + case err := <-errc: + if err == nil { + t.Errorf("test %d: protocol returned nil error, want %q", test.wantError) + } else if err.Error() != test.wantError.Error() { + t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.wantError) + } + case <-time.After(2 * time.Second): + t.Errorf("protocol did not shut down withing 2 seconds") + } + p.close() } } -func TestNewBlockMsg(t *testing.T) { - // logInit() - eth := newEth(t) - - var disconnected bool - eth.blockPool.removePeer = func(peerId string) { - disconnected = true - } - - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) - } - - var tds = make(chan *big.Int) - eth.blockPool.addPeer = func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) { - tds <- td - return - } - - var delay = 1 * time.Second - // eth.reset() - block := types.NewBlock(common.Hash{1}, common.Address{1}, common.Hash{1}, common.Big1, 1, []byte("extra")) - - go p2p.Send(eth, NewBlockMsg, &newBlockMsgData{Block: block}) - timer := time.After(delay) +// This test checks that received transactions are added to the local pool. 
+func TestRecvTransactions(t *testing.T) { + txAdded := make(chan []*types.Transaction) + pm := newProtocolManagerForTesting(txAdded) + p, _ := newTestPeer(pm) + defer pm.Stop() + defer p.close() + p.handshake(t) - select { - case td := <-tds: - if td.Cmp(common.Big0) != 0 { - t.Errorf("incorrect td %v, expected %v", td, common.Big0) - } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return + tx := newtx(testAccount, 0, 0) + if err := p2p.Send(p, TxMsg, []interface{}{tx}); err != nil { + t.Fatalf("send error: %v", err) } - - go p2p.Send(eth, NewBlockMsg, &newBlockMsgData{block, common.Big2}) - timer = time.After(delay) - select { - case td := <-tds: - if td.Cmp(common.Big2) != 0 { - t.Errorf("incorrect td %v, expected %v", td, common.Big2) + case added := <-txAdded: + if len(added) != 1 { + t.Errorf("wrong number of added transactions: got %d, want 1", len(added)) + } else if added[0].Hash() != tx.Hash() { + t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return + case <-time.After(2 * time.Second): + t.Errorf("no TxPreEvent received within 2 seconds") } - - go p2p.Send(eth, NewBlockMsg, []interface{}{}) - // Block.DecodeRLP: validation failed: header is nil - eth.checkError(ErrDecode, delay) - } -func TestBlockMsg(t *testing.T) { - // logInit() - eth := newEth(t) - blocks := make(chan *types.Block) - eth.blockPool.addBlock = func(block *types.Block, peerId string) (err error) { - blocks <- block - return - } +// This test checks that pending transactions are sent. +func TestSendTransactions(t *testing.T) { + pm := newProtocolManagerForTesting(nil) + defer pm.Stop() - var disconnected bool - eth.blockPool.removePeer = func(peerId string) { - disconnected = true + // Fill the pool with big transactions. + const txsize = txsyncPackSize / 10 + alltxs := make([]*types.Transaction, 100) + for nonce := range alltxs { + alltxs[nonce] = newtx(testAccount, uint64(nonce), txsize) } + pm.txpool.AddTransactions(alltxs) - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) - } - - var delay = 3 * time.Second - // eth.reset() - newblock := func(i int64) *types.Block { - return types.NewBlock(common.Hash{byte(i)}, common.Address{byte(i)}, common.Hash{byte(i)}, big.NewInt(i), uint64(i), []byte{byte(i)}) - } - b := newblock(0) - b.Header().Difficulty = nil // check if nil as *big.Int decodes as 0 - go p2p.Send(eth, BlocksMsg, types.Blocks{b, newblock(1), newblock(2)}) - timer := time.After(delay) - for i := int64(0); i < 3; i++ { - select { - case block := <-blocks: - if (block.ParentHash() != common.Hash{byte(i)}) { - t.Errorf("incorrect block %v, expected %v", block.ParentHash(), common.Hash{byte(i)}) + // Connect several peers. They should all receive the pending transactions. 
+ var wg sync.WaitGroup + checktxs := func(p *testPeer) { + defer wg.Done() + defer p.close() + seen := make(map[common.Hash]bool) + for _, tx := range alltxs { + seen[tx.Hash()] = false + } + for n := 0; n < len(alltxs) && !t.Failed(); { + var txs []*types.Transaction + msg, err := p.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != TxMsg { + t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&txs); err != nil { + t.Errorf("%v: %v", p.Peer, err) } - if block.Difficulty().Cmp(big.NewInt(i)) != 0 { - t.Errorf("incorrect block %v, expected %v", block.Difficulty(), big.NewInt(i)) + for _, tx := range txs { + hash := tx.Hash() + seentx, want := seen[hash] + if seentx { + t.Errorf("%v: got tx more than once: %x", p.Peer, hash) + } + if !want { + t.Errorf("%v: got unexpected tx: %x", p.Peer, hash) + } + seen[hash] = true + n++ } - case <-timer: - t.Errorf("no td recorded after %v", delay) - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return } } - - go p2p.Send(eth, BlocksMsg, []interface{}{[]interface{}{}}) - eth.checkError(ErrDecode, delay) - if !disconnected { - t.Errorf("peer not disconnected after error") - } - - // test empty transaction - eth.reset() - go eth.run() - eth.handshake(t, true) - err = p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) + for i := 0; i < 3; i++ { + p, _ := newTestPeer(pm) + p.handshake(t) + wg.Add(1) + go checktxs(p) } - b = newblock(0) - b.AddTransaction(nil) - go p2p.Send(eth, BlocksMsg, types.Blocks{b}) - eth.checkError(ErrDecode, delay) + wg.Wait() +} +// testPeer wraps all peer-related data for tests. +type testPeer struct { + p2p.MsgReadWriter // writing to the test peer feeds the protocol + pipe *p2p.MsgPipeRW // the protocol read/writes on this end + pm *ProtocolManager + *peer } -func TestTransactionsMsg(t *testing.T) { - logInit() - eth := newEth(t) - txs := make(chan *types.Transaction) +func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *ProtocolManager { + var ( + em = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em) + txpool = &fakeTxPool{added: txAdded} + dl = downloader.New(em, chain.HasBlock, chain.GetBlock) + pm = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain, dl) + ) + pm.Start() + return pm +} - eth.txPool.addTransactions = func(t []*types.Transaction) { - for _, tx := range t { - txs <- tx - } +func newTestPeer(pm *ProtocolManager) (*testPeer, <-chan error) { + var id discover.NodeID + rand.Read(id[:]) + rw1, rw2 := p2p.MsgPipe() + peer := pm.newPeer(pm.protVer, pm.netId, p2p.NewPeer(id, "test peer", nil), rw2) + errc := make(chan error, 1) + go func() { + pm.newPeerCh <- peer + errc <- pm.handle(peer) + }() + return &testPeer{rw1, rw2, pm, peer}, errc +} + +func (p *testPeer) handshake(t *testing.T) { + td, currentBlock, genesis := p.pm.chainman.Status() + msg := &statusMsgData{ + ProtocolVersion: uint32(p.pm.protVer), + NetworkId: uint32(p.pm.netId), + TD: td, + CurrentBlock: currentBlock, + GenesisBlock: genesis, } - go eth.run() - - eth.handshake(t, true) - err := p2p.ExpectMsg(eth, TxMsg, []interface{}{}) - if err != nil { - t.Errorf("transactions expected, got %v", err) + if err := p2p.ExpectMsg(p, StatusMsg, msg); err != nil { + t.Fatalf("status recv: %v", err) } + if err := p2p.Send(p, StatusMsg, msg); err != nil { + t.Fatalf("status send: 
%v", err) + } +} - var delay = 3 * time.Second - tx := &types.Transaction{} +func (p *testPeer) close() { + p.pipe.Close() +} - go p2p.Send(eth, TxMsg, []interface{}{tx, tx}) - timer := time.After(delay) - for i := int64(0); i < 2; i++ { - select { - case <-txs: - case <-timer: - return - case err := <-eth.quit: - t.Errorf("no error expected, got %v", err) - return - } +type fakeTxPool struct { + // all transactions are collected. + mu sync.Mutex + all []*types.Transaction + // if added is non-nil, it receives added transactions. + added chan<- []*types.Transaction +} + +func (pool *fakeTxPool) AddTransactions(txs []*types.Transaction) { + pool.mu.Lock() + defer pool.mu.Unlock() + pool.all = append(pool.all, txs...) + if pool.added != nil { + pool.added <- txs } +} - go p2p.Send(eth, TxMsg, []interface{}{[]interface{}{}}) - eth.checkError(ErrDecode, delay) +func (pool *fakeTxPool) GetTransactions() types.Transactions { + pool.mu.Lock() + defer pool.mu.Unlock() + txs := make([]*types.Transaction, len(pool.all)) + copy(txs, pool.all) + return types.Transactions(txs) +} +func newtx(from *crypto.Key, nonce uint64, datasize int) *types.Transaction { + data := make([]byte, datasize) + tx := types.NewTransactionMessage(common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), data) + tx.SetNonce(nonce) + return tx } -*/ -- cgit v1.2.3 From 8dc3048f6556e3fb2f719f383834332656b4c8fe Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 13:13:21 +0200 Subject: eth/downloader: fix hash fetch timeout handling Fixes #1206 --- eth/downloader/downloader.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 92cb1a650..29b627771 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -263,23 +263,29 @@ func (d *Downloader) Cancel() bool { // XXX Make synchronous func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { - glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) - - start := time.Now() - - // Add the hash to the queue first, and start hash retrieval - d.queue.Insert([]common.Hash{h}) - p.getHashes(h) - var ( + start = time.Now() active = p // active peer will help determine the current active peer head = common.Hash{} // common and last hash - timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer + timeout = time.NewTimer(0) // timer to dump a non-responsive active peer attempted = make(map[string]bool) // attempted peers will help with retries crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) defer crossTicker.Stop() + defer timeout.Stop() + + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) + <-timeout.C // timeout channel should be initially empty. + + getHashes := func(from common.Hash) { + active.getHashes(from) + timeout.Reset(hashTTL) + } + + // Add the hash to the queue, and start hash retrieval. 
+ d.queue.Insert([]common.Hash{h}) + getHashes(h) attempted[p.id] = true for finished := false; !finished; { @@ -293,7 +299,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } - timeout.Reset(hashTTL) + timeout.Stop() // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { @@ -345,7 +351,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { active.getBlocks([]common.Hash{origin}) // Also fetch a fresh - active.getHashes(head) + getHashes(head) continue } // We're done, prepare the download cache and proceed pulling the blocks @@ -399,7 +405,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. active = p - p.getHashes(head) + getHashes(head) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) } } -- cgit v1.2.3 From 73c355591fe0279334675c555b6d614aa25b6781 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 17:03:07 +0200 Subject: core, eth: document that result of GetTransactions is modifiable --- eth/protocol.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/protocol.go b/eth/protocol.go index 9ccf2cb60..57805d9bd 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -57,10 +57,12 @@ var errorToString = map[int]string{ ErrSuspendedPeer: "Suspended peer", } -// backend is the interface the ethereum protocol backend should implement -// used as an argument to EthProtocol type txPool interface { + // AddTransactions should add the given transactions to the pool. AddTransactions([]*types.Transaction) + + // GetTransactions should return pending transactions. + // The slice should be modifiable by the caller. GetTransactions() types.Transactions } -- cgit v1.2.3
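[Editorial sketch, appended after the series; not part of any patch above.] The last patch documents that the slice returned by GetTransactions must be modifiable by the caller, which is what lets txsyncLoop reslice and truncate it while building packs. One way an implementation can honour that contract is to hand out a defensive copy, as sketched below; the pool and transaction types here are assumptions, not the real core.TxPool.

package main

import (
	"fmt"
	"sync"
)

type transaction struct{ nonce uint64 }

type pool struct {
	mu      sync.Mutex
	pending []*transaction
}

// GetTransactions returns the pending transactions. The returned slice is a
// fresh copy, so the caller owns it and may modify it without racing against
// or corrupting the pool's internal state.
func (p *pool) GetTransactions() []*transaction {
	p.mu.Lock()
	defer p.mu.Unlock()
	txs := make([]*transaction, len(p.pending))
	copy(txs, p.pending)
	return txs
}

func main() {
	p := &pool{pending: []*transaction{{0}, {1}, {2}}}
	txs := p.GetTransactions()
	txs = txs[:1] // caller mutates its copy, e.g. while assembling a send pack
	fmt.Println(len(txs), len(p.GetTransactions())) // prints "1 3": the pool is unaffected
}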