aboutsummaryrefslogtreecommitdiffstats
path: root/miner/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go159
1 files changed, 92 insertions, 67 deletions
diff --git a/miner/worker.go b/miner/worker.go
index dab192c24..bf24970f5 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -41,6 +41,14 @@ import (
const (
resultQueueSize = 10
miningLogAtDepth = 5
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+ // chainSideChanSize is the size of channel listening to ChainSideEvent.
+ chainSideChanSize = 10
)
// Agent can register themself with the worker
@@ -63,7 +71,6 @@ type Work struct {
family *set.Set // family set (used for checking uncle invalidity)
uncles *set.Set // uncle set
tcount int // tx count in cycle
- failedTxs types.Transactions
Block *types.Block // the new block
@@ -87,9 +94,14 @@ type worker struct {
mu sync.Mutex
// update loop
- mux *event.TypeMux
- events *event.TypeMuxSubscription
- wg sync.WaitGroup
+ mux *event.TypeMux
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
+ chainHeadCh chan core.ChainHeadEvent
+ chainHeadSub event.Subscription
+ chainSideCh chan core.ChainSideEvent
+ chainSideSub event.Subscription
+ wg sync.WaitGroup
agents map[Agent]struct{}
recv chan *Result
@@ -113,8 +125,6 @@ type worker struct {
// atomic status counters
mining int32
atWork int32
-
- fullValidation bool
}
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
@@ -123,6 +133,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine: engine,
eth: eth,
mux: mux,
+ txCh: make(chan core.TxPreEvent, txChanSize),
+ chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+ chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
@@ -131,9 +144,12 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
coinbase: coinbase,
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
- fullValidation: false,
}
- worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
+ // Subscribe TxPreEvent for tx pool
+ worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
+ // Subscribe events for blockchain
+ worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
+ worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
go worker.update()
go worker.wait()
@@ -225,27 +241,43 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
- for event := range self.events.Chan() {
+ defer self.txSub.Unsubscribe()
+ defer self.chainHeadSub.Unsubscribe()
+ defer self.chainSideSub.Unsubscribe()
+
+ for {
// A real event arrived, process interesting content
- switch ev := event.Data.(type) {
- case core.ChainHeadEvent:
+ select {
+ // Handle ChainHeadEvent
+ case <-self.chainHeadCh:
self.commitNewWork()
- case core.ChainSideEvent:
+
+ // Handle ChainSideEvent
+ case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
- case core.TxPreEvent:
+
+ // Handle TxPreEvent
+ case ev := <-self.txCh:
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
-
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
- txset := types.NewTransactionsByPriceAndNonce(txs)
+ txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.currentMu.Unlock()
}
+
+ // System stopped
+ case <-self.txSub.Err():
+ return
+ case <-self.chainHeadSub.Err():
+ return
+ case <-self.chainSideSub.Err():
+ return
}
}
}
@@ -262,53 +294,38 @@ func (self *worker) wait() {
block := result.Block
work := result.Work
- if self.fullValidation {
- if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil {
- log.Error("Mined invalid block", "err", err)
- continue
- }
- go self.mux.Post(core.NewMinedBlockEvent{Block: block})
- } else {
- work.state.CommitTo(self.chainDb, self.config.IsEIP158(block.Number()))
- stat, err := self.chain.WriteBlock(block)
- if err != nil {
- log.Error("Failed writing block to chain", "err", err)
- continue
- }
- // update block hash since it is now available and not when the receipt/log of individual transactions were created
- for _, r := range work.receipts {
- for _, l := range r.Logs {
- l.BlockHash = block.Hash()
- }
- }
- for _, log := range work.state.Logs() {
- log.BlockHash = block.Hash()
+ // Update the block hash in all logs since it is now available and not when the
+ // receipt/log of individual transactions were created.
+ for _, r := range work.receipts {
+ for _, l := range r.Logs {
+ l.BlockHash = block.Hash()
}
-
- // check if canon block and write transactions
- if stat == core.CanonStatTy {
- // This puts transactions in a extra db for rpc
- core.WriteTxLookupEntries(self.chainDb, block)
- // Write map map bloom filters
- core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts)
- // implicit by posting ChainHeadEvent
- mustCommitNewWork = false
- }
-
- // broadcast before waiting for validation
- go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
- self.mux.Post(core.NewMinedBlockEvent{Block: block})
- self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
-
- if stat == core.CanonStatTy {
- self.mux.Post(core.ChainHeadEvent{Block: block})
- self.mux.Post(logs)
- }
- if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
- log.Warn("Failed writing block receipts", "err", err)
- }
- }(block, work.state.Logs(), work.receipts)
}
+ for _, log := range work.state.Logs() {
+ log.BlockHash = block.Hash()
+ }
+ stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
+ if err != nil {
+ log.Error("Failed writing block to chain", "err", err)
+ continue
+ }
+ // check if canon block and write transactions
+ if stat == core.CanonStatTy {
+ // implicit by posting ChainHeadEvent
+ mustCommitNewWork = false
+ }
+ // Broadcast the block and announce chain insertion event
+ self.mux.Post(core.NewMinedBlockEvent{Block: block})
+ var (
+ events []interface{}
+ logs = work.state.Logs()
+ )
+ events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+ if stat == core.CanonStatTy {
+ events = append(events, core.ChainHeadEvent{Block: block})
+ }
+ self.chain.PostChainEvents(events, logs)
+
// Insert the block into the set of pending ones to wait for confirmations
self.unconfirmed.Insert(block.NumberU64(), block.Hash())
@@ -432,11 +449,9 @@ func (self *worker) commitNewWork() {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
- txs := types.NewTransactionsByPriceAndNonce(pending)
+ txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
- self.eth.TxPool().RemoveBatch(work.failedTxs)
-
// compute uncles for the new block.
var (
uncles []*types.Header
@@ -521,6 +536,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
+ case core.ErrNonceTooLow:
+ // New head notification data race between the transaction pool and miner, shift
+ log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
+ txs.Shift()
+
+ case core.ErrNonceTooHigh:
+ // Reorg notification data race between the transaction pool and miner, skip account =
+ log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
+ txs.Pop()
+
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
@@ -528,10 +553,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
txs.Shift()
default:
- // Pop the current failed transaction without shifting in the next from the account
- log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err)
- env.failedTxs = append(env.failedTxs, tx)
- txs.Pop()
+ // Strange error, discard the transaction and get the next in line (note, the
+ // nonce-too-high clause will prevent us from executing in vain).
+ log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
+ txs.Shift()
}
}