aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-25 17:18:52 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:48 +0800
commit959f70572cd688ccc6fd3e48fec94a73847f08f3 (patch)
tree377850ebdce95bdd64e8fe361391c6083e92280f /dex/peer.go
parentcccee95ab66e9e4d6e6a3897b2e543221a27e44c (diff)
downloaddexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar.gz
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar.bz2
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar.lz
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar.xz
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.tar.zst
dexon-959f70572cd688ccc6fd3e48fec94a73847f08f3.zip
dex: let peer able to send notary node info
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go44
1 files changed, 41 insertions, 3 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 887cc200d..31861d707 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -40,6 +40,7 @@ var (
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
@@ -56,6 +57,8 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
+ maxQueuedInfos = 1024
+
handshakeTimeout = 5 * time.Second
)
@@ -88,9 +91,11 @@ type peer struct {
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -102,16 +107,18 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
+ knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
-// broadcast is a write loop that multiplexes block propagations, announcements
-// and transaction broadcasts into the remote peer. The goal is to have an async
-// writer that does not lock up node internals.
+// broadcast is a write loop that multiplexes block propagations, announcements,
+// transaction and notary node infos broadcasts into the remote peer.
+// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
select {
@@ -133,6 +140,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
+ case info := <-p.queuedInfos:
+ if err := p.SendNotaryNodeInfo(info); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast notary node info")
+
case <-p.term:
return
}
@@ -194,6 +207,13 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
+func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
+ for p.knownInfos.Cardinality() >= maxKnownInfos {
+ p.knownInfos.Pop()
+ }
+ p.knownInfos.Add(hash)
+}
+
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
@@ -216,6 +236,24 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
+// SendNotaryNodeInfo sends the info to the peer and includes the hashes
+// in its info hash set for future reference.
+func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
+ return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+}
+
+// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// remote peer. If the peer's broadcast queue is full, the event is silently
+// dropped.
+func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+ select {
+ case p.queuedInfos <- info:
+ p.knownInfos.Add(info.Hash())
+ default:
+ p.Log().Debug("Dropping notary node info propagation")
+ }
+}
+
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {