diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 581 |
1 files changed, 317 insertions, 264 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 99e1c495f..4c9410d8e 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -45,8 +45,11 @@ var ( ErrNegativeValue = errors.New("Negative value") ) -const ( - maxQueued = 64 // max limit of queued txs per address +var ( + maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address + maxQueuedInTotal = uint64(8192) // Max limit of queued transactions from all accounts + maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued + evictionInterval = time.Minute // Time interval to check for evictable transactions ) type stateFn func() (*state.StateDB, error) @@ -68,10 +71,14 @@ type TxPool struct { events event.Subscription localTx *txSet mu sync.RWMutex - pending map[common.Hash]*types.Transaction // processable transactions - queue map[common.Address]map[common.Hash]*types.Transaction - wg sync.WaitGroup // for shutdown sync + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + all map[common.Hash]*types.Transaction // All transactions to allow lookups + beats map[common.Address]time.Time // Last heartbeat from each known account + + wg sync.WaitGroup // for shutdown sync + quit chan struct{} homestead bool } @@ -79,8 +86,10 @@ type TxPool struct { func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { pool := &TxPool{ config: config, - pending: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]map[common.Hash]*types.Transaction), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + all: make(map[common.Hash]*types.Transaction), + beats: make(map[common.Address]time.Time), eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, @@ -88,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat pendingState: nil, localTx: newTxSet(), events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), + quit: make(chan struct{}), } - pool.wg.Add(1) + pool.wg.Add(2) go pool.eventLoop() + go pool.expirationLoop() return pool } @@ -117,7 +128,7 @@ func (pool *TxPool) eventLoop() { pool.minGasPrice = ev.Price pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddTransactions(ev.Txs) + pool.AddBatch(ev.Txs) } } } @@ -125,12 +136,12 @@ func (pool *TxPool) eventLoop() { func (pool *TxPool) resetState() { currentState, err := pool.currentState() if err != nil { - glog.V(logger.Info).Infoln("failed to get current state: %v", err) + glog.V(logger.Error).Infof("Failed to get current state: %v", err) return } managedState := state.ManageState(currentState) if err != nil { - glog.V(logger.Info).Infoln("failed to get managed state: %v", err) + glog.V(logger.Error).Infof("Failed to get managed state: %v", err) return } pool.pendingState = managedState @@ -139,26 +150,21 @@ func (pool *TxPool) resetState() { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.validatePool() - - // Loop over the pending transactions and base the nonce of the new - // pending transaction set. - for _, tx := range pool.pending { - if addr, err := tx.From(); err == nil { - // Set the nonce. Transaction nonce can never be lower - // than the state nonce; validatePool took care of that. - if pool.pendingState.GetNonce(addr) <= tx.Nonce() { - pool.pendingState.SetNonce(addr, tx.Nonce()+1) - } - } + pool.demoteUnexecutables() + + // Update all accounts to the latest known pending nonce + for addr, list := range pool.pending { + txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway + pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.checkQueue() + pool.promoteExecutables() } func (pool *TxPool) Stop() { pool.events.Unsubscribe() + close(pool.quit) pool.wg.Wait() glog.V(logger.Info).Infoln("Transaction pool stopped") } @@ -170,47 +176,58 @@ func (pool *TxPool) State() *state.ManagedState { return pool.pendingState } +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. func (pool *TxPool) Stats() (pending int, queued int) { pool.mu.RLock() defer pool.mu.RUnlock() - pending = len(pool.pending) - for _, txs := range pool.queue { - queued += len(txs) + for _, list := range pool.pending { + pending += list.Len() + } + for _, list := range pool.queue { + queued += list.Len() } return } // Content retrieves the data content of the transaction pool, returning all the -// pending as well as queued transactions, grouped by account and nonce. -func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) { +// pending as well as queued transactions, grouped by account and sorted by nonce. +func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { pool.mu.RLock() defer pool.mu.RUnlock() - // Retrieve all the pending transactions and sort by account and by nonce - pending := make(map[common.Address]map[uint64][]*types.Transaction) - for _, tx := range pool.pending { - account, _ := tx.From() - - owned, ok := pending[account] - if !ok { - owned = make(map[uint64][]*types.Transaction) - pending[account] = owned - } - owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) - } - // Retrieve all the queued transactions and sort by account and by nonce - queued := make(map[common.Address]map[uint64][]*types.Transaction) - for account, txs := range pool.queue { - owned := make(map[uint64][]*types.Transaction) - for _, tx := range txs { - owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) - } - queued[account] = owned + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + queued := make(map[common.Address]types.Transactions) + for addr, list := range pool.queue { + queued[addr] = list.Flatten() } return pending, queued } +// Pending retrieves all currently processable transactions, groupped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) Pending() map[common.Address]types.Transactions { + pool.mu.Lock() + defer pool.mu.Unlock() + + // check queue first + pool.promoteExecutables() + + // invalidate any txs + pool.demoteUnexecutables() + + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + return pending +} + // SetLocal marks a transaction as local, skipping gas price // check against local miner minimum in the future func (pool *TxPool) SetLocal(tx *types.Transaction) { @@ -276,312 +293,348 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return nil } -// validate and queue transactions. -func (self *TxPool) add(tx *types.Transaction) error { +// add validates a transaction and inserts it into the non-executable queue for +// later pending promotion and execution. +func (pool *TxPool) add(tx *types.Transaction) error { + // If the transaction is alreayd known, discard it hash := tx.Hash() - - if self.pending[hash] != nil { - return fmt.Errorf("Known transaction (%x)", hash[:4]) + if pool.all[hash] != nil { + return fmt.Errorf("Known transaction: %x", hash[:4]) } - err := self.validateTx(tx) - if err != nil { + // Otherwise ensure basic validation passes and queue it up + if err := pool.validateTx(tx); err != nil { return err } - self.queueTx(hash, tx) + pool.enqueueTx(hash, tx) + // Print a log message if low enough level is set if glog.V(logger.Debug) { - var toname string + rcpt := "[NEW_CONTRACT]" if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" + rcpt = common.Bytes2Hex(to[:4]) } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) + from, _ := tx.From() // from already verified during tx validation + glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash) } - return nil } -// queueTx will queue an unknown transaction -func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { +// enqueueTx inserts a new transaction into the non-executable transaction queue. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { + // Try to insert the transaction into the future queue from, _ := tx.From() // already validated - if self.queue[from] == nil { - self.queue[from] = make(map[common.Hash]*types.Transaction) + if pool.queue[from] == nil { + pool.queue[from] = newTxList(false) + } + inserted, old := pool.queue[from].Add(tx) + if !inserted { + return // An older transaction was better, discard this + } + // Discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } - self.queue[from][hash] = tx + pool.all[hash] = tx } -// addTx will add a transaction to the pending (processable queue) list of transactions -func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { - // init delayed since tx pool could have been started before any state sync +// promoteTx adds a transaction to the pending (processable) list of transactions. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { + // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } + // Try to insert the transaction into the pending queue + if pool.pending[addr] == nil { + pool.pending[addr] = newTxList(true) + } + list := pool.pending[addr] - if _, ok := pool.pending[hash]; !ok { - pool.pending[hash] = tx - - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, tx.Nonce()+1) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. - go pool.eventMux.Post(TxPreEvent{tx}) + inserted, old := list.Add(tx) + if !inserted { + // An older transaction was better, discard this + delete(pool.all, hash) + return + } + // Otherwise discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } + pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) + + // Set the potentially new pending nonce and notify any subsystems of the new tx + pool.beats[addr] = time.Now() + pool.pendingState.SetNonce(addr, tx.Nonce()+1) + go pool.eventMux.Post(TxPreEvent{tx}) } // Add queues a single transaction in the pool if it is valid. -func (self *TxPool) Add(tx *types.Transaction) error { - self.mu.Lock() - defer self.mu.Unlock() +func (pool *TxPool) Add(tx *types.Transaction) error { + pool.mu.Lock() + defer pool.mu.Unlock() - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { return err } - self.checkQueue() + pool.promoteExecutables() + return nil } -// AddTransactions attempts to queue all valid transactions in txs. -func (self *TxPool) AddTransactions(txs []*types.Transaction) { - self.mu.Lock() - defer self.mu.Unlock() +// AddBatch attempts to queue a batch of transactions. +func (pool *TxPool) AddBatch(txs []*types.Transaction) { + pool.mu.Lock() + defer pool.mu.Unlock() for _, tx := range txs { - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { glog.V(logger.Debug).Infoln("tx error:", err) - } else { - h := tx.Hash() - glog.V(logger.Debug).Infof("tx %x\n", h[:4]) } } - - // check and validate the queue - self.checkQueue() + pool.promoteExecutables() } -// GetTransaction returns a transaction if it is contained in the pool +// Get returns a transaction if it is contained in the pool // and nil otherwise. -func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { - tp.mu.RLock() - defer tp.mu.RUnlock() - - // check the txs first - if tx, ok := tp.pending[hash]; ok { - return tx - } - // check queue - for _, txs := range tp.queue { - if tx, ok := txs[hash]; ok { - return tx - } - } - return nil -} +func (pool *TxPool) Get(hash common.Hash) *types.Transaction { + pool.mu.RLock() + defer pool.mu.RUnlock() -// GetTransactions returns all currently processable transactions. -// The returned slice may be modified by the caller. -func (self *TxPool) GetTransactions() (txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() + return pool.all[hash] +} - // check queue first - self.checkQueue() - // invalidate any txs - self.validatePool() +// Remove removes the transaction with the given hash from the pool. +func (pool *TxPool) Remove(hash common.Hash) { + pool.mu.Lock() + defer pool.mu.Unlock() - txs = make(types.Transactions, len(self.pending)) - i := 0 - for _, tx := range self.pending { - txs[i] = tx - i++ - } - return txs + pool.removeTx(hash) } -// GetQueuedTransactions returns all non-processable transactions. -func (self *TxPool) GetQueuedTransactions() types.Transactions { - self.mu.RLock() - defer self.mu.RUnlock() +// RemoveBatch removes all given transactions from the pool. +func (pool *TxPool) RemoveBatch(txs types.Transactions) { + pool.mu.Lock() + defer pool.mu.Unlock() - var ret types.Transactions - for _, txs := range self.queue { - for _, tx := range txs { - ret = append(ret, tx) - } + for _, tx := range txs { + pool.removeTx(tx.Hash()) } - sort.Sort(types.TxByNonce(ret)) - return ret } -// RemoveTransactions removes all given transactions from the pool. -func (self *TxPool) RemoveTransactions(txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() - for _, tx := range txs { - self.removeTx(tx.Hash()) +// removeTx removes a single transaction from the queue, moving all subsequent +// transactions back to the future queue. +func (pool *TxPool) removeTx(hash common.Hash) { + // Fetch the transaction we wish to delete + tx, ok := pool.all[hash] + if !ok { + return } -} + addr, _ := tx.From() // already validated during insertion -// RemoveTx removes the transaction with the given hash from the pool. -func (pool *TxPool) RemoveTx(hash common.Hash) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.removeTx(hash) -} + // Remove it from the list of known transactions + delete(pool.all, hash) -func (pool *TxPool) removeTx(hash common.Hash) { - // delete from pending pool - delete(pool.pending, hash) - // delete from queue - for address, txs := range pool.queue { - if _, ok := txs[hash]; ok { - if len(txs) == 1 { - // if only one tx, remove entire address entry. - delete(pool.queue, address) + // Remove the transaction from the pending lists and reset the account nonce + if pending := pool.pending[addr]; pending != nil { + if removed, invalids := pending.Remove(tx); removed { + // If no more transactions are left, remove the list + if pending.Empty() { + delete(pool.pending, addr) + delete(pool.beats, addr) } else { - delete(txs, hash) + // Otherwise postpone any invalidated transactions + for _, tx := range invalids { + pool.enqueueTx(tx.Hash(), tx) + } + } + // Update the account nonce if needed + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, tx.Nonce()) } - break + } + } + // Transaction is in the future queue + if future := pool.queue[addr]; future != nil { + future.Remove(tx) + if future.Empty() { + delete(pool.queue, addr) } } } -// checkQueue moves transactions that have become processable to main pool. -func (pool *TxPool) checkQueue() { - // init delayed since tx pool could have been started before any state sync +// promoteExecutables moves transactions that have become processable from the +// future queue to the set of pending transactions. During this process, all +// invalidated transactions (low nonce, low balance) are deleted. +func (pool *TxPool) promoteExecutables() { + // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } + // Retrieve the current state to allow nonce and balance checking + state, err := pool.currentState() + if err != nil { + glog.Errorf("Could not get current state: %v", err) + return + } + // Iterate over all accounts and promote any executable transactions + queued := uint64(0) - var promote txQueue - for address, txs := range pool.queue { - currentState, err := pool.currentState() - if err != nil { - glog.Errorf("could not get current state: %v", err) - return + for addr, list := range pool.queue { + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(state.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Removed old queued transaction: %v", tx) + } + delete(pool.all, tx.Hash()) } - balance := currentState.GetBalance(address) - - var ( - guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state) - trueNonce = currentState.GetNonce(address) // nonce known by the last state - ) - promote = promote[:0] - for hash, tx := range txs { - // Drop processed or out of fund transactions - if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 { - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) - } - delete(txs, hash) - continue + // Drop all transactions that are too costly (low balance) + drops, _ := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { + if glog.V(logger.Core) { + glog.Infof("Removed unpayable queued transaction: %v", tx) } - // Collect the remaining transactions for the next pass. - promote = append(promote, txQueueEntry{hash, address, tx}) + delete(pool.all, tx.Hash()) } - // Find the next consecutive nonce range starting at the current account nonce, - // pushing the guessed nonce forward if we add consecutive transactions. - sort.Sort(promote) - for i, entry := range promote { - // If we reached a gap in the nonces, enforce transaction limit and stop - if entry.Nonce() > guessedNonce { - if len(promote)-i > maxQueued { - if glog.V(logger.Debug) { - glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:])) - } - for _, drop := range promote[i+maxQueued:] { - delete(txs, drop.hash) - } - } - break + // Gather all executable transactions and promote them + for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Promoting queued transaction: %v", tx) } - // Otherwise promote the transaction and move the guess nonce if needed - pool.addTx(entry.hash, address, entry.Transaction) - delete(txs, entry.hash) - - if entry.Nonce() == guessedNonce { - guessedNonce++ + pool.promoteTx(addr, tx.Hash(), tx) + } + // Drop all transactions over the allowed limit + for _, tx := range list.Cap(int(maxQueuedPerAccount)) { + if glog.V(logger.Core) { + glog.Infof("Removed cap-exceeding queued transaction: %v", tx) } + delete(pool.all, tx.Hash()) } + queued += uint64(list.Len()) + // Delete the entire queue entry if it became empty. - if len(txs) == 0 { - delete(pool.queue, address) + if list.Empty() { + delete(pool.queue, addr) + } + } + // If we've queued more transactions than the hard limit, drop oldest ones + if queued > maxQueuedInTotal { + // Sort all accounts with queued transactions by heartbeat + addresses := make(addresssByHeartbeat, 0, len(pool.queue)) + for addr, _ := range pool.queue { + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + } + sort.Sort(addresses) + + // Drop transactions until the total is below the limit + for drop := queued - maxQueuedInTotal; drop > 0; { + addr := addresses[len(addresses)-1] + list := pool.queue[addr.address] + + addresses = addresses[:len(addresses)-1] + + // Drop all transactions if they are less than the overflow + if size := uint64(list.Len()); size <= drop { + for _, tx := range list.Flatten() { + pool.removeTx(tx.Hash()) + } + drop -= size + continue + } + // Otherwise drop only last few transactions + txs := list.Flatten() + for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + pool.removeTx(txs[i].Hash()) + drop-- + } } } } -// validatePool removes invalid and processed transactions from the main pool. -// If a transaction is removed for being invalid (e.g. out of funds), all sub- -// sequent (Still valid) transactions are moved back into the future queue. This -// is important to prevent a drained account from DOSing the network with non -// executable transactions. -func (pool *TxPool) validatePool() { +// demoteUnexecutables removes invalid and processed transactions from the pools +// executable/pending queue and any subsequent transactions that become unexecutable +// are moved back into the future queue. +func (pool *TxPool) demoteUnexecutables() { + // Retrieve the current state to allow nonce and balance checking state, err := pool.currentState() if err != nil { glog.V(logger.Info).Infoln("failed to get current state: %v", err) return } - balanceCache := make(map[common.Address]*big.Int) - - // Clean up the pending pool, accumulating invalid nonces - gaps := make(map[common.Address]uint64) + // Iterate over all accounts and demote any non-executable transactions + for addr, list := range pool.pending { + nonce := state.GetNonce(addr) - for hash, tx := range pool.pending { - sender, _ := tx.From() // err already checked - - // Perform light nonce and balance validation - balance := balanceCache[sender] - if balance == nil { - balance = state.GetBalance(sender) - balanceCache[sender] = balance + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(nonce) { + if glog.V(logger.Core) { + glog.Infof("Removed old pending transaction: %v", tx) + } + delete(pool.all, tx.Hash()) } - if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 { - // Remove an already past it invalidated transaction + // Drop all transactions that are too costly (low balance), and queue any invalids back for later + drops, invalids := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) + glog.Infof("Removed unpayable pending transaction: %v", tx) } - delete(pool.pending, hash) - - // Track the smallest invalid nonce to postpone subsequent transactions - if !past { - if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev { - gaps[sender] = tx.Nonce() - } + delete(pool.all, tx.Hash()) + } + for _, tx := range invalids { + if glog.V(logger.Core) { + glog.Infof("Demoting pending transaction: %v", tx) } + pool.enqueueTx(tx.Hash(), tx) + } + // Delete the entire queue entry if it became empty. + if list.Empty() { + delete(pool.pending, addr) + delete(pool.beats, addr) } } - // Move all transactions after a gap back to the future queue - if len(gaps) > 0 { - for hash, tx := range pool.pending { - sender, _ := tx.From() - if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap { - if glog.V(logger.Core) { - glog.Infof("postponed tx (%v) due to introduced gap\n", tx) +} + +// expirationLoop is a loop that periodically iterates over all accounts with +// queued transactions and drop all that have been inactive for a prolonged amount +// of time. +func (pool *TxPool) expirationLoop() { + defer pool.wg.Done() + + evict := time.NewTicker(evictionInterval) + defer evict.Stop() + + for { + select { + case <-evict.C: + pool.mu.Lock() + for addr := range pool.queue { + if time.Since(pool.beats[addr]) > maxQueuedLifetime { + for _, tx := range pool.queue[addr].Flatten() { + pool.removeTx(tx.Hash()) + } } - pool.queueTx(hash, tx) - delete(pool.pending, hash) } + pool.mu.Unlock() + + case <-pool.quit: + return } } } -type txQueue []txQueueEntry - -type txQueueEntry struct { - hash common.Hash - addr common.Address - *types.Transaction +// addressByHeartbeat is an account address tagged with its last activity timestamp. +type addressByHeartbeat struct { + address common.Address + heartbeat time.Time } -func (q txQueue) Len() int { return len(q) } -func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } -func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() } +type addresssByHeartbeat []addressByHeartbeat + +func (a addresssByHeartbeat) Len() int { return len(a) } +func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } +func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // txSet represents a set of transaction hashes in which entries // are automatically dropped after txSetDuration time |