From 959f70572cd688ccc6fd3e48fec94a73847f08f3 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 25 Sep 2018 17:18:52 +0800 Subject: dex: let peer able to send notary node info --- dex/peer.go | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/dex/peer.go b/dex/peer.go index 887cc200d..31861d707 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -40,6 +40,7 @@ var ( const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownInfos = 1024 // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might @@ -56,6 +57,8 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 + maxQueuedInfos = 1024 + handshakeTimeout = 5 * time.Second ) @@ -88,9 +91,11 @@ type peer struct { knownTxs mapset.Set // Set of transaction hashes known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownInfos mapset.Set // Set of infos known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer term chan struct{} // Termination channel to stop the broadcaster } @@ -102,16 +107,18 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), knownTxs: mapset.NewSet(), knownBlocks: mapset.NewSet(), + knownInfos: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos), term: make(chan struct{}), } } -// broadcast is a write loop that multiplexes block propagations, announcements -// and transaction broadcasts into the remote peer. The goal is to have an async -// writer that does not lock up node internals. +// broadcast is a write loop that multiplexes block propagations, announcements, +// transaction and notary node infos broadcasts into the remote peer. +// The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { select { @@ -133,6 +140,12 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + case info := <-p.queuedInfos: + if err := p.SendNotaryNodeInfo(info); err != nil { + return + } + p.Log().Trace("Broadcast notary node info") + case <-p.term: return } @@ -194,6 +207,13 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } +func (p *peer) MarkNotaryNodeInfo(hash common.Hash) { + for p.knownInfos.Cardinality() >= maxKnownInfos { + p.knownInfos.Pop() + } + p.knownInfos.Add(hash) +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { @@ -216,6 +236,24 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } +// SendNotaryNodeInfo sends the info to the peer and includes the hashes +// in its info hash set for future reference. +func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error { + return p2p.Send(p.rw, NotaryNodeInfoMsg, info) +} + +// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) { + select { + case p.queuedInfos <- info: + p.knownInfos.Add(info.Hash()) + default: + p.Log().Debug("Dropping notary node info propagation") + } +} + // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { -- cgit v1.2.3