aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/handler.go53
-rw-r--r--dex/network.go7
-rw-r--r--dex/protocol_test.go8
3 files changed, 55 insertions, 13 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 398a8443a..8e5898817 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -134,7 +134,8 @@ type ProtocolManager struct {
chainHeadSub event.Subscription
// channels for dexon consensus core
- receiveCh chan interface{}
+ receiveCh chan coreTypes.Msg
+ reportBadPeerChan chan interface{}
receiveCoreMessage int32
srvr p2pServer
@@ -176,7 +177,8 @@ func NewProtocolManager(
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
+ receiveCh: make(chan coreTypes.Msg, 1024),
+ reportBadPeerChan: make(chan interface{}, 128),
receiveCoreMessage: 0,
isBlockProposer: isBlockProposer,
app: app,
@@ -299,6 +301,9 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+
+ // Listen to bad peer and disconnect it.
+ go pm.badPeerWatchLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -330,10 +335,26 @@ func (pm *ProtocolManager) Stop() {
log.Info("DEXON protocol stopped")
}
-func (pm *ProtocolManager) ReceiveChan() <-chan interface{} {
+func (pm *ProtocolManager) ReceiveChan() <-chan coreTypes.Msg {
return pm.receiveCh
}
+func (pm *ProtocolManager) ReportBadPeerChan() chan<- interface{} {
+ return pm.reportBadPeerChan
+}
+
+func (pm *ProtocolManager) badPeerWatchLoop() {
+ for {
+ select {
+ case id := <-pm.reportBadPeerChan:
+ log.Debug("Bad peer detected, removing", "id", id.(string))
+ pm.removePeer(id.(string))
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
@@ -382,6 +403,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
return err
}
}
+
// Handle incoming messages until the connection is torn down
for {
if err := pm.handleMsg(p); err != nil {
@@ -832,7 +854,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.cache.addBlocks(blocks)
for _, block := range blocks {
- pm.receiveCh <- block
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: block,
+ }
}
case msg.Code == VoteMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
@@ -846,7 +871,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if vote.Type >= coreTypes.VotePreCom {
pm.cache.addVote(vote)
}
- pm.receiveCh <- vote
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: vote,
+ }
}
case msg.Code == AgreementMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
@@ -864,7 +892,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
block[0].Randomness = agreement.Randomness
pm.cache.addFinalizedBlock(block[0])
}
- pm.receiveCh <- &agreement
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &agreement,
+ }
case msg.Code == DKGPrivateShareMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -875,7 +906,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.MarkDKGPrivateShares(rlpHash(ps))
- pm.receiveCh <- &ps
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &ps,
+ }
case msg.Code == DKGPartialSignatureMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -885,7 +919,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&psig); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- pm.receiveCh <- &psig
+ pm.receiveCh <- coreTypes.Msg{
+ PeerID: p.ID().String(),
+ Payload: &psig,
+ }
case msg.Code == PullBlocksMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
diff --git a/dex/network.go b/dex/network.go
index 0e2d338c1..ab676a349 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -84,6 +84,11 @@ func (n *DexconNetwork) BroadcastAgreementResult(result *types.AgreementResult)
}
// ReceiveChan returns a channel to receive messages from DEXON network.
-func (n *DexconNetwork) ReceiveChan() <-chan interface{} {
+func (n *DexconNetwork) ReceiveChan() <-chan types.Msg {
return n.pm.ReceiveChan()
}
+
+// ReportBadPeerChan returns a channel to receive messages from DEXON network.
+func (n *DexconNetwork) ReportBadPeerChan() chan<- interface{} {
+ return n.pm.ReportBadPeerChan()
+}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 3ed93c061..81550d918 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -271,7 +271,7 @@ func TestRecvCoreBlocks(t *testing.T) {
ch := pm.ReceiveChan()
select {
case msg := <-ch:
- rb := msg.(*coreTypes.Block)
+ rb := msg.Payload.(*coreTypes.Block)
if !reflect.DeepEqual(rb, &block) {
t.Errorf("block mismatch")
}
@@ -416,7 +416,7 @@ func TestRecvVotes(t *testing.T) {
select {
case msg := <-ch:
- rvote := msg.(*coreTypes.Vote)
+ rvote := msg.Payload.(*coreTypes.Vote)
if !reflect.DeepEqual(rvote, &vote) {
t.Errorf("vote mismatch")
}
@@ -557,7 +557,7 @@ func TestRecvDKGPrivateShare(t *testing.T) {
ch := pm.ReceiveChan()
select {
case msg := <-ch:
- rps := msg.(*dkgTypes.PrivateShare)
+ rps := msg.Payload.(*dkgTypes.PrivateShare)
if !reflect.DeepEqual(rps, &privateShare) {
t.Errorf("vote mismatch")
}
@@ -657,7 +657,7 @@ func TestRecvAgreement(t *testing.T) {
ch := pm.ReceiveChan()
select {
case msg := <-ch:
- a := msg.(*coreTypes.AgreementResult)
+ a := msg.Payload.(*coreTypes.AgreementResult)
if !reflect.DeepEqual(a, &agreement) {
t.Errorf("agreement mismatch")
}