aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-04-02 22:04:28 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:59 +0800
commite02415d6224060ca9d7c3cef9005c729ac6b6c05 (patch)
tree9a15cf979ba16ea9990af921ff518dc6882fc0fb /dex/peer.go
parent5d3ac45f9300821560509be0e3be38be55ca992a (diff)
downloaddexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar.gz
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar.bz2
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar.lz
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar.xz
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.tar.zst
dexon-e02415d6224060ca9d7c3cef9005c729ac6b6c05.zip
dex: try to reduce the chance to pull non-finalized blocks (#327)
* Send non-finalized blocks to notary set only * Update randomness field for blocks in cache upon receiving agreement result * Filter AgreementResult by its position * Avoid overwriting finalized blocks with non-finalized ones * Add blocks to finalized cache when pulling * Update to finalized cache when we have corresponding element in non-finalized one.
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go60
1 files changed, 35 insertions, 25 deletions
diff --git a/dex/peer.go b/dex/peer.go
index d0e717233..1ade2820e 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -66,7 +66,6 @@ const (
maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownAgreements = 10240
maxKnownDKGPrivateShares = 1024 // this related to DKG Size
// maxQueuedTxs is the maximum number of transaction lists to queue up before
@@ -141,24 +140,26 @@ type peer struct {
number uint64
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownRecords mapset.Set // Set of node record known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownAgreements mapset.Set
- knownDKGPrivateShares mapset.Set
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
- queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedCoreBlocks chan []*coreTypes.Block
- queuedVotes chan []*coreTypes.Vote
- queuedAgreements chan *coreTypes.AgreementResult
- queuedDKGPrivateShares chan *dkgTypes.PrivateShare
- queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
- queuedPullBlocks chan coreCommon.Hashes
- queuedPullVotes chan coreTypes.Position
- queuedPullRandomness chan coreCommon.Hashes
- term chan struct{} // Termination channel to stop the broadcaster
+ lastKnownAgreementPositionLock sync.RWMutex
+ lastKnownAgreementPosition coreTypes.Position // The position of latest agreement to be known by this peer
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownRecords mapset.Set // Set of node record known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownAgreements mapset.Set
+ knownDKGPrivateShares mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
+ queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedCoreBlocks chan []*coreTypes.Block
+ queuedVotes chan []*coreTypes.Vote
+ queuedAgreements chan *coreTypes.AgreementResult
+ queuedDKGPrivateShares chan *dkgTypes.PrivateShare
+ queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
+ queuedPullBlocks chan coreCommon.Hashes
+ queuedPullVotes chan coreTypes.Position
+ queuedPullRandomness chan coreCommon.Hashes
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@@ -340,11 +341,14 @@ func (p *peer) MarkNodeRecord(hash common.Hash) {
p.knownRecords.Add(hash)
}
-func (p *peer) MarkAgreement(hash common.Hash) {
- for p.knownAgreements.Cardinality() >= maxKnownAgreements {
- p.knownAgreements.Pop()
+func (p *peer) MarkAgreement(position coreTypes.Position) bool {
+ p.lastKnownAgreementPositionLock.Lock()
+ defer p.lastKnownAgreementPositionLock.Unlock()
+ if position.Newer(p.lastKnownAgreementPosition) {
+ p.lastKnownAgreementPosition = position
+ return true
}
- p.knownAgreements.Add(hash)
+ return false
}
func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
@@ -354,6 +358,12 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
p.knownDKGPrivateShares.Add(hash)
}
+func (p *peer) isAgreementKnown(position coreTypes.Position) bool {
+ p.lastKnownAgreementPositionLock.RLock()
+ defer p.lastKnownAgreementPositionLock.RUnlock()
+ return !p.lastKnownAgreementPosition.Older(position)
+}
+
func (p *peer) logSend(err error, code uint64) error {
if err != nil {
p.Log().Error("Failed to send peer message", "code", code, "err", err)
@@ -846,12 +856,12 @@ func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
return list
}
-func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutAgreement(position coreTypes.Position) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownAgreements.Contains(hash) {
+ if !p.isAgreementKnown(position) {
list = append(list, p)
}
}