From ace78f3a2804f2f4953f3ad9b945a4dbea221ac7 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 2 Nov 2018 10:40:58 +0800 Subject: dex: implement PullBlocks/PullVotes (#1) --- dex/peer.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) (limited to 'dex/peer.go') diff --git a/dex/peer.go b/dex/peer.go index c005cec16..2fe8cac08 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -24,6 +24,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg" @@ -77,6 +78,8 @@ const ( maxQueuedRandomnesses = 16 maxQueuedDKGPrivateShare = 16 maxQueuedDKGParitialSignature = 16 + maxQueuedPullBlocks = 128 + maxQueuedPullVotes = 128 handshakeTimeout = 5 * time.Second @@ -141,6 +144,8 @@ type peer struct { queuedRandomnesses chan *coreTypes.BlockRandomnessResult queuedDKGPrivateShares chan *dkgTypes.PrivateShare queuedDKGPartialSignatures chan *dkgTypes.PartialSignature + queuedPullBlocks chan coreCommon.Hashes + queuedPullVotes chan coreTypes.Position term chan struct{} // Termination channel to stop the broadcaster } @@ -169,7 +174,9 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare), queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), - term: make(chan struct{}), + queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), + queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes), + term: make(chan struct{}), } } @@ -232,6 +239,16 @@ func (p *peer) broadcast() { return } p.Log().Trace("Broadcast DKG partial signature") + case hashes := <-p.queuedPullBlocks: + if err := p.SendPullBlocks(hashes); err != nil { + return + } + p.Log().Trace("Pulling Blocks", "hashes", hashes) + case pos := <-p.queuedPullVotes: + if err := p.SendPullVotes(pos); err != nil { + return + } + p.Log().Trace("Pulling Votes", "position", pos) case <-p.term: return } @@ -472,6 +489,30 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) { } } +func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error { + return p2p.Send(p.rw, PullBlocksMsg, hashes) +} + +func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) { + select { + case p.queuedPullBlocks <- hashes: + default: + p.Log().Debug("Dropping Pull Blocks") + } +} + +func (p *peer) SendPullVotes(pos coreTypes.Position) error { + return p2p.Send(p.rw, PullVotesMsg, pos) +} + +func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { + select { + case p.queuedPullVotes <- pos: + default: + p.Log().Debug("Dropping Pull Votes") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) @@ -693,6 +734,18 @@ func (ps *peerSet) Len() int { return len(ps.peers) } +// Peers retrieves all of the peers. +func (ps *peerSet) Peers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + // PeersWithoutBlock retrieves a list of peers that do not have a given block in // their set of known hashes. func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { -- cgit v1.2.3