diff options
author | Sonic <sonic@cobinhood.com> | 2018-10-17 18:02:52 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-03-12 12:19:09 +0800 |
commit | 8dd647934c2836bd3a1b6a5ba197bfd4f886b720 (patch) | |
tree | 16230401ea176fcf66a16e9b19902a8762e76163 /dex | |
parent | 71c6a58419e1b68b49b6c7d307acee305474d248 (diff) | |
download | dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.gz dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.bz2 dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.lz dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.xz dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.zst dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.zip |
dex: polish network related function
Diffstat (limited to 'dex')
-rw-r--r-- | dex/handler.go | 88 | ||||
-rw-r--r-- | dex/nodetable_test.go | 17 | ||||
-rw-r--r-- | dex/peer.go | 177 | ||||
-rw-r--r-- | dex/protocol_test.go | 32 |
4 files changed, 211 insertions, 103 deletions
diff --git a/dex/handler.go b/dex/handler.go index cdadd2874..35d2901c9 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -823,6 +823,16 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { } } +// TODO(sonic): block size is big, try not to send to all peers +// to reduce traffic +func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { + hash := rlpHash(toRLPLatticeBlock(block)) + for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) { + peer.AsyncSendLatticeBlock(block) + } +} + +// BroadcastVote broadcasts the given vote to all peers in same notary set func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { label := peerLabel{ set: notaryset, @@ -837,53 +847,71 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { } } -// TODO(sonic): try to reduce traffic -func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { - hash := rlpHash(toRLPLatticeBlock(block)) - for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) { - peer.AsyncSendLatticeBlock(block) - } -} - -// TODO(sonic): try to reduce traffic -func (pm *ProtocolManager) SendDKGPrivateShare( - pub coreCrypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) { - id := discover.MustBytesID(pub.Bytes()[1:]) - if p := pm.peers.Peer(id.String()); p != nil { - p.AsyncSendDKGPrivateShare(privateShare) +func (pm *ProtocolManager) BroadcastAgreementResult( + agreement *coreTypes.AgreementResult) { + // send to dkg nodes first (direct) + label := peerLabel{ + set: dkgset, + round: agreement.Position.Round, } -} - -// TODO(sonic): try to reduce traffic -func (pm *ProtocolManager) BroadcastDKGPrivateShare( - privateShare *coreTypes.DKGPrivateShare) { - for _, peer := range pm.peers.allPeers() { - peer.AsyncSendDKGPrivateShare(privateShare) + for _, peer := range pm.peers.PeersWithLabel(label) { + peer.AsyncSendAgreement(agreement) } -} -// TODO(sonic): try to reduce traffic -func (pm *ProtocolManager) BroadcastAgreementResult( - agreement *coreTypes.AgreementResult) { + // TODO(sonic): send to some of other nodes (gossip) for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { peer.AsyncSendAgreement(agreement) } } -// TODO(sonic): try to reduce traffic func (pm *ProtocolManager) BroadcastRandomnessResult( randomness *coreTypes.BlockRandomnessResult) { - // random pick n peers + // send to notary nodes first (direct) + label := peerLabel{ + set: notaryset, + chainID: randomness.Position.ChainID, + round: randomness.Position.Round, + } + for _, peer := range pm.peers.PeersWithLabel(label) { + peer.AsyncSendRandomness(randomness) + } + + // TODO(sonic): send to some of other nodes (gossip) for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { peer.AsyncSendRandomness(randomness) } } -// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) SendDKGPrivateShare( + pub coreCrypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) { + uncompressedKey, err := crypto.DecompressPubkey(pub.Bytes()) + if err != nil { + log.Error("decompress key fail", "err", err) + } + id := discover.PubkeyID(uncompressedKey) + if p := pm.peers.Peer(id.String()); p != nil { + p.AsyncSendDKGPrivateShare(privateShare) + } +} + +func (pm *ProtocolManager) BroadcastDKGPrivateShare( + privateShare *coreTypes.DKGPrivateShare) { + label := peerLabel{set: dkgset, round: privateShare.Round} + h := rlpHash(toRLPDKGPrivateShare(privateShare)) + for _, peer := range pm.peers.PeersWithLabel(label) { + if !peer.knownDKGPrivateShares.Contains(h) { + peer.AsyncSendDKGPrivateShare(privateShare) + } + } +} + func (pm *ProtocolManager) BroadcastDKGPartialSignature( psig *coreTypes.DKGPartialSignature) { - for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) { - peer.AsyncSendDKGPartialSignature(psig) + label := peerLabel{set: dkgset, round: psig.Round} + for _, peer := range pm.peers.PeersWithLabel(label) { + if !peer.knownDKGPartialSignatures.Contains(rlpHash(psig)) { + peer.AsyncSendDKGPartialSignature(psig) + } } } diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go index 5b3f7de57..2e44eabe5 100644 --- a/dex/nodetable_test.go +++ b/dex/nodetable_test.go @@ -1,11 +1,13 @@ package dex import ( - "math/rand" + "crypto/ecdsa" "testing" "time" "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" ) @@ -85,9 +87,14 @@ func TestNodeTable(t *testing.T) { } } -func randomID() (id enode.ID) { - for i := range id { - id[i] = byte(rand.Intn(255)) +func randomID() enode.ID { + var err error + var privkey *ecdsa.PrivateKey + for { + privkey, err = crypto.GenerateKey() + if err == nil { + break + } } - return id + return discover.PubkeyID(&(privkey.PublicKey)) } diff --git a/dex/peer.go b/dex/peer.go index 1279a190b..db68ea590 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -46,6 +46,13 @@ const ( maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownLatticeBLocks = 2048 + maxKnownVotes = 2048 + maxKnownAgreements = 10240 + maxKnownRandomnesses = 10240 + maxKnownDKGPrivateShare = 1024 // this related to DKG Size + maxKnownDKGPartialSignature = 1024 // this related to DKG Size + // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. @@ -63,6 +70,13 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 + maxQueuedLatticeBlocks = 16 + maxQueuedVotes = 128 + maxQueuedAgreements = 16 + maxQueuedRandomnesses = 16 + maxQueuedDKGPrivateShare = 16 + maxQueuedDKGParitialSignature = 16 + handshakeTimeout = 5 * time.Second groupNodeNum = 3 @@ -107,43 +121,53 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownMetas mapset.Set // Set of node metas known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownVotes mapset.Set - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedLatticeBlock chan *coreTypes.Block - queuedVote chan *coreTypes.Vote - queuedAgreement chan *coreTypes.AgreementResult - queuedRandomness chan *coreTypes.BlockRandomnessResult - queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare - queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature - term chan struct{} // Termination channel to stop the broadcaster + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownLatticeBlocks mapset.Set + knownVotes mapset.Set + knownAgreements mapset.Set + knownRandomnesses mapset.Set + knownDKGPrivateShares mapset.Set + knownDKGPartialSignatures mapset.Set + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedLatticeBlocks chan *coreTypes.Block + queuedVotes chan *coreTypes.Vote + queuedAgreements chan *coreTypes.AgreementResult + queuedRandomnesses chan *coreTypes.BlockRandomnessResult + queuedDKGPrivateShares chan *coreTypes.DKGPrivateShare + queuedDKGPartialSignatures chan *coreTypes.DKGPartialSignature + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - id: p.ID().String(), - knownTxs: mapset.NewSet(), - knownMetas: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - knownVotes: mapset.NewSet(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedLatticeBlock: make(chan *coreTypes.Block, 16), - queuedVote: make(chan *coreTypes.Vote, 16), - queuedAgreement: make(chan *coreTypes.AgreementResult, 16), - queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16), - queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16), - queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16), + Peer: p, + rw: rw, + version: version, + id: p.ID().String(), + knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + knownLatticeBlocks: mapset.NewSet(), + knownVotes: mapset.NewSet(), + knownAgreements: mapset.NewSet(), + knownRandomnesses: mapset.NewSet(), + knownDKGPrivateShares: mapset.NewSet(), + knownDKGPartialSignatures: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks), + queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes), + queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements), + queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), + queuedDKGPrivateShares: make(chan *coreTypes.DKGPrivateShare, maxQueuedDKGPrivateShare), + queuedDKGPartialSignatures: make(chan *coreTypes.DKGPartialSignature, maxQueuedDKGParitialSignature), term: make(chan struct{}), } } @@ -177,32 +201,32 @@ func (p *peer) broadcast() { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case block := <-p.queuedLatticeBlock: + case block := <-p.queuedLatticeBlocks: if err := p.SendLatticeBlock(block); err != nil { return } p.Log().Trace("Broadcast lattice block") - case vote := <-p.queuedVote: + case vote := <-p.queuedVotes: if err := p.SendVote(vote); err != nil { return } p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote)) - case agreement := <-p.queuedAgreement: + case agreement := <-p.queuedAgreements: if err := p.SendAgreement(agreement); err != nil { return } p.Log().Trace("Broadcast agreement") - case randomness := <-p.queuedRandomness: + case randomness := <-p.queuedRandomnesses: if err := p.SendRandomness(randomness); err != nil { return } p.Log().Trace("Broadcast randomness") - case privateShare := <-p.queuedDKGPrivateShare: + case privateShare := <-p.queuedDKGPrivateShares: if err := p.SendDKGPrivateShare(privateShare); err != nil { return } p.Log().Trace("Broadcast DKG private share") - case psig := <-p.queuedDKGPartialSignature: + case psig := <-p.queuedDKGPartialSignatures: if err := p.SendDKGPartialSignature(psig); err != nil { return } @@ -364,72 +388,88 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } func (p *peer) SendLatticeBlock(block *coreTypes.Block) error { + r := toRLPLatticeBlock(block) + p.knownLatticeBlocks.Add(rlpHash(r)) return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block)) } func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) { select { - case p.queuedLatticeBlock <- block: + case p.queuedLatticeBlocks <- block: + r := toRLPLatticeBlock(block) + p.knownLatticeBlocks.Add(rlpHash(r)) default: p.Log().Debug("Dropping lattice block propagation") } } func (p *peer) SendVote(vote *coreTypes.Vote) error { + p.knownVotes.Add(rlpHash(vote)) return p2p.Send(p.rw, VoteMsg, vote) } func (p *peer) AsyncSendVote(vote *coreTypes.Vote) { select { - case p.queuedVote <- vote: + case p.queuedVotes <- vote: + p.knownVotes.Add(rlpHash(vote)) default: p.Log().Debug("Dropping vote propagation") } } func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error { + p.knownAgreements.Add(rlpHash(agreement)) return p2p.Send(p.rw, AgreementMsg, agreement) } func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { select { - case p.queuedAgreement <- agreement: + case p.queuedAgreements <- agreement: + p.knownAgreements.Add(rlpHash(agreement)) default: p.Log().Debug("Dropping agreement result") } } func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error { + p.knownRandomnesses.Add(rlpHash(randomness)) return p2p.Send(p.rw, RandomnessMsg, randomness) } func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) { select { - case p.queuedRandomness <- randomness: + case p.queuedRandomnesses <- randomness: + p.knownRandomnesses.Add(rlpHash(randomness)) default: p.Log().Debug("Dropping randomness result") } } func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error { + r := toRLPDKGPrivateShare(privateShare) + p.knownDKGPrivateShares.Add(rlpHash(r)) return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare)) } func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) { select { - case p.queuedDKGPrivateShare <- privateShare: + case p.queuedDKGPrivateShares <- privateShare: + r := toRLPDKGPrivateShare(privateShare) + p.knownDKGPrivateShares.Add(rlpHash(r)) default: p.Log().Debug("Dropping DKG private share") } } func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error { + p.knownDKGPartialSignatures.Add(rlpHash(psig)) return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) } func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) { select { - case p.queuedDKGPartialSignature <- psig: + case p.queuedDKGPartialSignatures <- psig: + p.knownDKGPartialSignatures.Add(rlpHash(psig)) default: p.Log().Debug("Dropping DKG partial signature") } @@ -727,27 +767,50 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { return list } -// TODO(sonic): finish the following dummy function. +func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownLatticeBlocks.Contains(hash) { + list = append(list, p) + } + } + return list +} + func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { - return ps.allPeers() + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownAgreements.Contains(hash) { + list = append(list, p) + } + } + return list } func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { - return ps.allPeers() + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownRandomnesses.Contains(hash) { + list = append(list, p) + } + } + return list } func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer { - return ps.allPeers() -} - -func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { - return ps.allPeers() -} - -func (ps *peerSet) allPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - list = append(list, p) + if !p.knownDKGPartialSignatures.Contains(hash) { + list = append(list, p) + } } return list } diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 4bb3dc9e8..8bc24e8de 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -33,6 +33,7 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/eth/downloader" "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/rlp" ) @@ -558,14 +559,17 @@ func TestSendVote(t *testing.T) { wg.Wait() } -type mockPublicKey []byte +type mockPublicKey struct { + id enode.ID +} -func (p mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool { +func (p *mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool { return true } -func (p mockPublicKey) Bytes() []byte { - return append([]byte{1}, p...) +func (p *mockPublicKey) Bytes() []byte { + b, _ := p.id.Pubkey() + return crypto.CompressPubkey(b) } func TestRecvDKGPrivateShare(t *testing.T) { @@ -625,7 +629,7 @@ func TestSendDKGPrivateShare(t *testing.T) { }, } - go pm.SendDKGPrivateShare(mockPublicKey(p1.ID().Bytes()), &privateShare) + go pm.SendDKGPrivateShare(&mockPublicKey{p1.ID()}, &privateShare) msg, err := p1.app.ReadMsg() if err != nil { t.Errorf("%v: read error: %v", p1.Peer, err) @@ -678,7 +682,6 @@ func TestRecvAgreement(t *testing.T) { agreement := coreTypes.AgreementResult{ BlockHash: coreCommon.Hash{9, 9, 9}, - Round: 13, Position: vote.Position, Votes: []coreTypes.Vote{vote}, } @@ -722,7 +725,6 @@ func TestSendAgreement(t *testing.T) { agreement := coreTypes.AgreementResult{ BlockHash: coreCommon.Hash{9, 9, 9}, - Round: 13, Position: vote.Position, Votes: []coreTypes.Vote{vote}, } @@ -754,8 +756,12 @@ func TestRecvRandomness(t *testing.T) { // TODO(sonic): polish this randomness := coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.Hash{8, 8, 8}, - Round: 17, + BlockHash: coreCommon.Hash{8, 8, 8}, + Position: coreTypes.Position{ + ChainID: 1, + Round: 10, + Height: 13, + }, Randomness: []byte{7, 7, 7, 7}, } @@ -783,8 +789,12 @@ func TestSendRandomness(t *testing.T) { // TODO(sonic): polish this randomness := coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.Hash{8, 8, 8}, - Round: 17, + BlockHash: coreCommon.Hash{8, 8, 8}, + Position: coreTypes.Position{ + ChainID: 1, + Round: 10, + Height: 13, + }, Randomness: []byte{7, 7, 7, 7}, } |