diff options
author | Sonic <sonic@dexon.org> | 2019-02-14 14:00:34 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:27:22 +0800 |
commit | ee1950f58d3e22ff16385290656b0347c473db46 (patch) | |
tree | 43cfc4536577d4084710da42f3ff4527ff828224 /dex/peer.go | |
parent | 5ea856585a994390c0f22f11868086a985c1f3f7 (diff) | |
download | go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.gz go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.bz2 go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.lz go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.xz go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.zst go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.zip |
dex: some minor improvements (#195)
* dex: improve some msg propagation
* dex: support send a batch of lattice blocks, votes, randomnesses
To reduce msgs number of PullBlocks, PullVotes, PullRandomness
* dex: minor improvement
Diffstat (limited to 'dex/peer.go')
-rw-r--r-- | dex/peer.go | 139 |
1 files changed, 60 insertions, 79 deletions
diff --git a/dex/peer.go b/dex/peer.go index 2c531ee07..a157709f0 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -66,14 +66,9 @@ const ( maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - /* - maxKnownLatticeBLocks = 2048 - maxKnownVotes = 2048 - maxKnownAgreements = 10240 - maxKnownRandomnesses = 10240 - maxKnownDKGPrivateShare = 1024 // this related to DKG Size - maxKnownDKGPartialSignature = 1024 // this related to DKG Size - */ + maxKnownAgreements = 10240 + maxKnownRandomnesses = 10240 + maxKnownDKGPrivateShares = 1024 // this related to DKG Size // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might @@ -155,20 +150,17 @@ type peer struct { knownTxs mapset.Set // Set of transaction hashes known to be known by this peer knownRecords mapset.Set // Set of node record known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownLatticeBlocks mapset.Set - knownVotes mapset.Set knownAgreements mapset.Set knownRandomnesses mapset.Set knownDKGPrivateShares mapset.Set - knownDKGPartialSignatures mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer queuedProps chan *types.Block // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedLatticeBlocks chan *coreTypes.Block - queuedVotes chan *coreTypes.Vote + queuedLatticeBlocks chan []*coreTypes.Block + queuedVotes chan []*coreTypes.Vote queuedAgreements chan *coreTypes.AgreementResult - queuedRandomnesses chan *coreTypes.BlockRandomnessResult + queuedRandomnesses chan []*coreTypes.BlockRandomnessResult queuedDKGPrivateShares chan *dkgTypes.PrivateShare queuedDKGPartialSignatures chan *dkgTypes.PartialSignature queuedPullBlocks chan coreCommon.Hashes @@ -186,20 +178,17 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { knownTxs: mapset.NewSet(), knownRecords: mapset.NewSet(), knownBlocks: mapset.NewSet(), - knownLatticeBlocks: mapset.NewSet(), - knownVotes: mapset.NewSet(), knownAgreements: mapset.NewSet(), knownRandomnesses: mapset.NewSet(), knownDKGPrivateShares: mapset.NewSet(), - knownDKGPartialSignatures: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedRecords: make(chan []*enr.Record, maxQueuedRecords), queuedProps: make(chan *types.Block, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks), - queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes), + queuedLatticeBlocks: make(chan []*coreTypes.Block, maxQueuedLatticeBlocks), + queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes), queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements), - queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), + queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare), queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), @@ -232,26 +221,26 @@ func (p *peer) broadcast() { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case block := <-p.queuedLatticeBlocks: - if err := p.SendLatticeBlock(block); err != nil { + case blocks := <-p.queuedLatticeBlocks: + if err := p.SendLatticeBlocks(blocks); err != nil { return } - p.Log().Trace("Broadcast lattice block") - case vote := <-p.queuedVotes: - if err := p.SendVote(vote); err != nil { + p.Log().Trace("Broadcast lattice blocks", "count", len(blocks)) + case votes := <-p.queuedVotes: + if err := p.SendVotes(votes); err != nil { return } - p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote)) + p.Log().Trace("Broadcast votes", "count", len(votes)) case agreement := <-p.queuedAgreements: if err := p.SendAgreement(agreement); err != nil { return } p.Log().Trace("Broadcast agreement") - case randomness := <-p.queuedRandomnesses: - if err := p.SendRandomness(randomness); err != nil { + case randomnesses := <-p.queuedRandomnesses: + if err := p.SendRandomnesses(randomnesses); err != nil { return } - p.Log().Trace("Broadcast randomness") + p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses)) case privateShare := <-p.queuedDKGPrivateShares: if err := p.SendDKGPrivateShare(privateShare); err != nil { return @@ -276,7 +265,7 @@ func (p *peer) broadcast() { if err := p.SendPullRandomness(hashes); err != nil { return } - p.Log().Trace("Pulling Randomness", "hashes", hashes) + p.Log().Trace("Pulling Randomnesses", "hashes", hashes) case <-p.term: return case <-time.After(100 * time.Millisecond): @@ -354,6 +343,27 @@ func (p *peer) MarkNodeRecord(hash common.Hash) { p.knownRecords.Add(hash) } +func (p *peer) MarkAgreement(hash common.Hash) { + for p.knownAgreements.Cardinality() >= maxKnownAgreements { + p.knownAgreements.Pop() + } + p.knownAgreements.Add(hash) +} + +func (p *peer) MarkRandomness(hash common.Hash) { + for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses { + p.knownRandomnesses.Pop() + } + p.knownRandomnesses.Add(hash) +} + +func (p *peer) MarkDKGPrivateShares(hash common.Hash) { + for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares { + p.knownDKGPrivateShares.Pop() + } + p.knownDKGPrivateShares.Add(hash) +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { @@ -442,29 +452,25 @@ func (p *peer) AsyncSendNewBlock(block *types.Block) { } } -func (p *peer) SendLatticeBlock(block *coreTypes.Block) error { - p.knownLatticeBlocks.Add(rlpHash(block)) - return p2p.Send(p.rw, LatticeBlockMsg, block) +func (p *peer) SendLatticeBlocks(blocks []*coreTypes.Block) error { + return p2p.Send(p.rw, LatticeBlockMsg, blocks) } -func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) { +func (p *peer) AsyncSendLatticeBlocks(blocks []*coreTypes.Block) { select { - case p.queuedLatticeBlocks <- block: - p.knownLatticeBlocks.Add(rlpHash(block)) + case p.queuedLatticeBlocks <- blocks: default: p.Log().Debug("Dropping lattice block propagation") } } -func (p *peer) SendVote(vote *coreTypes.Vote) error { - p.knownVotes.Add(rlpHash(vote)) - return p2p.Send(p.rw, VoteMsg, vote) +func (p *peer) SendVotes(votes []*coreTypes.Vote) error { + return p2p.Send(p.rw, VoteMsg, votes) } -func (p *peer) AsyncSendVote(vote *coreTypes.Vote) { +func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) { select { - case p.queuedVotes <- vote: - p.knownVotes.Add(rlpHash(vote)) + case p.queuedVotes <- votes: default: p.Log().Debug("Dropping vote propagation") } @@ -484,15 +490,19 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { } } -func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error { - p.knownRandomnesses.Add(rlpHash(randomness)) - return p2p.Send(p.rw, RandomnessMsg, randomness) +func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error { + for _, randomness := range randomnesses { + p.knownRandomnesses.Add(rlpHash(randomness)) + } + return p2p.Send(p.rw, RandomnessMsg, randomnesses) } -func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) { +func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) { select { - case p.queuedRandomnesses <- randomness: - p.knownRandomnesses.Add(rlpHash(randomness)) + case p.queuedRandomnesses <- randomnesses: + for _, randomness := range randomnesses { + p.knownRandomnesses.Add(rlpHash(randomness)) + } default: p.Log().Debug("Dropping randomness result") } @@ -513,14 +523,12 @@ func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) { } func (p *peer) SendDKGPartialSignature(psig *dkgTypes.PartialSignature) error { - p.knownDKGPartialSignatures.Add(rlpHash(psig)) return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) } func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) { select { case p.queuedDKGPartialSignatures <- psig: - p.knownDKGPartialSignatures.Add(rlpHash(psig)) default: p.Log().Debug("Dropping DKG partial signature") } @@ -854,21 +862,6 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer { return list } -func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*peer, 0, len(ps.label2Nodes[label])) - for id := range ps.label2Nodes[label] { - if p, ok := ps.peers[id]; ok { - if !p.knownVotes.Contains(hash) { - list = append(list, p) - } - } - } - return list -} - // PeersWithoutNodeRecord retrieves a list of peers that do not have a // given record in their set of known hashes. func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { @@ -883,18 +876,6 @@ func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { return list } -func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.knownLatticeBlocks.Contains(hash) { - list = append(list, p) - } - } - return list -} - func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -919,12 +900,12 @@ func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { return list } -func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutDKGPrivateShares(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownDKGPartialSignatures.Contains(hash) { + if !p.knownDKGPrivateShares.Contains(hash) { list = append(list, p) } } |