aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/handler.go55
-rw-r--r--dex/peer.go15
2 files changed, 51 insertions, 19 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 3a5a81f50..245b31807 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -77,6 +77,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ minTxReceiver = 3
+
finalizedBlockChanSize = 128
recordChanSize = 10240
@@ -1004,17 +1006,62 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
- var txset = make(map[*peer]types.Transactions)
+ round := pm.blockchain.CurrentBlock().Round()
+ label := peerLabel{
+ set: notaryset,
+ round: round,
+ }
+
+ // Send to at most `notaryReceiverNum`.
+ // If we don't have many notary peers,
+ // send to at least `minTxReceiver` notary peers. (set notaryReceiverNum = minTxReceiver)
+ notaryPeers := pm.peers.PeersWithLabel(label)
+ notaryReceiverNum := int(math.Sqrt(float64(len(notaryPeers))))
+ if notaryReceiverNum < minTxReceiver {
+ notaryReceiverNum = minTxReceiver
+ }
+
+ // Send to at most `maxReceiver` peers (including notary peers).
+ // If we don't have many peers,
+ // send to a least `minTxReceiver` peers. (set maxReceiver = minTxReceiver)
+ peers := pm.peers.Peers()
+ maxReceiver := int(math.Sqrt(float64(len(peers))))
+ if maxReceiver < minTxReceiver {
+ maxReceiver = minTxReceiver
+ }
+ var txset = make(map[*peer]types.Transactions)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
- peers := pm.peers.PeersWithoutTx(tx.Hash())
+ receivers := make(map[*peer]struct{})
+
+ // notary peers first
+ for _, peer := range notaryPeers {
+ if !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ if len(receivers) >= notaryReceiverNum {
+ break
+ }
+ }
+
for _, peer := range peers {
+ if len(receivers) >= maxReceiver {
+ break
+ }
+
+ // not add to receivers yet and not known the tx
+ if _, ok := receivers[peer]; !ok && !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ }
+
+ for peer := range receivers {
txset[peer] = append(txset[peer], tx)
}
- log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(receivers))
}
- // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
diff --git a/dex/peer.go b/dex/peer.go
index f92fce130..562cbfaca 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -854,21 +854,6 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
return list
}
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownTxs.Contains(hash) {
- list = append(list, p)
- }
- }
- return list
-}
-
func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()