From 977d2e13c1616fa21705960c6765ccd33141141d Mon Sep 17 00:00:00 2001 From: Sonic Date: Wed, 17 Oct 2018 18:02:52 +0800 Subject: dex: polish network related function --- dex/peer.go | 177 ++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 57 deletions(-) (limited to 'dex/peer.go') diff --git a/dex/peer.go b/dex/peer.go index 1279a190b..db68ea590 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -46,6 +46,13 @@ const ( maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownLatticeBLocks = 2048 + maxKnownVotes = 2048 + maxKnownAgreements = 10240 + maxKnownRandomnesses = 10240 + maxKnownDKGPrivateShare = 1024 // this related to DKG Size + maxKnownDKGPartialSignature = 1024 // this related to DKG Size + // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. @@ -63,6 +70,13 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 + maxQueuedLatticeBlocks = 16 + maxQueuedVotes = 128 + maxQueuedAgreements = 16 + maxQueuedRandomnesses = 16 + maxQueuedDKGPrivateShare = 16 + maxQueuedDKGParitialSignature = 16 + handshakeTimeout = 5 * time.Second groupNodeNum = 3 @@ -107,43 +121,53 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownMetas mapset.Set // Set of node metas known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownVotes mapset.Set - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedLatticeBlock chan *coreTypes.Block - queuedVote chan *coreTypes.Vote - queuedAgreement chan *coreTypes.AgreementResult - queuedRandomness chan *coreTypes.BlockRandomnessResult - queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare - queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature - term chan struct{} // Termination channel to stop the broadcaster + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownLatticeBlocks mapset.Set + knownVotes mapset.Set + knownAgreements mapset.Set + knownRandomnesses mapset.Set + knownDKGPrivateShares mapset.Set + knownDKGPartialSignatures mapset.Set + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedLatticeBlocks chan *coreTypes.Block + queuedVotes chan *coreTypes.Vote + queuedAgreements chan *coreTypes.AgreementResult + queuedRandomnesses chan *coreTypes.BlockRandomnessResult + queuedDKGPrivateShares chan *coreTypes.DKGPrivateShare + queuedDKGPartialSignatures chan *coreTypes.DKGPartialSignature + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - id: p.ID().String(), - knownTxs: mapset.NewSet(), - knownMetas: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - knownVotes: mapset.NewSet(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedLatticeBlock: make(chan *coreTypes.Block, 16), - queuedVote: make(chan *coreTypes.Vote, 16), - queuedAgreement: make(chan *coreTypes.AgreementResult, 16), - queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16), - queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16), - queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16), + Peer: p, + rw: rw, + version: version, + id: p.ID().String(), + knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + knownLatticeBlocks: mapset.NewSet(), + knownVotes: mapset.NewSet(), + knownAgreements: mapset.NewSet(), + knownRandomnesses: mapset.NewSet(), + knownDKGPrivateShares: mapset.NewSet(), + knownDKGPartialSignatures: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks), + queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes), + queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements), + queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), + queuedDKGPrivateShares: make(chan *coreTypes.DKGPrivateShare, maxQueuedDKGPrivateShare), + queuedDKGPartialSignatures: make(chan *coreTypes.DKGPartialSignature, maxQueuedDKGParitialSignature), term: make(chan struct{}), } } @@ -177,32 +201,32 @@ func (p *peer) broadcast() { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case block := <-p.queuedLatticeBlock: + case block := <-p.queuedLatticeBlocks: if err := p.SendLatticeBlock(block); err != nil { return } p.Log().Trace("Broadcast lattice block") - case vote := <-p.queuedVote: + case vote := <-p.queuedVotes: if err := p.SendVote(vote); err != nil { return } p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote)) - case agreement := <-p.queuedAgreement: + case agreement := <-p.queuedAgreements: if err := p.SendAgreement(agreement); err != nil { return } p.Log().Trace("Broadcast agreement") - case randomness := <-p.queuedRandomness: + case randomness := <-p.queuedRandomnesses: if err := p.SendRandomness(randomness); err != nil { return } p.Log().Trace("Broadcast randomness") - case privateShare := <-p.queuedDKGPrivateShare: + case privateShare := <-p.queuedDKGPrivateShares: if err := p.SendDKGPrivateShare(privateShare); err != nil { return } p.Log().Trace("Broadcast DKG private share") - case psig := <-p.queuedDKGPartialSignature: + case psig := <-p.queuedDKGPartialSignatures: if err := p.SendDKGPartialSignature(psig); err != nil { return } @@ -364,72 +388,88 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } func (p *peer) SendLatticeBlock(block *coreTypes.Block) error { + r := toRLPLatticeBlock(block) + p.knownLatticeBlocks.Add(rlpHash(r)) return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block)) } func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) { select { - case p.queuedLatticeBlock <- block: + case p.queuedLatticeBlocks <- block: + r := toRLPLatticeBlock(block) + p.knownLatticeBlocks.Add(rlpHash(r)) default: p.Log().Debug("Dropping lattice block propagation") } } func (p *peer) SendVote(vote *coreTypes.Vote) error { + p.knownVotes.Add(rlpHash(vote)) return p2p.Send(p.rw, VoteMsg, vote) } func (p *peer) AsyncSendVote(vote *coreTypes.Vote) { select { - case p.queuedVote <- vote: + case p.queuedVotes <- vote: + p.knownVotes.Add(rlpHash(vote)) default: p.Log().Debug("Dropping vote propagation") } } func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error { + p.knownAgreements.Add(rlpHash(agreement)) return p2p.Send(p.rw, AgreementMsg, agreement) } func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { select { - case p.queuedAgreement <- agreement: + case p.queuedAgreements <- agreement: + p.knownAgreements.Add(rlpHash(agreement)) default: p.Log().Debug("Dropping agreement result") } } func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error { + p.knownRandomnesses.Add(rlpHash(randomness)) return p2p.Send(p.rw, RandomnessMsg, randomness) } func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) { select { - case p.queuedRandomness <- randomness: + case p.queuedRandomnesses <- randomness: + p.knownRandomnesses.Add(rlpHash(randomness)) default: p.Log().Debug("Dropping randomness result") } } func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error { + r := toRLPDKGPrivateShare(privateShare) + p.knownDKGPrivateShares.Add(rlpHash(r)) return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare)) } func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) { select { - case p.queuedDKGPrivateShare <- privateShare: + case p.queuedDKGPrivateShares <- privateShare: + r := toRLPDKGPrivateShare(privateShare) + p.knownDKGPrivateShares.Add(rlpHash(r)) default: p.Log().Debug("Dropping DKG private share") } } func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error { + p.knownDKGPartialSignatures.Add(rlpHash(psig)) return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) } func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) { select { - case p.queuedDKGPartialSignature <- psig: + case p.queuedDKGPartialSignatures <- psig: + p.knownDKGPartialSignatures.Add(rlpHash(psig)) default: p.Log().Debug("Dropping DKG partial signature") } @@ -727,27 +767,50 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { return list } -// TODO(sonic): finish the following dummy function. +func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownLatticeBlocks.Contains(hash) { + list = append(list, p) + } + } + return list +} + func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { - return ps.allPeers() + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownAgreements.Contains(hash) { + list = append(list, p) + } + } + return list } func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { - return ps.allPeers() + ps.lock.RLock() + defer ps.lock.RUnlock() + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownRandomnesses.Contains(hash) { + list = append(list, p) + } + } + return list } func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer { - return ps.allPeers() -} - -func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { - return ps.allPeers() -} - -func (ps *peerSet) allPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - list = append(list, p) + if !p.knownDKGPartialSignatures.Contains(hash) { + list = append(list, p) + } } return list } -- cgit v1.2.3