From 63c6cedb14cbd461733e33ffac016dc7d8e431ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 14:06:36 +0300 Subject: eth/downloader: cap the hash ban set, add test for it --- eth/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index aea33452c..37bbd3691 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -213,8 +213,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "->msg %v: %v", msg, err) } - if request.Amount > downloader.MaxHashFetch { - request.Amount = downloader.MaxHashFetch + if request.Amount > uint64(downloader.MaxHashFetch) { + request.Amount = uint64(downloader.MaxHashFetch) } hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) -- cgit v1.2.3 From 6f415b96b3b8581e810a8f40f596d2d213681e54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 18:46:07 +0300 Subject: eth: implement the NewBlockHashes protocol proposal --- eth/handler.go | 180 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 118 insertions(+), 62 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index aea33452c..63ebc4bdd 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -2,6 +2,7 @@ package eth import ( "fmt" + "math" "math/big" "sync" "time" @@ -20,6 +21,7 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 ) @@ -186,7 +188,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() switch msg.Code { - case GetTxMsg: // ignore case StatusMsg: return errResp(ErrExtraStatusMsg, "uncontrolled status message") @@ -227,6 +228,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // returns either requested hashes or nothing (i.e. not found) return p.sendBlockHashes(hashes) + case BlockHashesMsg: msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -266,6 +268,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } return p.sendBlocks(blocks) + case BlocksMsg: var blocks []*types.Block @@ -274,7 +277,57 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - self.downloader.DeliverBlocks(p.id, blocks) + + // Either deliver to the downloader or the importer + if self.downloader.Synchronising() { + self.downloader.DeliverBlocks(p.id, blocks) + } else { + for _, block := range blocks { + if err := self.importBlock(p, block, nil); err != nil { + return err + } + } + } + + case NewBlockHashesMsg: + // Retrieve and deseralize the remote new block hashes notification + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var hashes []common.Hash + if err := msgStream.Decode(&hashes); err != nil { + break + } + // Mark the hashes as present at the remote node + for _, hash := range hashes { + p.blockHashes.Add(hash) + p.recentHash = hash + } + // Wait a bit for potentially receiving the blocks, fetch if not + go func() { + time.Sleep(blockArrivalTimeout) + + // Drop all the hashes that are already known + unknown := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !self.chainman.HasBlock(hash) { + unknown = append(unknown, hash) + } + } + if len(unknown) == 0 { + return + } + // Retrieve all the unknown hashes + if err := p.requestBlocks(unknown); err != nil { + glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err) + } + if glog.V(logger.Detail) { + hashes := make([]string, len(unknown)) + for i, hash := range unknown { + hashes[i] = fmt.Sprintf("%x", hash[:4]) + } + glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes) + } + }() case NewBlockMsg: var request newBlockMsgData @@ -286,83 +339,86 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } request.Block.ReceivedAt = msg.ReceivedAt - hash := request.Block.Hash() - // Add the block hash as a known hash to the peer. This will later be used to determine - // who should receive this. - p.blockHashes.Add(hash) - // update the peer info - p.recentHash = hash - p.td = request.TD - - _, chainHead, _ := self.chainman.Status() - - jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ - BlockHash: hash.Hex(), - BlockNumber: request.Block.Number(), // this surely must be zero - ChainHeadHash: chainHead.Hex(), - BlockPrevHash: request.Block.ParentHash().Hex(), - RemoteId: p.ID().String(), - }) - - // Make sure the block isn't already known. If this is the case simply drop - // the message and move on. If the TD is < currentTd; drop it as well. If this - // chain at some point becomes canonical, the downloader will fetch it. - if self.chainman.HasBlock(hash) { - break - } - if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 { - glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD) - break + if err := self.importBlock(p, request.Block, request.TD); err != nil { + return err } - // Attempt to insert the newly received by checking if the parent exists. - // if the parent exists we process the block and propagate to our peers - // otherwise synchronize with the peer - if self.chainman.HasBlock(request.Block.ParentHash()) { - if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { - glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") - - self.removePeer(p.id) - - return nil - } - - if err := self.verifyTd(p, request); err != nil { - glog.V(logger.Error).Infoln(err) - // XXX for now return nil so it won't disconnect (we should in the future) - return nil - } - self.BroadcastBlock(hash, request.Block) - } else { - go self.synchronise(p) - } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } return nil } -func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { - if request.Block.Td.Cmp(request.TD) != 0 { - glog.V(logger.Detail).Infoln(peer) +// importBlocks injects a new block retrieved from the given peer into the chain +// manager. +func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) error { + hash := block.Hash() - return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD) + // Mark the block as present at the remote node (don't duplicate already held data) + p.blockHashes.Add(hash) + p.recentHash = hash + if td != nil { + p.td = td + } + // Log the block's arrival + _, chainHead, _ := pm.chainman.Status() + jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ + BlockHash: hash.Hex(), + BlockNumber: block.Number(), // this surely must be zero + ChainHeadHash: chainHead.Hex(), + BlockPrevHash: block.ParentHash().Hex(), + RemoteId: p.ID().String(), + }) + // If the block's already known or its difficulty is lower than ours, drop + if pm.chainman.HasBlock(hash) { + p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value + return nil + } + if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 { + glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, block.Number(), td) + return nil + } + // Attempt to insert the newly received block and propagate to our peers + if pm.chainman.HasBlock(block.ParentHash()) { + if _, err := pm.chainman.InsertChain(types.Blocks{block}); err != nil { + glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error", err) + return err + } + if td != nil && block.Td.Cmp(td) != 0 { + err := fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", block.Number(), p.id, block.Td, td) + glog.V(logger.Error).Infoln(err) + return err + } + pm.BroadcastBlock(hash, block) + return nil + } + // Parent of the block is unknown, try to sync with this peer if it seems to be good + if td != nil { + go pm.synchronise(p) } - return nil } -// BroadcastBlock will propagate the block to its connected peers. It will sort -// out which peers do not contain the block in their block set and will do a -// sqrt(peers) to determine the amount of peers we broadcast to. +// BroadcastBlock will propagate the block to a subset of its connected peers, +// only notifying the rest of the block's appearance. func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { - // Broadcast block to a batch of peers not knowing about it + // Retrieve all the target peers and split between full broadcast or only notification peers := pm.peers.PeersWithoutBlock(hash) - //peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { + split := int(math.Sqrt(float64(len(peers)))) + + transfer := peers[:split] + nofity := peers[split:] + + // Send out the data transfers and the notifications + for _, peer := range nofity { + peer.sendNewBlockHashes([]common.Hash{hash}) + } + glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.") + + for _, peer := range transfer { peer.sendNewBlock(block) } - glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt)) + glog.V(logger.Detail).Infoln("broadcast block to", len(transfer), "peers. Total processing time:", time.Since(block.ReceivedAt)) } // BroadcastTx will propagate the block to its connected peers. It will sort -- cgit v1.2.3 From fdccce781e94819ec9dc13ef6540a33efd3b26c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 19:24:56 +0300 Subject: eth: fetch announced hashes from origin, periodically --- eth/handler.go | 54 +++++++++++++++++++++++++----------------------------- 1 file changed, 25 insertions(+), 29 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 63ebc4bdd..7e9ec593a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -21,7 +21,8 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 ) @@ -57,6 +58,7 @@ type ProtocolManager struct { minedBlockSub event.Subscription newPeerCh chan *peer + newHashCh chan []*blockAnnounce quitSync chan struct{} // wait group is used for graceful shutdowns during downloading // and processing @@ -74,6 +76,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo downloader: downloader, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), + newHashCh: make(chan []*blockAnnounce, 1), quitSync: make(chan struct{}), } @@ -121,7 +124,8 @@ func (pm *ProtocolManager) Start() { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - go pm.update() + go pm.syncer() + go pm.fetcher() } func (pm *ProtocolManager) Stop() { @@ -302,32 +306,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error { p.blockHashes.Add(hash) p.recentHash = hash } - // Wait a bit for potentially receiving the blocks, fetch if not - go func() { - time.Sleep(blockArrivalTimeout) - - // Drop all the hashes that are already known - unknown := make([]common.Hash, 0, len(hashes)) - for _, hash := range hashes { - if !self.chainman.HasBlock(hash) { - unknown = append(unknown, hash) - } - } - if len(unknown) == 0 { - return - } - // Retrieve all the unknown hashes - if err := p.requestBlocks(unknown); err != nil { - glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err) + // Schedule all the unknown hashes for retrieval + unknown := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !self.chainman.HasBlock(hash) { + unknown = append(unknown, hash) } - if glog.V(logger.Detail) { - hashes := make([]string, len(unknown)) - for i, hash := range unknown { - hashes[i] = fmt.Sprintf("%x", hash[:4]) - } - glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes) + } + announces := make([]*blockAnnounce, len(unknown)) + for i, hash := range unknown { + announces[i] = &blockAnnounce{ + hash: hash, + peer: p, + time: time.Now(), } - }() + } + if len(announces) > 0 { + self.newHashCh <- announces + } case NewBlockMsg: var request newBlockMsgData @@ -407,13 +403,13 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) split := int(math.Sqrt(float64(len(peers)))) transfer := peers[:split] - nofity := peers[split:] + notify := peers[split:] // Send out the data transfers and the notifications - for _, peer := range nofity { + for _, peer := range notify { peer.sendNewBlockHashes([]common.Hash{hash}) } - glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.") + glog.V(logger.Detail).Infoln("broadcast hash to", len(notify), "peers.") for _, peer := range transfer { peer.sendNewBlock(block) -- cgit v1.2.3 From 9ed166c196b07047299579e5ea2b6ece26aec5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 20:38:39 +0300 Subject: eth: split and handle explicitly vs. download requested blocks --- eth/handler.go | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 7e9ec593a..acc16812a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,15 +18,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockProcAmount = 256 -) - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -57,9 +48,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - quitSync chan struct{} + newPeerCh chan *peer + newHashCh chan []*blockAnnounce + newBlockCh chan chan []*types.Block + quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), + newBlockCh: make(chan chan []*types.Block), quitSync: make(chan struct{}), } @@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return p.sendBlocks(blocks) case BlocksMsg: - var blocks []*types.Block - + // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - - // Either deliver to the downloader or the importer - if self.downloader.Synchronising() { - self.downloader.DeliverBlocks(p.id, blocks) - } else { - for _, block := range blocks { - if err := self.importBlock(p, block, nil); err != nil { - return err + // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) + filter := make(chan []*types.Block) + select { + case <-self.quitSync: + case self.newBlockCh <- filter: + select { + case <-self.quitSync: + case filter <- blocks: + select { + case <-self.quitSync: + case blocks := <-filter: + self.downloader.DeliverBlocks(p.id, blocks) } } } @@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } if len(announces) > 0 { - self.newHashCh <- announces + select { + case self.newHashCh <- announces: + case <-self.quitSync: + } } case NewBlockMsg: -- cgit v1.2.3 From 8216bb901c9fbdcde427cc42ca7e82ec3ee2e8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 00:37:10 +0300 Subject: eth: clean up pending announce download map, polish logs --- eth/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index acc16812a..15381b447 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -362,7 +362,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) _, chainHead, _ := pm.chainman.Status() jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ BlockHash: hash.Hex(), - BlockNumber: block.Number(), // this surely must be zero + BlockNumber: block.Number(), ChainHeadHash: chainHead.Hex(), BlockPrevHash: block.ParentHash().Hex(), RemoteId: p.ID().String(), -- cgit v1.2.3 From 44147d057dd91d8b35dd6f4ed025bdb4baf225eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:27:44 +0300 Subject: eth: fix data race accessing peer.recentHash --- eth/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 64f89b273..847e7a0e8 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -157,7 +157,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) - if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } // propagate existing transactions. new transactions appearing @@ -303,7 +303,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Mark the hashes as present at the remote node for _, hash := range hashes { p.blockHashes.Add(hash) - p.recentHash = hash + p.SetHead(hash) } // Schedule all the unknown hashes for retrieval unknown := make([]common.Hash, 0, len(hashes)) @@ -354,7 +354,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) // Mark the block as present at the remote node (don't duplicate already held data) p.blockHashes.Add(hash) - p.recentHash = hash + p.SetHead(hash) if td != nil { p.td = td } -- cgit v1.2.3 From f86707713c42da02b70a3a389684e19e902d8759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:56:27 +0300 Subject: eth: fix data race accessing peer.td --- eth/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 847e7a0e8..f2027c3c6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -356,7 +356,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) p.blockHashes.Add(hash) p.SetHead(hash) if td != nil { - p.td = td + p.SetTd(td) } // Log the block's arrival _, chainHead, _ := pm.chainman.Status() @@ -369,7 +369,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) }) // If the block's already known or its difficulty is lower than ours, drop if pm.chainman.HasBlock(hash) { - p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value + p.SetTd(pm.chainman.GetBlock(hash).Td) // update the peer's TD to the real value return nil } if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 { -- cgit v1.2.3 From 41b2008a669a8454ae19f783eb2dcd967e8752cf Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:00:41 +0200 Subject: eth: limit number of sent blocks based on message size If blocks get larger, sending 256 at once can make messages large enough to exceed the low-level write timeout. --- eth/handler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index f2027c3c6..a67d956fb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +// This is the target maximum size of returned blocks for the +// getBlocks message. The reply message may exceed it +// if a single block is larger than the limit. +const maxBlockRespSize = 2 * 1024 * 1024 + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -246,7 +251,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := msgStream.List(); err != nil { return err } - var i int + var ( + i int + totalsize common.StorageSize + ) for { i++ var hash common.Hash @@ -260,8 +268,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { block := self.chainman.GetBlock(hash) if block != nil { blocks = append(blocks, block) + totalsize += block.Size() } - if i == downloader.MaxBlockFetch { + if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize { break } } -- cgit v1.2.3 From 6c73a5980640581903d8f56b3912b22641d5195c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:03:14 +0200 Subject: eth: limit number of sent transactions based on message size Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth. --- eth/handler.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index a67d956fb..f002727f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -53,9 +53,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer newHashCh chan []*blockAnnounce newBlockCh chan chan []*types.Block + txsyncCh chan *txsync quitSync chan struct{} // wait group is used for graceful shutdowns during downloading @@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), newBlockCh: make(chan chan []*types.Block), + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), @@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // start sync handlers go pm.syncer() go pm.fetcher() + go pm.txsyncLoop() } func (pm *ProtocolManager) Stop() { @@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() { pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits the sync handler + close(pm.quitSync) // quits syncer, fetcher, txsyncLoop // Wait for any process action pm.wg.Wait() @@ -150,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake, short circuit if fails + // Execute the Ethereum handshake. if err := p.handleStatus(); err != nil { return err } - // Register the peer locally and in the downloader too + + // Register the peer locally. glog.V(logger.Detail).Infoln("Adding peer", p.id) if err := pm.peers.Register(p); err != nil { glog.V(logger.Error).Infoln("Addition failed:", err) @@ -162,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) + // Register the peer in the downloader. If the downloader + // considers it banned, we disconnect. if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // propagate existing transactions. new transactions appearing + + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. - if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { - return err - } + pm.syncTransactions(p) + // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { -- cgit v1.2.3