aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-02-14 14:00:34 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:57 +0800
commitb37c4f950d1005901e874eabcebeb5baf7797cfd (patch)
treea4c424aa74b8bcfd5730e9c9c85436c078f32879 /dex/handler.go
parentb3622c655271c594d19c78eb92b26c1f539ea31b (diff)
downloaddexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar.gz
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar.bz2
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar.lz
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar.xz
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.tar.zst
dexon-b37c4f950d1005901e874eabcebeb5baf7797cfd.zip
dex: some minor improvements (#195)
* dex: improve some msg propagation * dex: support send a batch of lattice blocks, votes, randomnesses To reduce msgs number of PullBlocks, PullVotes, PullRandomness * dex: minor improvement
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go84
1 files changed, 38 insertions, 46 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 71962b865..f56c6f5dc 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -40,7 +40,6 @@ import (
"fmt"
"math"
"math/rand"
- "net"
"sync"
"sync/atomic"
"time"
@@ -794,24 +793,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if !pm.isBlockProposer {
break
}
- var block coreTypes.Block
- if err := msg.Decode(&block); err != nil {
+ var blocks []*coreTypes.Block
+ if err := msg.Decode(&blocks); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- pm.cache.addBlock(&block)
- pm.receiveCh <- &block
+ for _, block := range blocks {
+ pm.cache.addBlock(block)
+ pm.receiveCh <- block
+ }
case msg.Code == VoteMsg:
if !pm.isBlockProposer {
break
}
- var vote coreTypes.Vote
- if err := msg.Decode(&vote); err != nil {
+ var votes []*coreTypes.Vote
+ if err := msg.Decode(&votes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- if vote.Type >= coreTypes.VotePreCom {
- pm.cache.addVote(&vote)
+ for _, vote := range votes {
+ if vote.Type >= coreTypes.VotePreCom {
+ pm.cache.addVote(vote)
+ }
+ pm.receiveCh <- vote
}
- pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
if !pm.isBlockProposer {
break
@@ -821,17 +824,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&agreement); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
if !pm.isBlockProposer {
break
}
// Broadcast this to all peer
- var randomness coreTypes.BlockRandomnessResult
- if err := msg.Decode(&randomness); err != nil {
+ var randomnesses []*coreTypes.BlockRandomnessResult
+ if err := msg.Decode(&randomnesses); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- pm.receiveCh <- &randomness
+ for _, randomness := range randomnesses {
+ p.MarkRandomness(rlpHash(randomness))
+ pm.receiveCh <- randomness
+ }
case msg.Code == DKGPrivateShareMsg:
if !pm.isBlockProposer {
break
@@ -841,6 +848,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&ps); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ p.MarkDKGPrivateShares(rlpHash(ps))
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
if !pm.isBlockProposer {
@@ -862,11 +870,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
blocks := pm.cache.blocks(hashes)
log.Debug("Push blocks", "blocks", blocks)
- for _, block := range blocks {
- if err := p.SendLatticeBlock(block); err != nil {
- return err
- }
- }
+ return p.SendLatticeBlocks(blocks)
case msg.Code == PullVotesMsg:
if !pm.isBlockProposer {
break
@@ -885,11 +889,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
votes := pm.cache.votes(pos)
log.Debug("Push votes", "votes", votes)
- for _, vote := range votes {
- if err := p.SendVote(vote); err != nil {
- return err
- }
- }
+ return p.SendVotes(votes)
case msg.Code == PullRandomnessMsg:
if !pm.isBlockProposer {
break
@@ -898,13 +898,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- randomness := pm.cache.randomness(hashes)
- log.Debug("Push randomness", "randomness", randomness)
- for _, randomness := range randomness {
- if err := p.SendRandomness(randomness); err != nil {
- return err
- }
- }
+ randomnesses := pm.cache.randomness(hashes)
+ log.Debug("Push randomness", "randomness", randomnesses)
+ return p.SendRandomnesses(randomnesses)
case msg.Code == GetGovStateMsg:
var hash common.Hash
if err := msg.Decode(&hash); err != nil {
@@ -995,12 +991,11 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
}
}
-// TODO(sonic): block size is big, try not to send to all peers
-// to reduce traffic
+// BroadcastLatticeBlock broadcasts the lattice block to all its peers.
func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
pm.cache.addBlock(block)
- for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) {
- peer.AsyncSendLatticeBlock(block)
+ for _, peer := range pm.peers.Peers() {
+ peer.AsyncSendLatticeBlocks([]*coreTypes.Block{block})
}
}
@@ -1014,11 +1009,8 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
chainID: vote.Position.ChainID,
round: vote.Position.Round,
}
- h := rlpHash(vote)
for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownVotes.Contains(h) {
- peer.AsyncSendVote(vote)
- }
+ peer.AsyncSendVotes([]*coreTypes.Vote{vote})
}
}
@@ -1050,15 +1042,16 @@ func (pm *ProtocolManager) BroadcastRandomnessResult(
chainID: randomness.Position.ChainID,
round: randomness.Position.Round,
}
+ randomnesses := []*coreTypes.BlockRandomnessResult{randomness}
for _, peer := range pm.peers.PeersWithLabel(label) {
if !peer.knownRandomnesses.Contains(rlpHash(randomness)) {
- peer.AsyncSendRandomness(randomness)
+ peer.AsyncSendRandomnesses(randomnesses)
}
}
// TODO(sonic): send to some of other nodes (gossip)
for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
- peer.AsyncSendRandomness(randomness)
+ peer.AsyncSendRandomnesses(randomnesses)
}
}
@@ -1069,12 +1062,13 @@ func (pm *ProtocolManager) SendDKGPrivateShare(
if err != nil {
panic(err)
}
- n := enode.NewV4(pk, net.IP{}, 0, 0)
- if p := pm.peers.Peer(n.ID().String()); p != nil {
+ id := enode.PubkeyToIDV4(pk)
+
+ if p := pm.peers.Peer(id.String()); p != nil {
p.AsyncSendDKGPrivateShare(privateShare)
} else {
- log.Error("Failed to send DKG private share", "publicKey", n.ID().String())
+ log.Error("Failed to send DKG private share", "publicKey", id.String())
}
}
@@ -1092,9 +1086,7 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature(
psig *dkgTypes.PartialSignature) {
label := peerLabel{set: dkgset, round: psig.Round}
for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownDKGPartialSignatures.Contains(rlpHash(psig)) {
- peer.AsyncSendDKGPartialSignature(psig)
- }
+ peer.AsyncSendDKGPartialSignature(psig)
}
}