diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/handler.go | 53 | ||||
-rw-r--r-- | dex/network.go | 7 | ||||
-rw-r--r-- | dex/protocol_test.go | 8 |
3 files changed, 55 insertions, 13 deletions
diff --git a/dex/handler.go b/dex/handler.go index 398a8443a..8e5898817 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -134,7 +134,8 @@ type ProtocolManager struct { chainHeadSub event.Subscription // channels for dexon consensus core - receiveCh chan interface{} + receiveCh chan coreTypes.Msg + reportBadPeerChan chan interface{} receiveCoreMessage int32 srvr p2pServer @@ -176,7 +177,8 @@ func NewProtocolManager( noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + receiveCh: make(chan coreTypes.Msg, 1024), + reportBadPeerChan: make(chan interface{}, 128), receiveCoreMessage: 0, isBlockProposer: isBlockProposer, app: app, @@ -299,6 +301,9 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() + + // Listen to bad peer and disconnect it. + go pm.badPeerWatchLoop() } func (pm *ProtocolManager) Stop() { @@ -330,10 +335,26 @@ func (pm *ProtocolManager) Stop() { log.Info("DEXON protocol stopped") } -func (pm *ProtocolManager) ReceiveChan() <-chan interface{} { +func (pm *ProtocolManager) ReceiveChan() <-chan coreTypes.Msg { return pm.receiveCh } +func (pm *ProtocolManager) ReportBadPeerChan() chan<- interface{} { + return pm.reportBadPeerChan +} + +func (pm *ProtocolManager) badPeerWatchLoop() { + for { + select { + case id := <-pm.reportBadPeerChan: + log.Debug("Bad peer detected, removing", "id", id.(string)) + pm.removePeer(id.(string)) + case <-pm.quitSync: + return + } + } +} + func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return newPeer(pv, p, newMeteredMsgWriter(rw)) } @@ -382,6 +403,7 @@ func (pm *ProtocolManager) handle(p *peer) error { return err } } + // Handle incoming messages until the connection is torn down for { if err := pm.handleMsg(p); err != nil { @@ -832,7 +854,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.cache.addBlocks(blocks) for _, block := range blocks { - pm.receiveCh <- block + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: block, + } } case msg.Code == VoteMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -846,7 +871,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if vote.Type >= coreTypes.VotePreCom { pm.cache.addVote(vote) } - pm.receiveCh <- vote + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: vote, + } } case msg.Code == AgreementMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -864,7 +892,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { block[0].Randomness = agreement.Randomness pm.cache.addFinalizedBlock(block[0]) } - pm.receiveCh <- &agreement + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &agreement, + } case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -875,7 +906,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.MarkDKGPrivateShares(rlpHash(ps)) - pm.receiveCh <- &ps + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &ps, + } case msg.Code == DKGPartialSignatureMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -885,7 +919,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&psig); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - pm.receiveCh <- &psig + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &psig, + } case msg.Code == PullBlocksMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break diff --git a/dex/network.go b/dex/network.go index 0e2d338c1..ab676a349 100644 --- a/dex/network.go +++ b/dex/network.go @@ -84,6 +84,11 @@ func (n *DexconNetwork) BroadcastAgreementResult(result *types.AgreementResult) } // ReceiveChan returns a channel to receive messages from DEXON network. -func (n *DexconNetwork) ReceiveChan() <-chan interface{} { +func (n *DexconNetwork) ReceiveChan() <-chan types.Msg { return n.pm.ReceiveChan() } + +// ReportBadPeerChan returns a channel to receive messages from DEXON network. +func (n *DexconNetwork) ReportBadPeerChan() chan<- interface{} { + return n.pm.ReportBadPeerChan() +} diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 3ed93c061..81550d918 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -271,7 +271,7 @@ func TestRecvCoreBlocks(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - rb := msg.(*coreTypes.Block) + rb := msg.Payload.(*coreTypes.Block) if !reflect.DeepEqual(rb, &block) { t.Errorf("block mismatch") } @@ -416,7 +416,7 @@ func TestRecvVotes(t *testing.T) { select { case msg := <-ch: - rvote := msg.(*coreTypes.Vote) + rvote := msg.Payload.(*coreTypes.Vote) if !reflect.DeepEqual(rvote, &vote) { t.Errorf("vote mismatch") } @@ -557,7 +557,7 @@ func TestRecvDKGPrivateShare(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - rps := msg.(*dkgTypes.PrivateShare) + rps := msg.Payload.(*dkgTypes.PrivateShare) if !reflect.DeepEqual(rps, &privateShare) { t.Errorf("vote mismatch") } @@ -657,7 +657,7 @@ func TestRecvAgreement(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - a := msg.(*coreTypes.AgreementResult) + a := msg.Payload.(*coreTypes.AgreementResult) if !reflect.DeepEqual(a, &agreement) { t.Errorf("agreement mismatch") } |