diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 147 |
1 files changed, 136 insertions, 11 deletions
diff --git a/eth/handler.go b/eth/handler.go index 622f22132..d00d00f23 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -39,17 +39,24 @@ import ( "math" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" ) +const ( + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -77,16 +84,26 @@ type ProtocolManager struct { peers map[string]*peer SubProtocol p2p.Protocol + + eventMux *event.TypeMux + txSub event.Subscription + minedBlockSub event.Subscription + + newPeerCh chan *peer + quitSync chan struct{} } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { manager := &ProtocolManager{ + eventMux: mux, txpool: txpool, chainman: chainman, downloader: downloader, peers: make(map[string]*peer), + newPeerCh: make(chan *peer, 1), + quitSync: make(chan struct{}), } manager.SubProtocol = p2p.Protocol{ @@ -95,16 +112,86 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman Length: ProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) - err := manager.handle(peer) - //glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err) - return err + manager.newPeerCh <- peer + + return manager.handle(peer) }, } return manager } +func (pm *ProtocolManager) syncHandler() { + // itimer is used to determine when to start ignoring `minDesiredPeerCount` + itimer := time.NewTimer(peerCountTimeout) +out: + for { + select { + case <-pm.newPeerCh: + // Meet the `minDesiredPeerCount` before we select our best peer + if len(pm.peers) < minDesiredPeerCount { + break + } + + // Find the best peer + peer := getBestPeer(pm.peers) + if peer == nil { + glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") + } + + itimer.Stop() + go pm.synchronise(peer) + case <-itimer.C: + // The timer will make sure that the downloader keeps an active state + // in which it attempts to always check the network for highest td peers + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := getBestPeer(pm.peers); peer != nil { + go pm.synchronise(peer) + } else { + itimer.Reset(5 * time.Second) + } + case <-pm.quitSync: + break out + } + } +} + +func (pm *ProtocolManager) synchronise(peer *peer) { + // Make sure the peer's TD is higher than our own. If not drop. + if peer.td.Cmp(pm.chainman.Td()) <= 0 { + return + } + + glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td) + // Get the hashes from the peer (synchronously) + err := pm.downloader.Synchronise(peer.id, peer.recentHash) + if err != nil { + // handle error + glog.V(logger.Debug).Infoln("error downloading:", err) + } +} + +func (pm *ProtocolManager) Start() { + // broadcast transactions + pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() + + // sync handler + go pm.syncHandler() +} + +func (pm *ProtocolManager) Stop() { + pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + close(pm.quitSync) // quits the sync handler +} + func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { td, current, genesis := pm.chainman.Status() @@ -120,7 +207,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.peers[p.id] = p pm.pmu.Unlock() - pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks) + pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { pm.pmu.Lock() defer pm.pmu.Unlock() @@ -255,7 +342,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "block validation %v: %v", msg, err) } hash := request.Block.Hash() - // Add the block hash as a known hash to the peer. This will later be used to detirmine + // Add the block hash as a known hash to the peer. This will later be used to determine // who should receive this. p.blockHashes.Add(hash) @@ -275,7 +362,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if self.chainman.HasBlock(hash) { break } - /* XXX unsure about this */ if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 { glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD) break @@ -284,24 +370,22 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Attempt to insert the newly received by checking if the parent exists. // if the parent exists we process the block and propagate to our peers // if the parent does not exists we delegate to the downloader. - // NOTE we can reduce chatter by dropping blocks with Td < currentTd if self.chainman.HasBlock(request.Block.ParentHash()) { if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { // handle error return nil } self.BroadcastBlock(hash, request.Block) - //fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD) } else { // adding blocks is synchronous go func() { + // TODO check parent error err := self.downloader.AddBlock(p.id, request.Block, request.TD) if err != nil { glog.V(logger.Detail).Infoln("downloader err:", err) return } self.BroadcastBlock(hash, request.Block) - //fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD) }() } default: @@ -326,10 +410,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) } } // Broadcast block to peer set - // XXX due to the current shit state of the network disable the limit peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) } glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") } + +// BroadcastTx will propagate the block to its connected peers. It will sort +// out which peers do not contain the block in their block set and will do a +// sqrt(peers) to determine the amount of peers we broadcast to. +func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + + // Find peers who don't know anything about the given hash. Peers that + // don't know about the hash will be a candidate for the broadcast loop + var peers []*peer + for _, peer := range pm.peers { + if !peer.txHashes.Has(hash) { + peers = append(peers, peer) + } + } + // Broadcast block to peer set + peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.sendTransaction(tx) + } + glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") +} + +// Mined broadcast loop +func (self *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.minedBlockSub.Chan() { + switch ev := obj.(type) { + case core.NewMinedBlockEvent: + self.BroadcastBlock(ev.Block.Hash(), ev.Block) + } + } +} + +func (self *ProtocolManager) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.BroadcastTx(event.Tx.Hash(), event.Tx) + } +} |