diff options
Diffstat (limited to 'miner/worker.go')
-rw-r--r-- | miner/worker.go | 721 |
1 files changed, 398 insertions, 323 deletions
diff --git a/miner/worker.go b/miner/worker.go index ae695f019..81a63c29a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -32,16 +32,14 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) const ( - resultQueueSize = 10 - miningLogAtDepth = 5 - + // resultQueueSize is the size of channel listening to sealing result. + resultQueueSize = 10 // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 @@ -49,17 +47,10 @@ const ( chainHeadChanSize = 10 // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 + miningLogAtDepth = 5 ) -// Agent can register themselves with the worker -type Agent interface { - AssignTask(*Package) - DeliverTo(chan<- *Package) - Start() - Stop() -} - -// Env is the workers current environment and holds all of the current state information. +// Env is the worker's current environment and holds all of the current state information. type Env struct { config *params.ChainConfig signer types.Signer @@ -74,25 +65,124 @@ type Env struct { header *types.Header txs []*types.Transaction receipts []*types.Receipt +} - createdAt time.Time +func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { + snap := env.state.Snapshot() + + receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}) + if err != nil { + env.state.RevertToSnapshot(snap) + return err, nil + } + env.txs = append(env.txs, tx) + env.receipts = append(env.receipts, receipt) + + return nil, receipt.Logs } -// Package contains all information for consensus engine sealing and result submitting. -type Package struct { - Receipts []*types.Receipt - State *state.StateDB - Block *types.Block +func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + } + + var coalescedLogs []*types.Log + + for { + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !env.config.IsEIP155(env.header.Number) { + log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block) + + txs.Pop() + continue + } + // Start executing the transaction + env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) + + err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) + switch err { + case core.ErrGasLimitReached: + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() + + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + txs.Shift() + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() + } + } + + if len(coalescedLogs) > 0 || env.tcount > 0 { + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + go func(logs []*types.Log, tcount int) { + if len(logs) > 0 { + mux.Post(core.PendingLogsEvent{Logs: logs}) + } + if tcount > 0 { + mux.Post(core.PendingStateEvent{}) + } + }(cpy, env.tcount) + } } -// worker is the main object which takes care of applying messages to the new state +// task contains all information for consensus engine sealing and result submitting. +type task struct { + receipts []*types.Receipt + state *state.StateDB + block *types.Block + createdAt time.Time +} + +// worker is the main object which takes care of submitting new work to consensus engine +// and gathering the sealing result. type worker struct { config *params.ChainConfig engine consensus.Engine + eth Backend + chain *core.BlockChain - mu sync.Mutex - - // update loop + // Subscriptions mux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription @@ -101,31 +191,30 @@ type worker struct { chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - agents map[Agent]struct{} - recv chan *Package + // Channels + newWork chan struct{} + taskCh chan *task + resultCh chan *task + exitCh chan struct{} - eth Backend - chain *core.BlockChain - proc core.Validator - chainDb ethdb.Database + current *Env // An environment for current running cycle. + possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. + unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. + mu sync.RWMutex // The lock used to protect the coinbase and extra fields coinbase common.Address extra []byte - currentMu sync.Mutex - current *Env - - snapshotMu sync.RWMutex + snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot snapshotBlock *types.Block snapshotState *state.StateDB - uncleMu sync.Mutex - possibleUncles map[common.Hash]*types.Block - - unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations - // atomic status counters running int32 // The indicator whether the consensus engine is running or not. + + // Test hooks + newTaskHook func(*task) // Method to call upon receiving a new sealing task + fullTaskInterval func() // Method to call before pushing the full sealing task } func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { @@ -134,220 +223,274 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, engine: engine, eth: eth, mux: mux, - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - chainDb: eth.ChainDb(), - recv: make(chan *Package, resultQueueSize), chain: eth.BlockChain(), - proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWork: make(chan struct{}, 1), + taskCh: make(chan *task), + resultCh: make(chan *task, resultQueueSize), + exitCh: make(chan struct{}), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) - go worker.update() - go worker.wait() - worker.commitNewWork() + go worker.mainLoop() + go worker.resultLoop() + go worker.taskLoop() + // Submit first work to initialize pending state. + worker.newWork <- struct{}{} return worker } -func (self *worker) setEtherbase(addr common.Address) { - self.mu.Lock() - defer self.mu.Unlock() - self.coinbase = addr +// setEtherbase sets the etherbase used to initialize the block coinbase field. +func (w *worker) setEtherbase(addr common.Address) { + w.mu.Lock() + defer w.mu.Unlock() + w.coinbase = addr } -func (self *worker) setExtra(extra []byte) { - self.mu.Lock() - defer self.mu.Unlock() - self.extra = extra +// setExtra sets the content used to initialize the block extra field. +func (w *worker) setExtra(extra []byte) { + w.mu.Lock() + defer w.mu.Unlock() + w.extra = extra } -func (self *worker) pending() (*types.Block, *state.StateDB) { +// pending returns the pending state and corresponding block. +func (w *worker) pending() (*types.Block, *state.StateDB) { // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + if w.snapshotState == nil { + return nil, nil + } + return w.snapshotBlock, w.snapshotState.Copy() } -func (self *worker) pendingBlock() *types.Block { +// pendingBlock returns pending block. +func (w *worker) pendingBlock() *types.Block { // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + return w.snapshotBlock } -func (self *worker) start() { - self.mu.Lock() - defer self.mu.Unlock() - atomic.StoreInt32(&self.running, 1) - for agent := range self.agents { - agent.Start() - } +// start sets the running status as 1 and triggers new work submitting. +func (w *worker) start() { + atomic.StoreInt32(&w.running, 1) + w.newWork <- struct{}{} } -func (self *worker) stop() { - self.mu.Lock() - defer self.mu.Unlock() - - atomic.StoreInt32(&self.running, 0) - for agent := range self.agents { - agent.Stop() - } +// stop sets the running status as 0. +func (w *worker) stop() { + atomic.StoreInt32(&w.running, 0) } -func (self *worker) isRunning() bool { - return atomic.LoadInt32(&self.running) == 1 +// isRunning returns an indicator whether worker is running or not. +func (w *worker) isRunning() bool { + return atomic.LoadInt32(&w.running) == 1 } -func (self *worker) register(agent Agent) { - self.mu.Lock() - defer self.mu.Unlock() - self.agents[agent] = struct{}{} - agent.DeliverTo(self.recv) - if self.isRunning() { - agent.Start() +// close terminates all background threads maintained by the worker and cleans up buffered channels. +// Note the worker does not support being closed multiple times. +func (w *worker) close() { + close(w.exitCh) + // Clean up buffered channels + for empty := false; !empty; { + select { + case <-w.resultCh: + default: + empty = true + } } } -func (self *worker) unregister(agent Agent) { - self.mu.Lock() - defer self.mu.Unlock() - delete(self.agents, agent) - agent.Stop() -} - -func (self *worker) update() { - defer self.txsSub.Unsubscribe() - defer self.chainHeadSub.Unsubscribe() - defer self.chainSideSub.Unsubscribe() +// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. +func (w *worker) mainLoop() { + defer w.txsSub.Unsubscribe() + defer w.chainHeadSub.Unsubscribe() + defer w.chainSideSub.Unsubscribe() for { - // A real event arrived, process interesting content select { - // Handle ChainHeadEvent - case <-self.chainHeadCh: - self.commitNewWork() - - // Handle ChainSideEvent - case ev := <-self.chainSideCh: - self.uncleMu.Lock() - self.possibleUncles[ev.Block.Hash()] = ev.Block - self.uncleMu.Unlock() - - // Handle NewTxsEvent - case ev := <-self.txsCh: + case <-w.newWork: + // Submit a work when the worker is created or started. + w.commitNewWork() + + case <-w.chainHeadCh: + // Resubmit a work for new cycle once worker receives chain head event. + w.commitNewWork() + + case ev := <-w.chainSideCh: + // Add side block to possible uncle block set. + w.possibleUncles[ev.Block.Hash()] = ev.Block + + case ev := <-w.txsCh: // Apply transactions to the pending state if we're not mining. // // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if !self.isRunning() { - self.currentMu.Lock() + if !w.isRunning() && w.current != nil { + w.mu.Lock() + coinbase := w.coinbase + w.mu.Unlock() + txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { - acc, _ := types.Sender(self.current.signer, tx) + acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) - self.updateSnapshot() - self.currentMu.Unlock() + txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs) + w.current.commitTransactions(w.mux, txset, w.chain, coinbase) + w.updateSnapshot() } else { // If we're mining, but nothing is being processed, wake on new transactions - if self.config.Clique != nil && self.config.Clique.Period == 0 { - self.commitNewWork() + if w.config.Clique != nil && w.config.Clique.Period == 0 { + w.commitNewWork() } } // System stopped - case <-self.txsSub.Err(): + case <-w.exitCh: return - case <-self.chainHeadSub.Err(): + case <-w.txsSub.Err(): return - case <-self.chainSideSub.Err(): + case <-w.chainHeadSub.Err(): + return + case <-w.chainSideSub.Err(): return } } } -func (self *worker) wait() { +// seal pushes a sealing task to consensus engine and submits the result. +func (w *worker) seal(t *task, stop <-chan struct{}) { + var ( + err error + res *task + ) + + if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil { + log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(), + "elapsed", common.PrettyDuration(time.Since(t.createdAt))) + res = t + } else { + if err != nil { + log.Warn("Block sealing failed", "err", err) + } + res = nil + } + select { + case w.resultCh <- res: + case <-w.exitCh: + } +} + +// taskLoop is a standalone goroutine to fetch sealing task from the generator and +// push them to consensus engine. +func (w *worker) taskLoop() { + var stopCh chan struct{} + + // interrupt aborts the in-flight sealing task. + interrupt := func() { + if stopCh != nil { + close(stopCh) + stopCh = nil + } + } for { - for result := range self.recv { + select { + case task := <-w.taskCh: + if w.newTaskHook != nil { + w.newTaskHook(task) + } + interrupt() + stopCh = make(chan struct{}) + go w.seal(task, stopCh) + case <-w.exitCh: + interrupt() + return + } + } +} +// resultLoop is a standalone goroutine to handle sealing result submitting +// and flush relative data to the database. +func (w *worker) resultLoop() { + for { + select { + case result := <-w.resultCh: if result == nil { continue } - block := result.Block + block := result.block // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. - for _, r := range result.Receipts { + for _, r := range result.receipts { for _, l := range r.Logs { l.BlockHash = block.Hash() } } - for _, log := range result.State.Logs() { + for _, log := range result.state.Logs() { log.BlockHash = block.Hash() } - self.currentMu.Lock() - stat, err := self.chain.WriteBlockWithState(block, result.Receipts, result.State) - self.currentMu.Unlock() + // Commit block and state to database. + stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } // Broadcast the block and announce chain insertion event - self.mux.Post(core.NewMinedBlockEvent{Block: block}) + w.mux.Post(core.NewMinedBlockEvent{Block: block}) var ( events []interface{} - logs = result.State.Logs() + logs = result.state.Logs() ) - events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - if stat == core.CanonStatTy { + switch stat { + case core.CanonStatTy: + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) events = append(events, core.ChainHeadEvent{Block: block}) + case core.SideStatTy: + events = append(events, core.ChainSideEvent{Block: block}) } - self.chain.PostChainEvents(events, logs) + w.chain.PostChainEvents(events, logs) - // Insert the block into the set of pending ones to wait for confirmations - self.unconfirmed.Insert(block.NumberU64(), block.Hash()) - } - } -} + // Insert the block into the set of pending ones to resultLoop for confirmations + w.unconfirmed.Insert(block.NumberU64(), block.Hash()) -// push sends a new work task to currently live miner agents. -func (self *worker) push(p *Package) { - for agent := range self.agents { - agent.AssignTask(p) + case <-w.exitCh: + return + } } } // makeCurrent creates a new environment for the current cycle. -func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error { - state, err := self.chain.StateAt(parent.Root()) +func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { + state, err := w.chain.StateAt(parent.Root()) if err != nil { return err } env := &Env{ - config: self.config, - signer: types.NewEIP155Signer(self.config.ChainID), + config: w.config, + signer: types.NewEIP155Signer(w.config.ChainID), state: state, ancestors: mapset.NewSet(), family: mapset.NewSet(), uncles: mapset.NewSet(), header: header, - createdAt: time.Now(), } // when 08 is processed ancestors contain 07 (quick block) - for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) { + for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { for _, uncle := range ancestor.Uncles() { env.family.Add(uncle.Hash()) } @@ -357,20 +500,63 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error // Keep track of transactions which return errors so they can be removed env.tcount = 0 - self.current = env + w.current = env return nil } -func (self *worker) commitNewWork() { - self.mu.Lock() - defer self.mu.Unlock() - self.uncleMu.Lock() - defer self.uncleMu.Unlock() - self.currentMu.Lock() - defer self.currentMu.Unlock() +// commitUncle adds the given block to uncle block set, returns error if failed to add. +func (w *worker) commitUncle(env *Env, uncle *types.Header) error { + hash := uncle.Hash() + if env.uncles.Contains(hash) { + return fmt.Errorf("uncle not unique") + } + if !env.ancestors.Contains(uncle.ParentHash) { + return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4]) + } + if env.family.Contains(hash) { + return fmt.Errorf("uncle already in family (%x)", hash) + } + env.uncles.Add(uncle.Hash()) + return nil +} + +// updateSnapshot updates pending snapshot block and state. +// Note this function assumes the current variable is thread safe. +func (w *worker) updateSnapshot() { + w.snapshotMu.Lock() + defer w.snapshotMu.Unlock() + + var uncles []*types.Header + w.current.uncles.Each(func(item interface{}) bool { + hash, ok := item.(common.Hash) + if !ok { + return false + } + uncle, exist := w.possibleUncles[hash] + if !exist { + return false + } + uncles = append(uncles, uncle.Header()) + return true + }) + + w.snapshotBlock = types.NewBlock( + w.current.header, + w.current.txs, + uncles, + w.current.receipts, + ) + + w.snapshotState = w.current.state.Copy() +} + +// commitNewWork generates several new sealing tasks based on the parent block. +func (w *worker) commitNewWork() { + w.mu.RLock() + defer w.mu.RUnlock() tstart := time.Now() - parent := self.chain.CurrentBlock() + parent := w.chain.CurrentBlock() tstamp := tstart.Unix() if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 { @@ -388,28 +574,28 @@ func (self *worker) commitNewWork() { ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent), - Extra: self.extra, + Extra: w.extra, Time: big.NewInt(tstamp), } // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) - if self.isRunning() { - if self.coinbase == (common.Address{}) { + if w.isRunning() { + if w.coinbase == (common.Address{}) { log.Error("Refusing to mine without etherbase") return } - header.Coinbase = self.coinbase + header.Coinbase = w.coinbase } - if err := self.engine.Prepare(self.chain, header); err != nil { + if err := w.engine.Prepare(w.chain, header); err != nil { log.Error("Failed to prepare header for mining", "err", err) return } // If we are care about TheDAO hard-fork check whether to override the extra-data or not - if daoBlock := self.config.DAOForkBlock; daoBlock != nil { + if daoBlock := w.config.DAOForkBlock; daoBlock != nil { // Check whether the block is among the fork extra-override range limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange) if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 { // Depending whether we support or oppose the fork, override differently - if self.config.DAOForkSupport { + if w.config.DAOForkSupport { header.Extra = common.CopyBytes(params.DAOForkBlockExtra) } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) { header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data @@ -417,14 +603,14 @@ func (self *worker) commitNewWork() { } } // Could potentially happen if starting to mine in an odd state. - err := self.makeCurrent(parent, header) + err := w.makeCurrent(parent, header) if err != nil { log.Error("Failed to create mining context", "err", err) return } // Create the current work task and check any fork transitions needed - env := self.current - if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { + env := w.current + if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(env.state) } @@ -433,11 +619,11 @@ func (self *worker) commitNewWork() { uncles []*types.Header badUncles []common.Hash ) - for hash, uncle := range self.possibleUncles { + for hash, uncle := range w.possibleUncles { if len(uncles) == 2 { break } - if err := self.commitUncle(env, uncle.Header()); err != nil { + if err := w.commitUncle(env, uncle.Header()); err != nil { log.Trace("Bad uncle found and will be removed", "hash", hash) log.Trace(fmt.Sprint(uncle)) @@ -448,184 +634,73 @@ func (self *worker) commitNewWork() { } } for _, hash := range badUncles { - delete(self.possibleUncles, hash) + delete(w.possibleUncles, hash) } var ( - emptyBlock *types.Block - fullBlock *types.Block + emptyBlock, fullBlock *types.Block + emptyState, fullState *state.StateDB ) // Create an empty block based on temporary copied state for sealing in advance without waiting block // execution finished. - emptyState := env.state.Copy() - if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil { + emptyState = env.state.Copy() + if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil { log.Error("Failed to finalize block for temporary sealing", "err", err) } else { // Push empty work in advance without applying pending transaction. // The reason is transactions execution can cost a lot and sealer need to // take advantage of this part time. - if self.isRunning() { - log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) - self.push(&Package{nil, emptyState, emptyBlock}) + if w.isRunning() { + select { + case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}: + log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) + case <-w.exitCh: + log.Info("Worker has exited") + return + } } } // Fill the block with all available pending transactions. - pending, err := self.eth.TxPool().Pending() + pending, err := w.eth.TxPool().Pending() if err != nil { log.Error("Failed to fetch pending transactions", "err", err) return } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - env.commitTransactions(self.mux, txs, self.chain, self.coinbase) + // Short circuit if there is no available pending transactions + if len(pending) == 0 { + w.updateSnapshot() + return + } + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) + env.commitTransactions(w.mux, txs, w.chain, w.coinbase) // Create the full block to seal with the consensus engine - if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil { + fullState = env.state.Copy() + if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } - // We only care about logging if we're actually mining. - if self.isRunning() { - log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) - self.unconfirmed.Shift(fullBlock.NumberU64() - 1) - self.push(&Package{env.receipts, env.state, fullBlock}) - } - self.updateSnapshot() -} - -func (self *worker) commitUncle(env *Env, uncle *types.Header) error { - hash := uncle.Hash() - if env.uncles.Contains(hash) { - return fmt.Errorf("uncle not unique") - } - if !env.ancestors.Contains(uncle.ParentHash) { - return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4]) - } - if env.family.Contains(hash) { - return fmt.Errorf("uncle already in family (%x)", hash) - } - env.uncles.Add(uncle.Hash()) - return nil -} - -func (self *worker) updateSnapshot() { - self.snapshotMu.Lock() - defer self.snapshotMu.Unlock() - - var uncles []*types.Header - self.current.uncles.Each(func(item interface{}) bool { - if header, ok := item.(*types.Header); ok { - uncles = append(uncles, header) - return true - } - return false - }) - - self.snapshotBlock = types.NewBlock( - self.current.header, - self.current.txs, - uncles, - self.current.receipts, - ) - self.snapshotState = self.current.state.Copy() -} - -func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { - if env.gasPool == nil { - env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + // Deep copy receipts here to avoid interaction between different tasks. + cpy := make([]*types.Receipt, len(env.receipts)) + for i, l := range env.receipts { + cpy[i] = new(types.Receipt) + *cpy[i] = *l } - - var coalescedLogs []*types.Log - - for { - // If we don't have enough gas for any further transactions then we're done - if env.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(env.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !env.config.IsEIP155(env.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block) - - txs.Pop() - continue - } - // Start executing the transaction - env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) - - err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) - switch err { - case core.ErrGasLimitReached: - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - - case core.ErrNonceTooLow: - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case core.ErrNonceTooHigh: - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case nil: - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - env.tcount++ - txs.Shift() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() + // We only care about logging if we're actually mining. + if w.isRunning() { + if w.fullTaskInterval != nil { + w.fullTaskInterval() } - } - if len(coalescedLogs) > 0 || env.tcount > 0 { - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l + select { + case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}: + w.unconfirmed.Shift(fullBlock.NumberU64() - 1) + log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + case <-w.exitCh: + log.Info("Worker has exited") } - go func(logs []*types.Log, tcount int) { - if len(logs) > 0 { - mux.Post(core.PendingLogsEvent{Logs: logs}) - } - if tcount > 0 { - mux.Post(core.PendingStateEvent{}) - } - }(cpy, env.tcount) - } -} - -func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { - snap := env.state.Snapshot() - - receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}) - if err != nil { - env.state.RevertToSnapshot(snap) - return err, nil } - env.txs = append(env.txs, tx) - env.receipts = append(env.receipts, receipt) - - return nil, receipt.Logs + w.updateSnapshot() } |