diff options
Diffstat (limited to 'miner/worker.go')
-rw-r--r-- | miner/worker.go | 159 |
1 files changed, 92 insertions, 67 deletions
diff --git a/miner/worker.go b/miner/worker.go index dab192c24..bf24970f5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -41,6 +41,14 @@ import ( const ( resultQueueSize = 10 miningLogAtDepth = 5 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // chainSideChanSize is the size of channel listening to ChainSideEvent. + chainSideChanSize = 10 ) // Agent can register themself with the worker @@ -63,7 +71,6 @@ type Work struct { family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set tcount int // tx count in cycle - failedTxs types.Transactions Block *types.Block // the new block @@ -87,9 +94,14 @@ type worker struct { mu sync.Mutex // update loop - mux *event.TypeMux - events *event.TypeMuxSubscription - wg sync.WaitGroup + mux *event.TypeMux + txCh chan core.TxPreEvent + txSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + chainSideCh chan core.ChainSideEvent + chainSideSub event.Subscription + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -113,8 +125,6 @@ type worker struct { // atomic status counters mining int32 atWork int32 - - fullValidation bool } func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { @@ -123,6 +133,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, + txCh: make(chan core.TxPreEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -131,9 +144,12 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), - fullValidation: false, } - worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + // Subscribe TxPreEvent for tx pool + worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) go worker.update() go worker.wait() @@ -225,27 +241,43 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - for event := range self.events.Chan() { + defer self.txSub.Unsubscribe() + defer self.chainHeadSub.Unsubscribe() + defer self.chainSideSub.Unsubscribe() + + for { // A real event arrived, process interesting content - switch ev := event.Data.(type) { - case core.ChainHeadEvent: + select { + // Handle ChainHeadEvent + case <-self.chainHeadCh: self.commitNewWork() - case core.ChainSideEvent: + + // Handle ChainSideEvent + case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - case core.TxPreEvent: + + // Handle TxPreEvent + case ev := <-self.txCh: // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} - txset := types.NewTransactionsByPriceAndNonce(txs) + txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.currentMu.Unlock() } + + // System stopped + case <-self.txSub.Err(): + return + case <-self.chainHeadSub.Err(): + return + case <-self.chainSideSub.Err(): + return } } } @@ -262,53 +294,38 @@ func (self *worker) wait() { block := result.Block work := result.Work - if self.fullValidation { - if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil { - log.Error("Mined invalid block", "err", err) - continue - } - go self.mux.Post(core.NewMinedBlockEvent{Block: block}) - } else { - work.state.CommitTo(self.chainDb, self.config.IsEIP158(block.Number())) - stat, err := self.chain.WriteBlock(block) - if err != nil { - log.Error("Failed writing block to chain", "err", err) - continue - } - // update block hash since it is now available and not when the receipt/log of individual transactions were created - for _, r := range work.receipts { - for _, l := range r.Logs { - l.BlockHash = block.Hash() - } - } - for _, log := range work.state.Logs() { - log.BlockHash = block.Hash() + // Update the block hash in all logs since it is now available and not when the + // receipt/log of individual transactions were created. + for _, r := range work.receipts { + for _, l := range r.Logs { + l.BlockHash = block.Hash() } - - // check if canon block and write transactions - if stat == core.CanonStatTy { - // This puts transactions in a extra db for rpc - core.WriteTxLookupEntries(self.chainDb, block) - // Write map map bloom filters - core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts) - // implicit by posting ChainHeadEvent - mustCommitNewWork = false - } - - // broadcast before waiting for validation - go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { - self.mux.Post(core.NewMinedBlockEvent{Block: block}) - self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - - if stat == core.CanonStatTy { - self.mux.Post(core.ChainHeadEvent{Block: block}) - self.mux.Post(logs) - } - if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil { - log.Warn("Failed writing block receipts", "err", err) - } - }(block, work.state.Logs(), work.receipts) } + for _, log := range work.state.Logs() { + log.BlockHash = block.Hash() + } + stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state) + if err != nil { + log.Error("Failed writing block to chain", "err", err) + continue + } + // check if canon block and write transactions + if stat == core.CanonStatTy { + // implicit by posting ChainHeadEvent + mustCommitNewWork = false + } + // Broadcast the block and announce chain insertion event + self.mux.Post(core.NewMinedBlockEvent{Block: block}) + var ( + events []interface{} + logs = work.state.Logs() + ) + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + if stat == core.CanonStatTy { + events = append(events, core.ChainHeadEvent{Block: block}) + } + self.chain.PostChainEvents(events, logs) + // Insert the block into the set of pending ones to wait for confirmations self.unconfirmed.Insert(block.NumberU64(), block.Hash()) @@ -432,11 +449,9 @@ func (self *worker) commitNewWork() { log.Error("Failed to fetch pending transactions", "err", err) return } - txs := types.NewTransactionsByPriceAndNonce(pending) + txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) work.commitTransactions(self.mux, txs, self.chain, self.coinbase) - self.eth.TxPool().RemoveBatch(work.failedTxs) - // compute uncles for the new block. var ( uncles []*types.Header @@ -521,6 +536,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB log.Trace("Gas limit exceeded for current block", "sender", from) txs.Pop() + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + case nil: // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) @@ -528,10 +553,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB txs.Shift() default: - // Pop the current failed transaction without shifting in the next from the account - log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err) - env.failedTxs = append(env.failedTxs, tx) - txs.Pop() + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() } } |