aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-02-14 14:00:34 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:27:22 +0800
commitee1950f58d3e22ff16385290656b0347c473db46 (patch)
tree43cfc4536577d4084710da42f3ff4527ff828224 /dex/peer.go
parent5ea856585a994390c0f22f11868086a985c1f3f7 (diff)
downloadgo-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.gz
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.bz2
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.lz
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.xz
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.tar.zst
go-tangerine-ee1950f58d3e22ff16385290656b0347c473db46.zip
dex: some minor improvements (#195)
* dex: improve some msg propagation * dex: support send a batch of lattice blocks, votes, randomnesses To reduce msgs number of PullBlocks, PullVotes, PullRandomness * dex: minor improvement
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go139
1 files changed, 60 insertions, 79 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 2c531ee07..a157709f0 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -66,14 +66,9 @@ const (
maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- /*
- maxKnownLatticeBLocks = 2048
- maxKnownVotes = 2048
- maxKnownAgreements = 10240
- maxKnownRandomnesses = 10240
- maxKnownDKGPrivateShare = 1024 // this related to DKG Size
- maxKnownDKGPartialSignature = 1024 // this related to DKG Size
- */
+ maxKnownAgreements = 10240
+ maxKnownRandomnesses = 10240
+ maxKnownDKGPrivateShares = 1024 // this related to DKG Size
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
@@ -155,20 +150,17 @@ type peer struct {
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownRecords mapset.Set // Set of node record known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownLatticeBlocks mapset.Set
- knownVotes mapset.Set
knownAgreements mapset.Set
knownRandomnesses mapset.Set
knownDKGPrivateShares mapset.Set
- knownDKGPartialSignatures mapset.Set
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedLatticeBlocks chan *coreTypes.Block
- queuedVotes chan *coreTypes.Vote
+ queuedLatticeBlocks chan []*coreTypes.Block
+ queuedVotes chan []*coreTypes.Vote
queuedAgreements chan *coreTypes.AgreementResult
- queuedRandomnesses chan *coreTypes.BlockRandomnessResult
+ queuedRandomnesses chan []*coreTypes.BlockRandomnessResult
queuedDKGPrivateShares chan *dkgTypes.PrivateShare
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
queuedPullBlocks chan coreCommon.Hashes
@@ -186,20 +178,17 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
knownTxs: mapset.NewSet(),
knownRecords: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownLatticeBlocks: mapset.NewSet(),
- knownVotes: mapset.NewSet(),
knownAgreements: mapset.NewSet(),
knownRandomnesses: mapset.NewSet(),
knownDKGPrivateShares: mapset.NewSet(),
- knownDKGPartialSignatures: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedRecords: make(chan []*enr.Record, maxQueuedRecords),
queuedProps: make(chan *types.Block, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks),
- queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes),
+ queuedLatticeBlocks: make(chan []*coreTypes.Block, maxQueuedLatticeBlocks),
+ queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes),
queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements),
- queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
+ queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare),
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
@@ -232,26 +221,26 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case block := <-p.queuedLatticeBlocks:
- if err := p.SendLatticeBlock(block); err != nil {
+ case blocks := <-p.queuedLatticeBlocks:
+ if err := p.SendLatticeBlocks(blocks); err != nil {
return
}
- p.Log().Trace("Broadcast lattice block")
- case vote := <-p.queuedVotes:
- if err := p.SendVote(vote); err != nil {
+ p.Log().Trace("Broadcast lattice blocks", "count", len(blocks))
+ case votes := <-p.queuedVotes:
+ if err := p.SendVotes(votes); err != nil {
return
}
- p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote))
+ p.Log().Trace("Broadcast votes", "count", len(votes))
case agreement := <-p.queuedAgreements:
if err := p.SendAgreement(agreement); err != nil {
return
}
p.Log().Trace("Broadcast agreement")
- case randomness := <-p.queuedRandomnesses:
- if err := p.SendRandomness(randomness); err != nil {
+ case randomnesses := <-p.queuedRandomnesses:
+ if err := p.SendRandomnesses(randomnesses); err != nil {
return
}
- p.Log().Trace("Broadcast randomness")
+ p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses))
case privateShare := <-p.queuedDKGPrivateShares:
if err := p.SendDKGPrivateShare(privateShare); err != nil {
return
@@ -276,7 +265,7 @@ func (p *peer) broadcast() {
if err := p.SendPullRandomness(hashes); err != nil {
return
}
- p.Log().Trace("Pulling Randomness", "hashes", hashes)
+ p.Log().Trace("Pulling Randomnesses", "hashes", hashes)
case <-p.term:
return
case <-time.After(100 * time.Millisecond):
@@ -354,6 +343,27 @@ func (p *peer) MarkNodeRecord(hash common.Hash) {
p.knownRecords.Add(hash)
}
+func (p *peer) MarkAgreement(hash common.Hash) {
+ for p.knownAgreements.Cardinality() >= maxKnownAgreements {
+ p.knownAgreements.Pop()
+ }
+ p.knownAgreements.Add(hash)
+}
+
+func (p *peer) MarkRandomness(hash common.Hash) {
+ for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses {
+ p.knownRandomnesses.Pop()
+ }
+ p.knownRandomnesses.Add(hash)
+}
+
+func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
+ for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares {
+ p.knownDKGPrivateShares.Pop()
+ }
+ p.knownDKGPrivateShares.Add(hash)
+}
+
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
@@ -442,29 +452,25 @@ func (p *peer) AsyncSendNewBlock(block *types.Block) {
}
}
-func (p *peer) SendLatticeBlock(block *coreTypes.Block) error {
- p.knownLatticeBlocks.Add(rlpHash(block))
- return p2p.Send(p.rw, LatticeBlockMsg, block)
+func (p *peer) SendLatticeBlocks(blocks []*coreTypes.Block) error {
+ return p2p.Send(p.rw, LatticeBlockMsg, blocks)
}
-func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) {
+func (p *peer) AsyncSendLatticeBlocks(blocks []*coreTypes.Block) {
select {
- case p.queuedLatticeBlocks <- block:
- p.knownLatticeBlocks.Add(rlpHash(block))
+ case p.queuedLatticeBlocks <- blocks:
default:
p.Log().Debug("Dropping lattice block propagation")
}
}
-func (p *peer) SendVote(vote *coreTypes.Vote) error {
- p.knownVotes.Add(rlpHash(vote))
- return p2p.Send(p.rw, VoteMsg, vote)
+func (p *peer) SendVotes(votes []*coreTypes.Vote) error {
+ return p2p.Send(p.rw, VoteMsg, votes)
}
-func (p *peer) AsyncSendVote(vote *coreTypes.Vote) {
+func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) {
select {
- case p.queuedVotes <- vote:
- p.knownVotes.Add(rlpHash(vote))
+ case p.queuedVotes <- votes:
default:
p.Log().Debug("Dropping vote propagation")
}
@@ -484,15 +490,19 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
}
}
-func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error {
- p.knownRandomnesses.Add(rlpHash(randomness))
- return p2p.Send(p.rw, RandomnessMsg, randomness)
+func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error {
+ for _, randomness := range randomnesses {
+ p.knownRandomnesses.Add(rlpHash(randomness))
+ }
+ return p2p.Send(p.rw, RandomnessMsg, randomnesses)
}
-func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) {
+func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) {
select {
- case p.queuedRandomnesses <- randomness:
- p.knownRandomnesses.Add(rlpHash(randomness))
+ case p.queuedRandomnesses <- randomnesses:
+ for _, randomness := range randomnesses {
+ p.knownRandomnesses.Add(rlpHash(randomness))
+ }
default:
p.Log().Debug("Dropping randomness result")
}
@@ -513,14 +523,12 @@ func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) {
}
func (p *peer) SendDKGPartialSignature(psig *dkgTypes.PartialSignature) error {
- p.knownDKGPartialSignatures.Add(rlpHash(psig))
return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
}
func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
select {
case p.queuedDKGPartialSignatures <- psig:
- p.knownDKGPartialSignatures.Add(rlpHash(psig))
default:
p.Log().Debug("Dropping DKG partial signature")
}
@@ -854,21 +862,6 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
return list
}
-func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- list := make([]*peer, 0, len(ps.label2Nodes[label]))
- for id := range ps.label2Nodes[label] {
- if p, ok := ps.peers[id]; ok {
- if !p.knownVotes.Contains(hash) {
- list = append(list, p)
- }
- }
- }
- return list
-}
-
// PeersWithoutNodeRecord retrieves a list of peers that do not have a
// given record in their set of known hashes.
func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
@@ -883,18 +876,6 @@ func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
return list
}
-func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownLatticeBlocks.Contains(hash) {
- list = append(list, p)
- }
- }
- return list
-}
-
func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -919,12 +900,12 @@ func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
return list
}
-func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutDKGPrivateShares(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownDKGPartialSignatures.Contains(hash) {
+ if !p.knownDKGPrivateShares.Contains(hash) {
list = append(list, p)
}
}