diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 17:18:52 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:21:31 +0800 |
commit | d461a77d0c7c34c4d686c22f0c9d4bb77fd17569 (patch) | |
tree | 595a18cd318e74acd97f514a3f125987400906aa /dex/peer.go | |
parent | 4e6652836d06258baa5dd6388ffb98b654cebfec (diff) | |
download | go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar.gz go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar.bz2 go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar.lz go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar.xz go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.tar.zst go-tangerine-d461a77d0c7c34c4d686c22f0c9d4bb77fd17569.zip |
dex: let peer able to send notary node info
Diffstat (limited to 'dex/peer.go')
-rw-r--r-- | dex/peer.go | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/dex/peer.go b/dex/peer.go index 887cc200d..31861d707 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -40,6 +40,7 @@ var ( const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownInfos = 1024 // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might @@ -56,6 +57,8 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 + maxQueuedInfos = 1024 + handshakeTimeout = 5 * time.Second ) @@ -88,9 +91,11 @@ type peer struct { knownTxs mapset.Set // Set of transaction hashes known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownInfos mapset.Set // Set of infos known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer term chan struct{} // Termination channel to stop the broadcaster } @@ -102,16 +107,18 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), knownTxs: mapset.NewSet(), knownBlocks: mapset.NewSet(), + knownInfos: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos), term: make(chan struct{}), } } -// broadcast is a write loop that multiplexes block propagations, announcements -// and transaction broadcasts into the remote peer. The goal is to have an async -// writer that does not lock up node internals. +// broadcast is a write loop that multiplexes block propagations, announcements, +// transaction and notary node infos broadcasts into the remote peer. +// The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { select { @@ -133,6 +140,12 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + case info := <-p.queuedInfos: + if err := p.SendNotaryNodeInfo(info); err != nil { + return + } + p.Log().Trace("Broadcast notary node info") + case <-p.term: return } @@ -194,6 +207,13 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } +func (p *peer) MarkNotaryNodeInfo(hash common.Hash) { + for p.knownInfos.Cardinality() >= maxKnownInfos { + p.knownInfos.Pop() + } + p.knownInfos.Add(hash) +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { @@ -216,6 +236,24 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } +// SendNotaryNodeInfo sends the info to the peer and includes the hashes +// in its info hash set for future reference. +func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error { + return p2p.Send(p.rw, NotaryNodeInfoMsg, info) +} + +// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) { + select { + case p.queuedInfos <- info: + p.knownInfos.Add(info.Hash()) + default: + p.Log().Debug("Dropping notary node info propagation") + } +} + // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { |