aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-27 20:47:32 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:59 +0800
commit0c63646ca8b06bb527737cd6e2a7fe58f169efff (patch)
treeb0666613c2a3cb84d53b60597bfef5ec45548c3a /dex/peer.go
parent91981becf98b988470810aa1c26d86de2d294e29 (diff)
downloaddexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.gz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.bz2
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.lz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.xz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.zst
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.zip
core: merge notarySet and DKGSet (#265)
* vendor: sync to latest core * core: merge notarySet and dkgSet * dex: optimize network traffic for finalized block
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go101
1 files changed, 17 insertions, 84 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 0fa1ac61d..0d23e630f 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -67,7 +67,6 @@ const (
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
maxKnownAgreements = 10240
- maxKnownRandomnesses = 10240
maxKnownDKGPrivateShares = 1024 // this related to DKG Size
// maxQueuedTxs is the maximum number of transaction lists to queue up before
@@ -90,7 +89,6 @@ const (
maxQueuedCoreBlocks = 16
maxQueuedVotes = 128
maxQueuedAgreements = 16
- maxQueuedRandomnesses = 16
maxQueuedDKGPrivateShare = 16
maxQueuedDKGParitialSignature = 16
maxQueuedPullBlocks = 128
@@ -114,8 +112,7 @@ type PeerInfo struct {
type setType uint32
const (
- dkgset = iota
- notaryset
+ notaryset = iota
)
type peerLabel struct {
@@ -126,8 +123,6 @@ type peerLabel struct {
func (p peerLabel) String() string {
var t string
switch p.set {
- case dkgset:
- t = fmt.Sprintf("DKGSet round: %d", p.round)
case notaryset:
t = fmt.Sprintf("NotarySet round: %d", p.round)
}
@@ -150,7 +145,6 @@ type peer struct {
knownRecords mapset.Set // Set of node record known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownAgreements mapset.Set
- knownRandomnesses mapset.Set
knownDKGPrivateShares mapset.Set
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
@@ -159,7 +153,6 @@ type peer struct {
queuedCoreBlocks chan []*coreTypes.Block
queuedVotes chan []*coreTypes.Vote
queuedAgreements chan *coreTypes.AgreementResult
- queuedRandomnesses chan []*coreTypes.BlockRandomnessResult
queuedDKGPrivateShares chan *dkgTypes.PrivateShare
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
queuedPullBlocks chan coreCommon.Hashes
@@ -178,7 +171,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
knownRecords: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownAgreements: mapset.NewSet(),
- knownRandomnesses: mapset.NewSet(),
knownDKGPrivateShares: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedRecords: make(chan []*enr.Record, maxQueuedRecords),
@@ -187,7 +179,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks),
queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes),
queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements),
- queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare),
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
@@ -252,11 +243,6 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Broadcast agreement")
- case randomnesses := <-p.queuedRandomnesses:
- if err := p.SendRandomnesses(randomnesses); err != nil {
- return
- }
- p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses))
case privateShare := <-p.queuedDKGPrivateShares:
if err := p.SendDKGPrivateShare(privateShare); err != nil {
return
@@ -277,11 +263,6 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Pulling Votes", "position", pos)
- case hashes := <-p.queuedPullRandomness:
- if err := p.SendPullRandomness(hashes); err != nil {
- return
- }
- p.Log().Trace("Pulling Randomnesses", "hashes", hashes)
case <-p.term:
return
case <-time.After(100 * time.Millisecond):
@@ -366,13 +347,6 @@ func (p *peer) MarkAgreement(hash common.Hash) {
p.knownAgreements.Add(hash)
}
-func (p *peer) MarkRandomness(hash common.Hash) {
- for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses {
- p.knownRandomnesses.Pop()
- }
- p.knownRandomnesses.Add(hash)
-}
-
func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares {
p.knownDKGPrivateShares.Pop()
@@ -513,24 +487,6 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
}
}
-func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error {
- for _, randomness := range randomnesses {
- p.knownRandomnesses.Add(rlpHash(randomness))
- }
- return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg)
-}
-
-func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) {
- select {
- case p.queuedRandomnesses <- randomnesses:
- for _, randomness := range randomnesses {
- p.knownRandomnesses.Add(rlpHash(randomness))
- }
- default:
- p.Log().Debug("Dropping randomness result")
- }
-}
-
func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error {
p.knownDKGPrivateShares.Add(rlpHash(privateShare))
return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg)
@@ -581,18 +537,6 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
}
-func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
- return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg)
-}
-
-func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
- select {
- case p.queuedPullRandomness <- hashes:
- default:
- p.Log().Debug("Dropping Pull Randomness")
- }
-}
-
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error {
return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg)
@@ -871,38 +815,43 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
return list
}
-// PeersWithoutNodeRecord retrieves a list of peers that do not have a
-// given record in their set of known hashes.
-func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownRecords.Contains(hash) {
+ length := len(ps.peers) - len(ps.label2Nodes[label])
+ if length <= 0 {
+ return []*peer{}
+ }
+ list := make([]*peer, 0, len(ps.peers)-len(ps.label2Nodes[label]))
+ peersWithLabel := ps.label2Nodes[label]
+ for id, p := range ps.peers {
+ if _, exist := peersWithLabel[id]; !exist {
list = append(list, p)
}
}
return list
}
-func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+// PeersWithoutNodeRecord retrieves a list of peers that do not have a
+// given record in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownAgreements.Contains(hash) {
+ if !p.knownRecords.Contains(hash) {
list = append(list, p)
}
}
return list
}
-func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownRandomnesses.Contains(hash) {
+ if !p.knownAgreements.Contains(hash) {
list = append(list, p)
}
}
@@ -956,23 +905,6 @@ func (ps *peerSet) BuildConnection(round uint64) {
log.Info("Build connection", "round", round)
- dkgLabel := peerLabel{set: dkgset, round: round}
- if _, ok := ps.label2Nodes[dkgLabel]; !ok {
- dkgPKs, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get DKG set fail", "round", round, "err", err)
- }
-
- nodes := ps.pksToNodes(dkgPKs)
- ps.label2Nodes[dkgLabel] = nodes
-
- if _, exists := nodes[ps.srvr.Self().ID().String()]; exists {
- ps.buildDirectConn(dkgLabel)
- } else {
- ps.buildGroupConn(dkgLabel)
- }
- }
-
notaryLabel := peerLabel{set: notaryset, round: round}
if _, ok := ps.label2Nodes[notaryLabel]; !ok {
notaryPKs, err := ps.gov.NotarySet(round)
@@ -990,6 +922,7 @@ func (ps *peerSet) BuildConnection(round uint64) {
ps.buildGroupConn(notaryLabel)
}
}
+
}
func (ps *peerSet) ForgetConnection(round uint64) {