aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-17 18:02:52 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-19 20:54:27 +0800
commit5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991 (patch)
tree34450b0d8202e09deee37ecfeb4f4b5c918ce177
parent218f6e00e4af70ccfc5c8823ab4d9da60cbdcfed (diff)
downloaddexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar.gz
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar.bz2
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar.lz
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar.xz
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.tar.zst
dexon-5ceac39dba0a8d4c3228d41a1b3fab8cdbe3d991.zip
dex: polish network related function
-rw-r--r--dex/handler.go88
-rw-r--r--dex/nodetable_test.go17
-rw-r--r--dex/peer.go177
-rw-r--r--dex/protocol_test.go32
4 files changed, 211 insertions, 103 deletions
diff --git a/dex/handler.go b/dex/handler.go
index cdadd2874..35d2901c9 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -823,6 +823,16 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
}
}
+// TODO(sonic): block size is big, try not to send to all peers
+// to reduce traffic
+func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
+ hash := rlpHash(toRLPLatticeBlock(block))
+ for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) {
+ peer.AsyncSendLatticeBlock(block)
+ }
+}
+
+// BroadcastVote broadcasts the given vote to all peers in same notary set
func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
label := peerLabel{
set: notaryset,
@@ -837,53 +847,71 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
}
}
-// TODO(sonic): try to reduce traffic
-func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
- hash := rlpHash(toRLPLatticeBlock(block))
- for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) {
- peer.AsyncSendLatticeBlock(block)
- }
-}
-
-// TODO(sonic): try to reduce traffic
-func (pm *ProtocolManager) SendDKGPrivateShare(
- pub coreCrypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) {
- id := discover.MustBytesID(pub.Bytes()[1:])
- if p := pm.peers.Peer(id.String()); p != nil {
- p.AsyncSendDKGPrivateShare(privateShare)
+func (pm *ProtocolManager) BroadcastAgreementResult(
+ agreement *coreTypes.AgreementResult) {
+ // send to dkg nodes first (direct)
+ label := peerLabel{
+ set: dkgset,
+ round: agreement.Position.Round,
}
-}
-
-// TODO(sonic): try to reduce traffic
-func (pm *ProtocolManager) BroadcastDKGPrivateShare(
- privateShare *coreTypes.DKGPrivateShare) {
- for _, peer := range pm.peers.allPeers() {
- peer.AsyncSendDKGPrivateShare(privateShare)
+ for _, peer := range pm.peers.PeersWithLabel(label) {
+ peer.AsyncSendAgreement(agreement)
}
-}
-// TODO(sonic): try to reduce traffic
-func (pm *ProtocolManager) BroadcastAgreementResult(
- agreement *coreTypes.AgreementResult) {
+ // TODO(sonic): send to some of other nodes (gossip)
for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
peer.AsyncSendAgreement(agreement)
}
}
-// TODO(sonic): try to reduce traffic
func (pm *ProtocolManager) BroadcastRandomnessResult(
randomness *coreTypes.BlockRandomnessResult) {
- // random pick n peers
+ // send to notary nodes first (direct)
+ label := peerLabel{
+ set: notaryset,
+ chainID: randomness.Position.ChainID,
+ round: randomness.Position.Round,
+ }
+ for _, peer := range pm.peers.PeersWithLabel(label) {
+ peer.AsyncSendRandomness(randomness)
+ }
+
+ // TODO(sonic): send to some of other nodes (gossip)
for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
peer.AsyncSendRandomness(randomness)
}
}
-// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) SendDKGPrivateShare(
+ pub coreCrypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) {
+ uncompressedKey, err := crypto.DecompressPubkey(pub.Bytes())
+ if err != nil {
+ log.Error("decompress key fail", "err", err)
+ }
+ id := discover.PubkeyID(uncompressedKey)
+ if p := pm.peers.Peer(id.String()); p != nil {
+ p.AsyncSendDKGPrivateShare(privateShare)
+ }
+}
+
+func (pm *ProtocolManager) BroadcastDKGPrivateShare(
+ privateShare *coreTypes.DKGPrivateShare) {
+ label := peerLabel{set: dkgset, round: privateShare.Round}
+ h := rlpHash(toRLPDKGPrivateShare(privateShare))
+ for _, peer := range pm.peers.PeersWithLabel(label) {
+ if !peer.knownDKGPrivateShares.Contains(h) {
+ peer.AsyncSendDKGPrivateShare(privateShare)
+ }
+ }
+}
+
func (pm *ProtocolManager) BroadcastDKGPartialSignature(
psig *coreTypes.DKGPartialSignature) {
- for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) {
- peer.AsyncSendDKGPartialSignature(psig)
+ label := peerLabel{set: dkgset, round: psig.Round}
+ for _, peer := range pm.peers.PeersWithLabel(label) {
+ if !peer.knownDKGPartialSignatures.Contains(rlpHash(psig)) {
+ peer.AsyncSendDKGPartialSignature(psig)
+ }
}
}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
index 5b3f7de57..2e44eabe5 100644
--- a/dex/nodetable_test.go
+++ b/dex/nodetable_test.go
@@ -1,11 +1,13 @@
package dex
import (
- "math/rand"
+ "crypto/ecdsa"
"testing"
"time"
"github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
)
@@ -85,9 +87,14 @@ func TestNodeTable(t *testing.T) {
}
}
-func randomID() (id enode.ID) {
- for i := range id {
- id[i] = byte(rand.Intn(255))
+func randomID() enode.ID {
+ var err error
+ var privkey *ecdsa.PrivateKey
+ for {
+ privkey, err = crypto.GenerateKey()
+ if err == nil {
+ break
+ }
}
- return id
+ return discover.PubkeyID(&(privkey.PublicKey))
}
diff --git a/dex/peer.go b/dex/peer.go
index 1279a190b..db68ea590 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -46,6 +46,13 @@ const (
maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ maxKnownLatticeBLocks = 2048
+ maxKnownVotes = 2048
+ maxKnownAgreements = 10240
+ maxKnownRandomnesses = 10240
+ maxKnownDKGPrivateShare = 1024 // this related to DKG Size
+ maxKnownDKGPartialSignature = 1024 // this related to DKG Size
+
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
@@ -63,6 +70,13 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
+ maxQueuedLatticeBlocks = 16
+ maxQueuedVotes = 128
+ maxQueuedAgreements = 16
+ maxQueuedRandomnesses = 16
+ maxQueuedDKGPrivateShare = 16
+ maxQueuedDKGParitialSignature = 16
+
handshakeTimeout = 5 * time.Second
groupNodeNum = 3
@@ -107,43 +121,53 @@ type peer struct {
td *big.Int
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownMetas mapset.Set // Set of node metas known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownVotes mapset.Set
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
- queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedLatticeBlock chan *coreTypes.Block
- queuedVote chan *coreTypes.Vote
- queuedAgreement chan *coreTypes.AgreementResult
- queuedRandomness chan *coreTypes.BlockRandomnessResult
- queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare
- queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature
- term chan struct{} // Termination channel to stop the broadcaster
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownLatticeBlocks mapset.Set
+ knownVotes mapset.Set
+ knownAgreements mapset.Set
+ knownRandomnesses mapset.Set
+ knownDKGPrivateShares mapset.Set
+ knownDKGPartialSignatures mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedLatticeBlocks chan *coreTypes.Block
+ queuedVotes chan *coreTypes.Vote
+ queuedAgreements chan *coreTypes.AgreementResult
+ queuedRandomnesses chan *coreTypes.BlockRandomnessResult
+ queuedDKGPrivateShares chan *coreTypes.DKGPrivateShare
+ queuedDKGPartialSignatures chan *coreTypes.DKGPartialSignature
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- id: p.ID().String(),
- knownTxs: mapset.NewSet(),
- knownMetas: mapset.NewSet(),
- knownBlocks: mapset.NewSet(),
- knownVotes: mapset.NewSet(),
- queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
- queuedProps: make(chan *propEvent, maxQueuedProps),
- queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedLatticeBlock: make(chan *coreTypes.Block, 16),
- queuedVote: make(chan *coreTypes.Vote, 16),
- queuedAgreement: make(chan *coreTypes.AgreementResult, 16),
- queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16),
- queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16),
- queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16),
+ Peer: p,
+ rw: rw,
+ version: version,
+ id: p.ID().String(),
+ knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ knownLatticeBlocks: mapset.NewSet(),
+ knownVotes: mapset.NewSet(),
+ knownAgreements: mapset.NewSet(),
+ knownRandomnesses: mapset.NewSet(),
+ knownDKGPrivateShares: mapset.NewSet(),
+ knownDKGPartialSignatures: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks),
+ queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes),
+ queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements),
+ queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
+ queuedDKGPrivateShares: make(chan *coreTypes.DKGPrivateShare, maxQueuedDKGPrivateShare),
+ queuedDKGPartialSignatures: make(chan *coreTypes.DKGPartialSignature, maxQueuedDKGParitialSignature),
term: make(chan struct{}),
}
}
@@ -177,32 +201,32 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case block := <-p.queuedLatticeBlock:
+ case block := <-p.queuedLatticeBlocks:
if err := p.SendLatticeBlock(block); err != nil {
return
}
p.Log().Trace("Broadcast lattice block")
- case vote := <-p.queuedVote:
+ case vote := <-p.queuedVotes:
if err := p.SendVote(vote); err != nil {
return
}
p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote))
- case agreement := <-p.queuedAgreement:
+ case agreement := <-p.queuedAgreements:
if err := p.SendAgreement(agreement); err != nil {
return
}
p.Log().Trace("Broadcast agreement")
- case randomness := <-p.queuedRandomness:
+ case randomness := <-p.queuedRandomnesses:
if err := p.SendRandomness(randomness); err != nil {
return
}
p.Log().Trace("Broadcast randomness")
- case privateShare := <-p.queuedDKGPrivateShare:
+ case privateShare := <-p.queuedDKGPrivateShares:
if err := p.SendDKGPrivateShare(privateShare); err != nil {
return
}
p.Log().Trace("Broadcast DKG private share")
- case psig := <-p.queuedDKGPartialSignature:
+ case psig := <-p.queuedDKGPartialSignatures:
if err := p.SendDKGPartialSignature(psig); err != nil {
return
}
@@ -364,72 +388,88 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
func (p *peer) SendLatticeBlock(block *coreTypes.Block) error {
+ r := toRLPLatticeBlock(block)
+ p.knownLatticeBlocks.Add(rlpHash(r))
return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block))
}
func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) {
select {
- case p.queuedLatticeBlock <- block:
+ case p.queuedLatticeBlocks <- block:
+ r := toRLPLatticeBlock(block)
+ p.knownLatticeBlocks.Add(rlpHash(r))
default:
p.Log().Debug("Dropping lattice block propagation")
}
}
func (p *peer) SendVote(vote *coreTypes.Vote) error {
+ p.knownVotes.Add(rlpHash(vote))
return p2p.Send(p.rw, VoteMsg, vote)
}
func (p *peer) AsyncSendVote(vote *coreTypes.Vote) {
select {
- case p.queuedVote <- vote:
+ case p.queuedVotes <- vote:
+ p.knownVotes.Add(rlpHash(vote))
default:
p.Log().Debug("Dropping vote propagation")
}
}
func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
+ p.knownAgreements.Add(rlpHash(agreement))
return p2p.Send(p.rw, AgreementMsg, agreement)
}
func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
select {
- case p.queuedAgreement <- agreement:
+ case p.queuedAgreements <- agreement:
+ p.knownAgreements.Add(rlpHash(agreement))
default:
p.Log().Debug("Dropping agreement result")
}
}
func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error {
+ p.knownRandomnesses.Add(rlpHash(randomness))
return p2p.Send(p.rw, RandomnessMsg, randomness)
}
func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) {
select {
- case p.queuedRandomness <- randomness:
+ case p.queuedRandomnesses <- randomness:
+ p.knownRandomnesses.Add(rlpHash(randomness))
default:
p.Log().Debug("Dropping randomness result")
}
}
func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error {
+ r := toRLPDKGPrivateShare(privateShare)
+ p.knownDKGPrivateShares.Add(rlpHash(r))
return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare))
}
func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) {
select {
- case p.queuedDKGPrivateShare <- privateShare:
+ case p.queuedDKGPrivateShares <- privateShare:
+ r := toRLPDKGPrivateShare(privateShare)
+ p.knownDKGPrivateShares.Add(rlpHash(r))
default:
p.Log().Debug("Dropping DKG private share")
}
}
func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error {
+ p.knownDKGPartialSignatures.Add(rlpHash(psig))
return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
}
func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) {
select {
- case p.queuedDKGPartialSignature <- psig:
+ case p.queuedDKGPartialSignatures <- psig:
+ p.knownDKGPartialSignatures.Add(rlpHash(psig))
default:
p.Log().Debug("Dropping DKG partial signature")
}
@@ -727,27 +767,50 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
return list
}
-// TODO(sonic): finish the following dummy function.
+func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownLatticeBlocks.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
- return ps.allPeers()
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownAgreements.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
}
func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
- return ps.allPeers()
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownRandomnesses.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
}
func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer {
- return ps.allPeers()
-}
-
-func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
- return ps.allPeers()
-}
-
-func (ps *peerSet) allPeers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- list = append(list, p)
+ if !p.knownDKGPartialSignatures.Contains(hash) {
+ list = append(list, p)
+ }
}
return list
}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 4bb3dc9e8..8bc24e8de 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -33,6 +33,7 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/eth/downloader"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -558,14 +559,17 @@ func TestSendVote(t *testing.T) {
wg.Wait()
}
-type mockPublicKey []byte
+type mockPublicKey struct {
+ id enode.ID
+}
-func (p mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool {
+func (p *mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool {
return true
}
-func (p mockPublicKey) Bytes() []byte {
- return append([]byte{1}, p...)
+func (p *mockPublicKey) Bytes() []byte {
+ b, _ := p.id.Pubkey()
+ return crypto.CompressPubkey(b)
}
func TestRecvDKGPrivateShare(t *testing.T) {
@@ -625,7 +629,7 @@ func TestSendDKGPrivateShare(t *testing.T) {
},
}
- go pm.SendDKGPrivateShare(mockPublicKey(p1.ID().Bytes()), &privateShare)
+ go pm.SendDKGPrivateShare(&mockPublicKey{p1.ID()}, &privateShare)
msg, err := p1.app.ReadMsg()
if err != nil {
t.Errorf("%v: read error: %v", p1.Peer, err)
@@ -678,7 +682,6 @@ func TestRecvAgreement(t *testing.T) {
agreement := coreTypes.AgreementResult{
BlockHash: coreCommon.Hash{9, 9, 9},
- Round: 13,
Position: vote.Position,
Votes: []coreTypes.Vote{vote},
}
@@ -722,7 +725,6 @@ func TestSendAgreement(t *testing.T) {
agreement := coreTypes.AgreementResult{
BlockHash: coreCommon.Hash{9, 9, 9},
- Round: 13,
Position: vote.Position,
Votes: []coreTypes.Vote{vote},
}
@@ -754,8 +756,12 @@ func TestRecvRandomness(t *testing.T) {
// TODO(sonic): polish this
randomness := coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.Hash{8, 8, 8},
- Round: 17,
+ BlockHash: coreCommon.Hash{8, 8, 8},
+ Position: coreTypes.Position{
+ ChainID: 1,
+ Round: 10,
+ Height: 13,
+ },
Randomness: []byte{7, 7, 7, 7},
}
@@ -783,8 +789,12 @@ func TestSendRandomness(t *testing.T) {
// TODO(sonic): polish this
randomness := coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.Hash{8, 8, 8},
- Round: 17,
+ BlockHash: coreCommon.Hash{8, 8, 8},
+ Position: coreTypes.Position{
+ ChainID: 1,
+ Round: 10,
+ Height: 13,
+ },
Randomness: []byte{7, 7, 7, 7},
}