aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go53
1 files changed, 45 insertions, 8 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 398a8443a..8e5898817 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -134,7 +134,8 @@ type ProtocolManager struct {
chainHeadSub event.Subscription
// channels for dexon consensus core
- receiveCh chan interface{}
+ receiveCh chan coreTypes.Msg
+ reportBadPeerChan chan interface{}
receiveCoreMessage int32
srvr p2pServer
@@ -176,7 +177,8 @@ func NewProtocolManager(
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
+ receiveCh: make(chan coreTypes.Msg, 1024),
+ reportBadPeerChan: make(chan interface{}, 128),
receiveCoreMessage: 0,
isBlockProposer: isBlockProposer,
app: app,
@@ -299,6 +301,9 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+
+ // Listen to bad peer and disconnect it.
+ go pm.badPeerWatchLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -330,10 +335,26 @@ func (pm *ProtocolManager) Stop() {
log.Info("DEXON protocol stopped")
}
-func (pm *ProtocolManager) ReceiveChan() <-chan interface{} {
+func (pm *ProtocolManager) ReceiveChan() <-chan coreTypes.Msg {
return pm.receiveCh
}
+func (pm *ProtocolManager) ReportBadPeerChan() chan<- interface{} {
+ return pm.reportBadPeerChan
+}
+
+func (pm *ProtocolManager) badPeerWatchLoop() {
+ for {
+ select {
+ case id := <-pm.reportBadPeerChan:
+ log.Debug("Bad peer detected, removing", "id", id.(string))
+ pm.removePeer(id.(string))
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
@@ -382,6 +403,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
return err
}
}
+
// Handle incoming messages until the connection is torn down
for {
if err := pm.handleMsg(p); err != nil {
@@ -832,7 +854,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.cache.addBlocks(blocks)
for _, block := range blocks {
- pm.receiveCh <- block
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: block,
+ }
}
case msg.Code == VoteMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
@@ -846,7 +871,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if vote.Type >= coreTypes.VotePreCom {
pm.cache.addVote(vote)
}
- pm.receiveCh <- vote
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: vote,
+ }
}
case msg.Code == AgreementMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
@@ -864,7 +892,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
block[0].Randomness = agreement.Randomness
pm.cache.addFinalizedBlock(block[0])
}
- pm.receiveCh <- &agreement
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &agreement,
+ }
case msg.Code == DKGPrivateShareMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -875,7 +906,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.MarkDKGPrivateShares(rlpHash(ps))
- pm.receiveCh <- &ps
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &ps,
+ }
case msg.Code == DKGPartialSignatureMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -885,7 +919,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&psig); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- pm.receiveCh <- &psig
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &psig,
+ }
case msg.Code == PullBlocksMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break