diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 152 |
1 files changed, 102 insertions, 50 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 8e2d1b31d..16f774265 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -99,7 +99,9 @@ type stateFn func() (*state.StateDB, error) // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { - NoLocals bool // Whether local transaction handling should be disabled + NoLocals bool // Whether local transaction handling should be disabled + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -115,6 +117,9 @@ type TxPoolConfig struct { // DefaultTxPoolConfig contains the default configurations for the transaction // pool. var DefaultTxPoolConfig = TxPoolConfig{ + Journal: "transactions.rlp", + Rejournal: time.Hour, + PriceLimit: 1, PriceBump: 10, @@ -130,6 +135,10 @@ var DefaultTxPoolConfig = TxPoolConfig{ // unreasonable or unworkable. func (config *TxPoolConfig) sanitize() TxPoolConfig { conf := *config + if conf.Rejournal < time.Second { + log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) + conf.Rejournal = time.Second + } if conf.PriceLimit < 1 { log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit) conf.PriceLimit = DefaultTxPoolConfig.PriceLimit @@ -157,18 +166,19 @@ type TxPool struct { gasPrice *big.Int eventMux *event.TypeMux events *event.TypeMuxSubscription - locals *accountSet signer types.Signer mu sync.RWMutex + locals *accountSet // Set of local transaction to exepmt from evicion rules + journal *txJournal // Journal of local transaction to back up to disk + pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all map[common.Hash]*types.Transaction // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - wg sync.WaitGroup // for shutdown sync - quit chan struct{} + wg sync.WaitGroup // for shutdown sync homestead bool } @@ -194,32 +204,48 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), - quit: make(chan struct{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) pool.resetState() - // Start the various events loops and return - pool.wg.Add(2) - go pool.eventLoop() - go pool.expirationLoop() + // If local transactions and journaling is enabled, load from disk + if !config.NoLocals && config.Journal != "" { + pool.journal = newTxJournal(config.Journal) + + if err := pool.journal.load(pool.AddLocal); err != nil { + log.Warn("Failed to load transaction journal", "err", err) + } + if err := pool.journal.rotate(pool.local()); err != nil { + log.Warn("Failed to rotate transaction journal", "err", err) + } + } + // Start the event loop and return + pool.wg.Add(1) + go pool.loop() return pool } -func (pool *TxPool) eventLoop() { +// loop is the transaction pool's main event loop, waiting for and reacting to +// outside blockchain events as well as for various reporting and transaction +// eviction events. +func (pool *TxPool) loop() { defer pool.wg.Done() - // Start a ticker and keep track of interesting pool stats to report + // Start the stats reporting and transaction eviction tickers var prevPending, prevQueued, prevStales int report := time.NewTicker(statsReportInterval) defer report.Stop() - // Track chain events. When a chain events occurs (new chain canon block) - // we need to know the new state. The new state will help us determine - // the nonces in the managed state + evict := time.NewTicker(evictionInterval) + defer evict.Stop() + + journal := time.NewTicker(pool.config.Rejournal) + defer journal.Stop() + + // Keep waiting for and reacting to the various events for { select { // Handle any events fired by the system @@ -253,6 +279,31 @@ func (pool *TxPool) eventLoop() { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) prevPending, prevQueued, prevStales = pending, queued, stales } + + // Handle inactive account transaction eviction + case <-evict.C: + pool.mu.Lock() + for addr := range pool.queue { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed + if time.Since(pool.beats[addr]) > pool.config.Lifetime { + for _, tx := range pool.queue[addr].Flatten() { + pool.removeTx(tx.Hash()) + } + } + } + pool.mu.Unlock() + + // Handle local transaction journal rotation + case <-journal.C: + if pool.journal != nil { + if err := pool.journal.rotate(pool.local()); err != nil { + log.Warn("Failed to rotate local tx journal", "err", err) + } + } } } } @@ -284,9 +335,11 @@ func (pool *TxPool) resetState() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { pool.events.Unsubscribe() - close(pool.quit) pool.wg.Wait() + if pool.journal != nil { + pool.journal.close() + } log.Info("Transaction pool stopped") } @@ -373,6 +426,22 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } +// local retrieves all currently known local transactions, groupped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) local() map[common.Address]types.Transactions { + txs := make(map[common.Address]types.Transactions) + for addr := range pool.locals.accounts { + if pending := pool.pending[addr]; pending != nil { + txs[addr] = append(txs[addr], pending.Flatten()...) + } + if queued := pool.queue[addr]; queued != nil { + txs[addr] = append(txs[addr], queued.Flatten()...) + } + } + return txs +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { @@ -473,18 +542,22 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { } pool.all[tx.Hash()] = tx pool.priced.Put(tx) + pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue and potentially mark local + // New transaction isn't replacing a pending one, push into queue replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } + // Mark local addresses and journal local transactions if local { pool.locals.add(from) } + pool.journalTx(from, tx) + log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } @@ -515,6 +588,18 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er return old != nil, nil } +// journalTx adds the specified transaction to the local disk journal if it is +// deemed to have been sent from a local account. +func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { + // Only journal if it's enabled and the transaction is local + if pool.journal == nil || !pool.locals.contains(from) { + return + } + if err := pool.journal.insert(tx); err != nil { + log.Warn("Failed to journal local transaction", "err", err) + } +} + // promoteTx adds a transaction to the pending (processable) list of transactions. // // Note, this method assumes the pool lock is held! @@ -910,39 +995,6 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { } } -// expirationLoop is a loop that periodically iterates over all accounts with -// queued transactions and drop all that have been inactive for a prolonged amount -// of time. -func (pool *TxPool) expirationLoop() { - defer pool.wg.Done() - - evict := time.NewTicker(evictionInterval) - defer evict.Stop() - - for { - select { - case <-evict.C: - pool.mu.Lock() - for addr := range pool.queue { - // Skip local transactions from the eviction mechanism - if pool.locals.contains(addr) { - continue - } - // Any non-locals old enough should be removed - if time.Since(pool.beats[addr]) > pool.config.Lifetime { - for _, tx := range pool.queue[addr].Flatten() { - pool.removeTx(tx.Hash()) - } - } - } - pool.mu.Unlock() - - case <-pool.quit: - return - } - } -} - // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.Address @@ -955,7 +1007,7 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// accountSet is simply a set of addresses to check for existance, and a signer +// accountSet is simply a set of addresses to check for existence, and a signer // capable of deriving addresses from transactions. type accountSet struct { accounts map[common.Address]struct{} |