diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 159 |
1 files changed, 50 insertions, 109 deletions
diff --git a/eth/handler.go b/eth/handler.go index fecd71632..1e0663816 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -1,39 +1,5 @@ package eth -// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change -// The idea is that most of the calls within the protocol will become synchronous. -// Block downloading and block processing will be complete seperate processes -/* -# Possible scenarios - -// Synching scenario -// Use the best peer to synchronise -blocks, err := pm.downloader.Synchronise() -if err != nil { - // handle - break -} -pm.chainman.InsertChain(blocks) - -// Receiving block with known parent -if parent_exist { - if err := pm.chainman.InsertChain(block); err != nil { - // handle - break - } - pm.BroadcastBlock(block) -} - -// Receiving block with unknown parent -blocks, err := pm.downloader.SynchroniseWithPeer(peer) -if err != nil { - // handle - break -} -pm.chainman.InsertChain(blocks) - -*/ - import ( "fmt" "math" @@ -54,7 +20,9 @@ import ( const ( peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcTimer = 500 * time.Millisecond + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 ) func errResp(code errCode, format string, v ...interface{}) error { @@ -91,6 +59,10 @@ type ProtocolManager struct { newPeerCh chan *peer quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading + // and processing + wg sync.WaitGroup + quit bool } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -122,60 +94,11 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo return manager } -func (pm *ProtocolManager) syncHandler() { - // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTimer(peerCountTimeout) -out: - for { - select { - case <-pm.newPeerCh: - // Meet the `minDesiredPeerCount` before we select our best peer - if len(pm.peers) < minDesiredPeerCount { - break - } - - // Find the best peer - peer := getBestPeer(pm.peers) - if peer == nil { - glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") - } - - itimer.Stop() - go pm.synchronise(peer) - case <-itimer.C: - // The timer will make sure that the downloader keeps an active state - // in which it attempts to always check the network for highest td peers - // Either select the peer or restart the timer if no peers could - // be selected. - if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronise(peer) - } else { - itimer.Reset(5 * time.Second) - } - case <-pm.quitSync: - break out - } - } -} - -func (pm *ProtocolManager) synchronise(peer *peer) { - // Make sure the peer's TD is higher than our own. If not drop. - if peer.td.Cmp(pm.chainman.Td()) <= 0 { - return - } - // Check downloader if it's busy so it doesn't show the sync message - // for every attempty - if pm.downloader.IsBusy() { - return - } - - glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td) - // Get the hashes from the peer (synchronously) - err := pm.downloader.Synchronise(peer.id, peer.recentHash) - if err != nil { - // handle error - glog.V(logger.Debug).Infoln("error downloading:", err) - } +func (pm *ProtocolManager) removePeer(peer *peer) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + pm.downloader.UnregisterPeer(peer.id) + delete(pm.peers, peer.id) } func (pm *ProtocolManager) Start() { @@ -187,18 +110,26 @@ func (pm *ProtocolManager) Start() { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // sync handler - go pm.syncHandler() + go pm.update() } func (pm *ProtocolManager) Stop() { + // Showing a log message. During download / process this could actually + // take between 5 to 10 seconds and therefor feedback is required. + glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") + + pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop close(pm.quitSync) // quits the sync handler + + // Wait for any process action + pm.wg.Wait() + + glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - td, current, genesis := pm.chainman.Status() return newPeer(pv, nv, genesis, current, td, p, rw) @@ -214,10 +145,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { - pm.pmu.Lock() - defer pm.pmu.Unlock() - delete(pm.peers, p.id) - pm.downloader.UnregisterPeer(p.id) + pm.removePeer(p) }() // propagate existing transactions. new transactions appearing @@ -352,6 +280,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Add the block hash as a known hash to the peer. This will later be used to determine // who should receive this. p.blockHashes.Add(hash) + // update the peer info + p.recentHash = hash + p.td = request.TD _, chainHead, _ := self.chainman.Status() @@ -376,24 +307,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Attempt to insert the newly received by checking if the parent exists. // if the parent exists we process the block and propagate to our peers - // if the parent does not exists we delegate to the downloader. + // otherwise synchronise with the peer if self.chainman.HasBlock(request.Block.ParentHash()) { if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { - // handle error + glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") + + self.removePeer(p) + + return nil + } + + if err := self.verifyTd(p, request); err != nil { + glog.V(logger.Error).Infoln(err) + // XXX for now return nil so it won't disconnect (we should in the future) return nil } self.BroadcastBlock(hash, request.Block) } else { - // adding blocks is synchronous - go func() { - // TODO check parent error - err := self.downloader.AddBlock(p.id, request.Block, request.TD) - if err != nil { - glog.V(logger.Detail).Infoln("downloader err:", err) - return - } - self.BroadcastBlock(hash, request.Block) - }() + go self.synchronise(p) } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -401,6 +332,16 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return nil } +func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { + if request.Block.Td.Cmp(request.TD) != 0 { + glog.V(logger.Detail).Infoln(peer) + + return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD) + } + + return nil +} + // BroadcastBlock will propagate the block to its connected peers. It will sort // out which peers do not contain the block in their block set and will do a // sqrt(peers) to determine the amount of peers we broadcast to. @@ -421,7 +362,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) for _, peer := range peers { peer.sendNewBlock(block) } - glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total propagation time:", time.Since(block.ReceivedAt)) + glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt)) } // BroadcastTx will propagate the block to its connected peers. It will sort |