From 41b2008a669a8454ae19f783eb2dcd967e8752cf Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:00:41 +0200 Subject: eth: limit number of sent blocks based on message size If blocks get larger, sending 256 at once can make messages large enough to exceed the low-level write timeout. --- eth/handler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index f2027c3c6..a67d956fb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +// This is the target maximum size of returned blocks for the +// getBlocks message. The reply message may exceed it +// if a single block is larger than the limit. +const maxBlockRespSize = 2 * 1024 * 1024 + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -246,7 +251,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := msgStream.List(); err != nil { return err } - var i int + var ( + i int + totalsize common.StorageSize + ) for { i++ var hash common.Hash @@ -260,8 +268,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { block := self.chainman.GetBlock(hash) if block != nil { blocks = append(blocks, block) + totalsize += block.Size() } - if i == downloader.MaxBlockFetch { + if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize { break } } -- cgit v1.2.3 From 6c73a5980640581903d8f56b3912b22641d5195c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:03:14 +0200 Subject: eth: limit number of sent transactions based on message size Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth. --- eth/handler.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index a67d956fb..f002727f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -53,9 +53,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer newHashCh chan []*blockAnnounce newBlockCh chan chan []*types.Block + txsyncCh chan *txsync quitSync chan struct{} // wait group is used for graceful shutdowns during downloading @@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), newBlockCh: make(chan chan []*types.Block), + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), @@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // start sync handlers go pm.syncer() go pm.fetcher() + go pm.txsyncLoop() } func (pm *ProtocolManager) Stop() { @@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() { pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits the sync handler + close(pm.quitSync) // quits syncer, fetcher, txsyncLoop // Wait for any process action pm.wg.Wait() @@ -150,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake, short circuit if fails + // Execute the Ethereum handshake. if err := p.handleStatus(); err != nil { return err } - // Register the peer locally and in the downloader too + + // Register the peer locally. glog.V(logger.Detail).Infoln("Adding peer", p.id) if err := pm.peers.Register(p); err != nil { glog.V(logger.Error).Infoln("Addition failed:", err) @@ -162,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) + // Register the peer in the downloader. If the downloader + // considers it banned, we disconnect. if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // propagate existing transactions. new transactions appearing + + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. - if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { - return err - } + pm.syncTransactions(p) + // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { -- cgit v1.2.3