diff options
author | Sonic <sonic@cobinhood.com> | 2018-10-12 15:02:33 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:23:39 +0800 |
commit | 046ea228bf3892a01b8de060390e766483a69249 (patch) | |
tree | fd03cfde6234c4be590a1fcd6e6c69afea03a705 /dex/peer.go | |
parent | 8250fd5357e8a9c4b634cf289489ce2c4ec3470c (diff) | |
download | go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.gz go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.bz2 go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.lz go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.xz go-tangerine-046ea228bf3892a01b8de060390e766483a69249.tar.zst go-tangerine-046ea228bf3892a01b8de060390e766483a69249.zip |
dex: network: implement the network interface
Diffstat (limited to 'dex/peer.go')
-rw-r--r-- | dex/peer.go | 230 |
1 files changed, 200 insertions, 30 deletions
diff --git a/dex/peer.go b/dex/peer.go index 05947b456..e7c4f5d53 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -24,6 +24,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/p2p" @@ -104,31 +105,45 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownMetas mapset.Set // Set of node metas known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - term chan struct{} // Termination channel to stop the broadcaster + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownVotes mapset.Set + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedLatticeBlock chan *coreTypes.Block + queuedVote chan *coreTypes.Vote + queuedAgreement chan *coreTypes.AgreementResult + queuedRandomness chan *coreTypes.BlockRandomnessResult + queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare + queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - labels: mapset.NewSet(), - id: p.ID().String(), - knownTxs: mapset.NewSet(), - knownMetas: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - term: make(chan struct{}), + Peer: p, + rw: rw, + version: version, + labels: mapset.NewSet(), + id: p.ID().String(), + knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + knownVotes: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedLatticeBlock: make(chan *coreTypes.Block, 16), + queuedVote: make(chan *coreTypes.Vote, 16), + queuedAgreement: make(chan *coreTypes.AgreementResult, 16), + queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16), + queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16), + queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16), + term: make(chan struct{}), } } @@ -161,7 +176,36 @@ func (p *peer) broadcast() { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - + case block := <-p.queuedLatticeBlock: + if err := p.SendLatticeBlock(block); err != nil { + return + } + p.Log().Trace("Broadcast lattice block") + case vote := <-p.queuedVote: + if err := p.SendVote(vote); err != nil { + return + } + p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote)) + case agreement := <-p.queuedAgreement: + if err := p.SendAgreement(agreement); err != nil { + return + } + p.Log().Trace("Broadcast agreement") + case randomness := <-p.queuedRandomness: + if err := p.SendRandomness(randomness); err != nil { + return + } + p.Log().Trace("Broadcast randomness") + case privateShare := <-p.queuedDKGPrivateShare: + if err := p.SendDKGPrivateShare(privateShare); err != nil { + return + } + p.Log().Trace("Broadcast DKG private share") + case psig := <-p.queuedDKGPartialSignature: + if err := p.SendDKGPartialSignature(psig); err != nil { + return + } + p.Log().Trace("Broadcast DKG partial signature") case <-p.term: return } @@ -326,6 +370,78 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } } +func (p *peer) SendLatticeBlock(block *coreTypes.Block) error { + return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block)) +} + +func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) { + select { + case p.queuedLatticeBlock <- block: + default: + p.Log().Debug("Dropping lattice block propagation") + } +} + +func (p *peer) SendVote(vote *coreTypes.Vote) error { + return p2p.Send(p.rw, VoteMsg, vote) +} + +func (p *peer) AsyncSendVote(vote *coreTypes.Vote) { + select { + case p.queuedVote <- vote: + default: + p.Log().Debug("Dropping vote propagation") + } +} + +func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error { + return p2p.Send(p.rw, AgreementMsg, agreement) +} + +func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { + select { + case p.queuedAgreement <- agreement: + default: + p.Log().Debug("Dropping agreement result") + } +} + +func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error { + return p2p.Send(p.rw, RandomnessMsg, randomness) +} + +func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) { + select { + case p.queuedRandomness <- randomness: + default: + p.Log().Debug("Dropping randomness result") + } +} + +func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error { + return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare)) +} + +func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) { + select { + case p.queuedDKGPrivateShare <- privateShare: + default: + p.Log().Debug("Dropping DKG private share") + } +} + +func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error { + return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) +} + +func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) { + select { + case p.queuedDKGPartialSignature <- psig: + default: + p.Log().Debug("Dropping DKG partial signature") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) @@ -474,7 +590,8 @@ type peerSet struct { srvr p2pServer gov governance - peerLabels map[string]map[peerLabel]struct{} + peer2Labels map[string]map[peerLabel]struct{} + label2Peers map[peerLabel]map[string]struct{} notaryHistory map[uint64]struct{} dkgHistory map[uint64]struct{} } @@ -486,7 +603,8 @@ func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { gov: gov, srvr: srvr, tab: tab, - peerLabels: make(map[string]map[peerLabel]struct{}), + peer2Labels: make(map[string]map[peerLabel]struct{}), + label2Peers: make(map[peerLabel]map[string]struct{}), notaryHistory: make(map[uint64]struct{}), dkgHistory: make(map[uint64]struct{}), } @@ -573,6 +691,21 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } +func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.label2Peers[label])) + for id := range ps.label2Peers[label] { + if p, ok := ps.peers[id]; ok { + if !p.knownVotes.Contains(hash) { + list = append(list, p) + } + } + } + return list +} + // PeersWithoutNodeMeta retrieves a list of peers that do not have a // given meta in their set of known hashes. func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { @@ -587,6 +720,31 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { return list } +// TODO(sonic): finish the following dummy function. +func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) allPeers() []*peer { + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + // BestPeer retrieves the known peer with the currently highest total difficulty. func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() @@ -746,11 +904,16 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) { p.addLabel(label) } - if _, ok := ps.peerLabels[id]; !ok { - ps.peerLabels[id] = make(map[peerLabel]struct{}) + if _, ok := ps.peer2Labels[id]; !ok { + ps.peer2Labels[id] = make(map[peerLabel]struct{}) } - ps.peerLabels[id][label] = struct{}{} + if _, ok := ps.label2Peers[label]; !ok { + ps.label2Peers[label] = make(map[string]struct{}) + } + + ps.peer2Labels[id][label] = struct{}{} + ps.label2Peers[label][id] = struct{}{} ps.srvr.AddDirectPeer(ps.newNode(id)) } @@ -760,11 +923,18 @@ func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { p.removeLabel(label) } - delete(ps.peerLabels[id], label) + delete(ps.peer2Labels[id], label) - if len(ps.peerLabels[id]) == 0 { + if len(ps.peer2Labels[id]) == 0 { ps.srvr.RemoveDirectPeer(ps.newNode(id)) - delete(ps.peerLabels, id) + delete(ps.peer2Labels, id) + } + + if _, ok := ps.label2Peers[label]; ok { + delete(ps.label2Peers[label], id) + if len(ps.label2Peers[label]) == 0 { + delete(ps.label2Peers, label) + } } } |