diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-10 00:51:09 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-10 00:51:09 +0800 |
commit | 5f341e5db5c2c3c14c9e076959a84e05c6d917f4 (patch) | |
tree | 73e61b8040ae81de2a5108d54dd93bf8dba581ae /eth/handler.go | |
parent | fda49f2b5216c7d5655d19efe54651593874adf8 (diff) | |
parent | 73c355591fe0279334675c555b6d614aa25b6781 (diff) | |
download | dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar.gz dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar.bz2 dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar.lz dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar.xz dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.tar.zst dexon-5f341e5db5c2c3c14c9e076959a84e05c6d917f4.zip |
Merge pull request #1212 from fjl/p2p-eth-block-timeout
eth, p2p: improve write timeouts and behaviour under load
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/eth/handler.go b/eth/handler.go index f2027c3c6..f002727f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +// This is the target maximum size of returned blocks for the +// getBlocks message. The reply message may exceed it +// if a single block is larger than the limit. +const maxBlockRespSize = 2 * 1024 * 1024 + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -48,9 +53,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer newHashCh chan []*blockAnnounce newBlockCh chan chan []*types.Block + txsyncCh chan *txsync quitSync chan struct{} // wait group is used for graceful shutdowns during downloading @@ -71,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), newBlockCh: make(chan chan []*types.Block), + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), @@ -113,13 +120,14 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // start sync handlers go pm.syncer() go pm.fetcher() + go pm.txsyncLoop() } func (pm *ProtocolManager) Stop() { @@ -130,7 +138,7 @@ func (pm *ProtocolManager) Stop() { pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits the sync handler + close(pm.quitSync) // quits syncer, fetcher, txsyncLoop // Wait for any process action pm.wg.Wait() @@ -145,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake, short circuit if fails + // Execute the Ethereum handshake. if err := p.handleStatus(); err != nil { return err } - // Register the peer locally and in the downloader too + + // Register the peer locally. glog.V(logger.Detail).Infoln("Adding peer", p.id) if err := pm.peers.Register(p); err != nil { glog.V(logger.Error).Infoln("Addition failed:", err) @@ -157,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { } defer pm.removePeer(p.id) + // Register the peer in the downloader. If the downloader + // considers it banned, we disconnect. if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { return err } - // propagate existing transactions. new transactions appearing + + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. - if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { - return err - } + pm.syncTransactions(p) + // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { @@ -246,7 +257,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := msgStream.List(); err != nil { return err } - var i int + var ( + i int + totalsize common.StorageSize + ) for { i++ var hash common.Hash @@ -260,8 +274,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { block := self.chainman.GetBlock(hash) if block != nil { blocks = append(blocks, block) + totalsize += block.Size() } - if i == downloader.MaxBlockFetch { + if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize { break } } |