diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 55 |
1 files changed, 51 insertions, 4 deletions
diff --git a/dex/handler.go b/dex/handler.go index 3a5a81f50..245b31807 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -77,6 +77,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + minTxReceiver = 3 + finalizedBlockChanSize = 128 recordChanSize = 10240 @@ -1004,17 +1006,62 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { - var txset = make(map[*peer]types.Transactions) + round := pm.blockchain.CurrentBlock().Round() + label := peerLabel{ + set: notaryset, + round: round, + } + + // Send to at most `notaryReceiverNum`. + // If we don't have many notary peers, + // send to at least `minTxReceiver` notary peers. (set notaryReceiverNum = minTxReceiver) + notaryPeers := pm.peers.PeersWithLabel(label) + notaryReceiverNum := int(math.Sqrt(float64(len(notaryPeers)))) + if notaryReceiverNum < minTxReceiver { + notaryReceiverNum = minTxReceiver + } + + // Send to at most `maxReceiver` peers (including notary peers). + // If we don't have many peers, + // send to a least `minTxReceiver` peers. (set maxReceiver = minTxReceiver) + peers := pm.peers.Peers() + maxReceiver := int(math.Sqrt(float64(len(peers)))) + if maxReceiver < minTxReceiver { + maxReceiver = minTxReceiver + } + var txset = make(map[*peer]types.Transactions) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { - peers := pm.peers.PeersWithoutTx(tx.Hash()) + receivers := make(map[*peer]struct{}) + + // notary peers first + for _, peer := range notaryPeers { + if !peer.knownTxs.Contains(tx.Hash()) { + receivers[peer] = struct{}{} + } + if len(receivers) >= notaryReceiverNum { + break + } + } + for _, peer := range peers { + if len(receivers) >= maxReceiver { + break + } + + // not add to receivers yet and not known the tx + if _, ok := receivers[peer]; !ok && !peer.knownTxs.Contains(tx.Hash()) { + receivers[peer] = struct{}{} + } + } + + for peer := range receivers { txset[peer] = append(txset[peer], tx) } - log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(receivers)) } - // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { peer.AsyncSendTransactions(txs) } |