diff options
-rw-r--r-- | dex/handler.go | 55 | ||||
-rw-r--r-- | dex/peer.go | 15 |
2 files changed, 51 insertions, 19 deletions
diff --git a/dex/handler.go b/dex/handler.go index 3a5a81f50..245b31807 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -77,6 +77,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + minTxReceiver = 3 + finalizedBlockChanSize = 128 recordChanSize = 10240 @@ -1004,17 +1006,62 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { - var txset = make(map[*peer]types.Transactions) + round := pm.blockchain.CurrentBlock().Round() + label := peerLabel{ + set: notaryset, + round: round, + } + + // Send to at most `notaryReceiverNum`. + // If we don't have many notary peers, + // send to at least `minTxReceiver` notary peers. (set notaryReceiverNum = minTxReceiver) + notaryPeers := pm.peers.PeersWithLabel(label) + notaryReceiverNum := int(math.Sqrt(float64(len(notaryPeers)))) + if notaryReceiverNum < minTxReceiver { + notaryReceiverNum = minTxReceiver + } + + // Send to at most `maxReceiver` peers (including notary peers). + // If we don't have many peers, + // send to a least `minTxReceiver` peers. (set maxReceiver = minTxReceiver) + peers := pm.peers.Peers() + maxReceiver := int(math.Sqrt(float64(len(peers)))) + if maxReceiver < minTxReceiver { + maxReceiver = minTxReceiver + } + var txset = make(map[*peer]types.Transactions) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { - peers := pm.peers.PeersWithoutTx(tx.Hash()) + receivers := make(map[*peer]struct{}) + + // notary peers first + for _, peer := range notaryPeers { + if !peer.knownTxs.Contains(tx.Hash()) { + receivers[peer] = struct{}{} + } + if len(receivers) >= notaryReceiverNum { + break + } + } + for _, peer := range peers { + if len(receivers) >= maxReceiver { + break + } + + // not add to receivers yet and not known the tx + if _, ok := receivers[peer]; !ok && !peer.knownTxs.Contains(tx.Hash()) { + receivers[peer] = struct{}{} + } + } + + for peer := range receivers { txset[peer] = append(txset[peer], tx) } - log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(receivers)) } - // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { peer.AsyncSendTransactions(txs) } diff --git a/dex/peer.go b/dex/peer.go index f92fce130..562cbfaca 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -854,21 +854,6 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { return list } -// PeersWithoutTx retrieves a list of peers that do not have a given transaction -// in their set of known hashes. -func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.knownTxs.Contains(hash) { - list = append(list, p) - } - } - return list -} - func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() |