aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-12 15:02:33 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:23:39 +0800
commit046ea228bf3892a01b8de060390e766483a69249 (patch)
treefd03cfde6234c4be590a1fcd6e6c69afea03a705 /dex/peer.go
parent8250fd5357e8a9c4b634cf289489ce2c4ec3470c (diff)
downloadgo-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.gz
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.bz2
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.lz
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.xz
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.zst
go-tangerine-046ea228bf3892a01b8de060390e766483a69249.zip
dex: network: implement the network interface
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go230
1 files changed, 200 insertions, 30 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 05947b456..e7c4f5d53 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -24,6 +24,7 @@ import (
"time"
mapset "github.com/deckarep/golang-set"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
@@ -104,31 +105,45 @@ type peer struct {
td *big.Int
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownMetas mapset.Set // Set of node metas known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
- queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- term chan struct{} // Termination channel to stop the broadcaster
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownVotes mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedLatticeBlock chan *coreTypes.Block
+ queuedVote chan *coreTypes.Vote
+ queuedAgreement chan *coreTypes.AgreementResult
+ queuedRandomness chan *coreTypes.BlockRandomnessResult
+ queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare
+ queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- labels: mapset.NewSet(),
- id: p.ID().String(),
- knownTxs: mapset.NewSet(),
- knownMetas: mapset.NewSet(),
- knownBlocks: mapset.NewSet(),
- queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
- queuedProps: make(chan *propEvent, maxQueuedProps),
- queuedAnns: make(chan *types.Block, maxQueuedAnns),
- term: make(chan struct{}),
+ Peer: p,
+ rw: rw,
+ version: version,
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
+ knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ knownVotes: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ queuedLatticeBlock: make(chan *coreTypes.Block, 16),
+ queuedVote: make(chan *coreTypes.Vote, 16),
+ queuedAgreement: make(chan *coreTypes.AgreementResult, 16),
+ queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16),
+ queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16),
+ queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16),
+ term: make(chan struct{}),
}
}
@@ -161,7 +176,36 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
-
+ case block := <-p.queuedLatticeBlock:
+ if err := p.SendLatticeBlock(block); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast lattice block")
+ case vote := <-p.queuedVote:
+ if err := p.SendVote(vote); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote))
+ case agreement := <-p.queuedAgreement:
+ if err := p.SendAgreement(agreement); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast agreement")
+ case randomness := <-p.queuedRandomness:
+ if err := p.SendRandomness(randomness); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast randomness")
+ case privateShare := <-p.queuedDKGPrivateShare:
+ if err := p.SendDKGPrivateShare(privateShare); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast DKG private share")
+ case psig := <-p.queuedDKGPartialSignature:
+ if err := p.SendDKGPartialSignature(psig); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast DKG partial signature")
case <-p.term:
return
}
@@ -326,6 +370,78 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
}
+func (p *peer) SendLatticeBlock(block *coreTypes.Block) error {
+ return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block))
+}
+
+func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) {
+ select {
+ case p.queuedLatticeBlock <- block:
+ default:
+ p.Log().Debug("Dropping lattice block propagation")
+ }
+}
+
+func (p *peer) SendVote(vote *coreTypes.Vote) error {
+ return p2p.Send(p.rw, VoteMsg, vote)
+}
+
+func (p *peer) AsyncSendVote(vote *coreTypes.Vote) {
+ select {
+ case p.queuedVote <- vote:
+ default:
+ p.Log().Debug("Dropping vote propagation")
+ }
+}
+
+func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
+ return p2p.Send(p.rw, AgreementMsg, agreement)
+}
+
+func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
+ select {
+ case p.queuedAgreement <- agreement:
+ default:
+ p.Log().Debug("Dropping agreement result")
+ }
+}
+
+func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error {
+ return p2p.Send(p.rw, RandomnessMsg, randomness)
+}
+
+func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) {
+ select {
+ case p.queuedRandomness <- randomness:
+ default:
+ p.Log().Debug("Dropping randomness result")
+ }
+}
+
+func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error {
+ return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare))
+}
+
+func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) {
+ select {
+ case p.queuedDKGPrivateShare <- privateShare:
+ default:
+ p.Log().Debug("Dropping DKG private share")
+ }
+}
+
+func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error {
+ return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
+}
+
+func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) {
+ select {
+ case p.queuedDKGPartialSignature <- psig:
+ default:
+ p.Log().Debug("Dropping DKG partial signature")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
@@ -474,7 +590,8 @@ type peerSet struct {
srvr p2pServer
gov governance
- peerLabels map[string]map[peerLabel]struct{}
+ peer2Labels map[string]map[peerLabel]struct{}
+ label2Peers map[peerLabel]map[string]struct{}
notaryHistory map[uint64]struct{}
dkgHistory map[uint64]struct{}
}
@@ -486,7 +603,8 @@ func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
gov: gov,
srvr: srvr,
tab: tab,
- peerLabels: make(map[string]map[peerLabel]struct{}),
+ peer2Labels: make(map[string]map[peerLabel]struct{}),
+ label2Peers: make(map[peerLabel]map[string]struct{}),
notaryHistory: make(map[uint64]struct{}),
dkgHistory: make(map[uint64]struct{}),
}
@@ -573,6 +691,21 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
+func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.label2Peers[label]))
+ for id := range ps.label2Peers[label] {
+ if p, ok := ps.peers[id]; ok {
+ if !p.knownVotes.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ }
+ return list
+}
+
// PeersWithoutNodeMeta retrieves a list of peers that do not have a
// given meta in their set of known hashes.
func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
@@ -587,6 +720,31 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
return list
}
+// TODO(sonic): finish the following dummy function.
+func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) allPeers() []*peer {
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
+ }
+ return list
+}
+
// BestPeer retrieves the known peer with the currently highest total difficulty.
func (ps *peerSet) BestPeer() *peer {
ps.lock.RLock()
@@ -746,11 +904,16 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
p.addLabel(label)
}
- if _, ok := ps.peerLabels[id]; !ok {
- ps.peerLabels[id] = make(map[peerLabel]struct{})
+ if _, ok := ps.peer2Labels[id]; !ok {
+ ps.peer2Labels[id] = make(map[peerLabel]struct{})
}
- ps.peerLabels[id][label] = struct{}{}
+ if _, ok := ps.label2Peers[label]; !ok {
+ ps.label2Peers[label] = make(map[string]struct{})
+ }
+
+ ps.peer2Labels[id][label] = struct{}{}
+ ps.label2Peers[label][id] = struct{}{}
ps.srvr.AddDirectPeer(ps.newNode(id))
}
@@ -760,11 +923,18 @@ func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
p.removeLabel(label)
}
- delete(ps.peerLabels[id], label)
+ delete(ps.peer2Labels[id], label)
- if len(ps.peerLabels[id]) == 0 {
+ if len(ps.peer2Labels[id]) == 0 {
ps.srvr.RemoveDirectPeer(ps.newNode(id))
- delete(ps.peerLabels, id)
+ delete(ps.peer2Labels, id)
+ }
+
+ if _, ok := ps.label2Peers[label]; ok {
+ delete(ps.label2Peers[label], id)
+ if len(ps.label2Peers[label]) == 0 {
+ delete(ps.label2Peers, label)
+ }
}
}