diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 53 |
1 files changed, 45 insertions, 8 deletions
diff --git a/dex/handler.go b/dex/handler.go index 398a8443a..8e5898817 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -134,7 +134,8 @@ type ProtocolManager struct { chainHeadSub event.Subscription // channels for dexon consensus core - receiveCh chan interface{} + receiveCh chan coreTypes.Msg + reportBadPeerChan chan interface{} receiveCoreMessage int32 srvr p2pServer @@ -176,7 +177,8 @@ func NewProtocolManager( noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + receiveCh: make(chan coreTypes.Msg, 1024), + reportBadPeerChan: make(chan interface{}, 128), receiveCoreMessage: 0, isBlockProposer: isBlockProposer, app: app, @@ -299,6 +301,9 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() + + // Listen to bad peer and disconnect it. + go pm.badPeerWatchLoop() } func (pm *ProtocolManager) Stop() { @@ -330,10 +335,26 @@ func (pm *ProtocolManager) Stop() { log.Info("DEXON protocol stopped") } -func (pm *ProtocolManager) ReceiveChan() <-chan interface{} { +func (pm *ProtocolManager) ReceiveChan() <-chan coreTypes.Msg { return pm.receiveCh } +func (pm *ProtocolManager) ReportBadPeerChan() chan<- interface{} { + return pm.reportBadPeerChan +} + +func (pm *ProtocolManager) badPeerWatchLoop() { + for { + select { + case id := <-pm.reportBadPeerChan: + log.Debug("Bad peer detected, removing", "id", id.(string)) + pm.removePeer(id.(string)) + case <-pm.quitSync: + return + } + } +} + func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return newPeer(pv, p, newMeteredMsgWriter(rw)) } @@ -382,6 +403,7 @@ func (pm *ProtocolManager) handle(p *peer) error { return err } } + // Handle incoming messages until the connection is torn down for { if err := pm.handleMsg(p); err != nil { @@ -832,7 +854,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.cache.addBlocks(blocks) for _, block := range blocks { - pm.receiveCh <- block + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: block, + } } case msg.Code == VoteMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -846,7 +871,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if vote.Type >= coreTypes.VotePreCom { pm.cache.addVote(vote) } - pm.receiveCh <- vote + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: vote, + } } case msg.Code == AgreementMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -864,7 +892,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { block[0].Randomness = agreement.Randomness pm.cache.addFinalizedBlock(block[0]) } - pm.receiveCh <- &agreement + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &agreement, + } case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -875,7 +906,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.MarkDKGPrivateShares(rlpHash(ps)) - pm.receiveCh <- &ps + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &ps, + } case msg.Code == DKGPartialSignatureMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -885,7 +919,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&psig); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - pm.receiveCh <- &psig + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &psig, + } case msg.Code == PullBlocksMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break |