aboutsummaryrefslogtreecommitdiffstats
path: root/miner
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2018-08-15 19:09:17 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-15 19:09:17 +0800
commit040aa2bb101e5e602308b24812bfbf2451b21174 (patch)
treebf0ab668ce5a2c3b7f9c2f239a8df1672893e0af /miner
parente598ae5c010a9bc445fb3f106db9ae712e1a326e (diff)
downloadgo-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar.gz
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar.bz2
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar.lz
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar.xz
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.tar.zst
go-tangerine-040aa2bb101e5e602308b24812bfbf2451b21174.zip
miner: streaming uncle blocks (#17320)
* miner: stream uncle block * miner: polish
Diffstat (limited to 'miner')
-rw-r--r--miner/worker.go99
-rw-r--r--miner/worker_test.go76
2 files changed, 125 insertions, 50 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 81a63c29a..fae480c84 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -213,8 +213,9 @@ type worker struct {
running int32 // The indicator whether the consensus engine is running or not.
// Test hooks
- newTaskHook func(*task) // Method to call upon receiving a new sealing task
- fullTaskInterval func() // Method to call before pushing the full sealing task
+ newTaskHook func(*task) // Method to call upon receiving a new sealing task
+ skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
+ fullTaskHook func() // Method to call before pushing the full sealing task
}
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
@@ -329,8 +330,32 @@ func (w *worker) mainLoop() {
w.commitNewWork()
case ev := <-w.chainSideCh:
+ if _, exist := w.possibleUncles[ev.Block.Hash()]; exist {
+ continue
+ }
// Add side block to possible uncle block set.
w.possibleUncles[ev.Block.Hash()] = ev.Block
+ // If our mining block contains less than 2 uncle blocks,
+ // add the new uncle block if valid and regenerate a mining block.
+ if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
+ start := time.Now()
+ if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
+ var uncles []*types.Header
+ w.current.uncles.Each(func(item interface{}) bool {
+ hash, ok := item.(common.Hash)
+ if !ok {
+ return false
+ }
+ uncle, exist := w.possibleUncles[hash]
+ if !exist {
+ return false
+ }
+ uncles = append(uncles, uncle.Header())
+ return true
+ })
+ w.commit(uncles, nil, true, start)
+ }
+ }
case ev := <-w.txsCh:
// Apply transactions to the pending state if we're not mining.
@@ -378,6 +403,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) {
res *task
)
+ if w.skipSealHook != nil && w.skipSealHook(t) {
+ return
+ }
+
if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil {
log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
"elapsed", common.PrettyDuration(time.Since(t.createdAt)))
@@ -637,30 +666,9 @@ func (w *worker) commitNewWork() {
delete(w.possibleUncles, hash)
}
- var (
- emptyBlock, fullBlock *types.Block
- emptyState, fullState *state.StateDB
- )
-
// Create an empty block based on temporary copied state for sealing in advance without waiting block
// execution finished.
- emptyState = env.state.Copy()
- if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil {
- log.Error("Failed to finalize block for temporary sealing", "err", err)
- } else {
- // Push empty work in advance without applying pending transaction.
- // The reason is transactions execution can cost a lot and sealer need to
- // take advantage of this part time.
- if w.isRunning() {
- select {
- case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}:
- log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
- case <-w.exitCh:
- log.Info("Worker has exited")
- return
- }
- }
- }
+ w.commit(uncles, nil, false, tstart)
// Fill the block with all available pending transactions.
pending, err := w.eth.TxPool().Pending()
@@ -676,31 +684,38 @@ func (w *worker) commitNewWork() {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending)
env.commitTransactions(w.mux, txs, w.chain, w.coinbase)
- // Create the full block to seal with the consensus engine
- fullState = env.state.Copy()
- if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil {
- log.Error("Failed to finalize block for sealing", "err", err)
- return
- }
+ w.commit(uncles, w.fullTaskHook, true, tstart)
+}
+
+// commit runs any post-transaction state modifications, assembles the final block
+// and commits new work if consensus engine is running.
+func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks.
- cpy := make([]*types.Receipt, len(env.receipts))
- for i, l := range env.receipts {
- cpy[i] = new(types.Receipt)
- *cpy[i] = *l
+ receipts := make([]*types.Receipt, len(w.current.receipts))
+ for i, l := range w.current.receipts {
+ receipts[i] = new(types.Receipt)
+ *receipts[i] = *l
+ }
+ s := w.current.state.Copy()
+ block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
+ if err != nil {
+ return err
}
- // We only care about logging if we're actually mining.
if w.isRunning() {
- if w.fullTaskInterval != nil {
- w.fullTaskInterval()
+ if interval != nil {
+ interval()
}
-
select {
- case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}:
- w.unconfirmed.Shift(fullBlock.NumberU64() - 1)
- log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
+ case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
+ w.unconfirmed.Shift(block.NumberU64() - 1)
+ log.Info("Commit new mining work", "number", block.Number(), "txs", w.current.tcount, "uncles", len(uncles),
+ "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")
}
}
- w.updateSnapshot()
+ if update {
+ w.updateSnapshot()
+ }
+ return nil
}
diff --git a/miner/worker_test.go b/miner/worker_test.go
index 5823a608e..408c47e3b 100644
--- a/miner/worker_test.go
+++ b/miner/worker_test.go
@@ -59,7 +59,7 @@ func init() {
ethashChainConfig = params.TestChainConfig
cliqueChainConfig = params.TestChainConfig
cliqueChainConfig.Clique = &params.CliqueConfig{
- Period: 1,
+ Period: 10,
Epoch: 30000,
}
tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
@@ -74,6 +74,7 @@ type testWorkerBackend struct {
txPool *core.TxPool
chain *core.BlockChain
testTxFeed event.Feed
+ uncleBlock *types.Block
}
func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) *testWorkerBackend {
@@ -93,15 +94,19 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
default:
t.Fatal("unexpect consensus engine type")
}
- gspec.MustCommit(db)
+ genesis := gspec.MustCommit(db)
chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)
+ blocks, _ := core.GenerateChain(chainConfig, genesis, engine, db, 1, func(i int, gen *core.BlockGen) {
+ gen.SetCoinbase(acc1Addr)
+ })
return &testWorkerBackend{
- db: db,
- chain: chain,
- txPool: txpool,
+ db: db,
+ chain: chain,
+ txPool: txpool,
+ uncleBlock: blocks[0],
}
}
@@ -188,7 +193,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
taskCh <- struct{}{}
}
}
- w.fullTaskInterval = func() {
+ w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
@@ -202,11 +207,66 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
w.start()
for i := 0; i < 2; i += 1 {
- to := time.NewTimer(time.Second)
select {
case <-taskCh:
- case <-to.C:
+ case <-time.NewTimer(time.Second).C:
t.Error("new task timeout")
}
}
}
+
+func TestStreamUncleBlock(t *testing.T) {
+ ethash := ethash.NewFaker()
+ defer ethash.Close()
+
+ w, b := newTestWorker(t, ethashChainConfig, ethash)
+ defer w.close()
+
+ var taskCh = make(chan struct{})
+
+ taskIndex := 0
+ w.newTaskHook = func(task *task) {
+ if task.block.NumberU64() == 1 {
+ if taskIndex == 2 {
+ has := task.block.Header().UncleHash
+ want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()})
+ if has != want {
+ t.Errorf("uncle hash mismatch, has %s, want %s", has.Hex(), want.Hex())
+ }
+ }
+ taskCh <- struct{}{}
+ taskIndex += 1
+ }
+ }
+ w.skipSealHook = func(task *task) bool {
+ return true
+ }
+ w.fullTaskHook = func() {
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ // Ensure worker has finished initialization
+ for {
+ b := w.pendingBlock()
+ if b != nil && b.NumberU64() == 1 {
+ break
+ }
+ }
+
+ w.start()
+ // Ignore the first two works
+ for i := 0; i < 2; i += 1 {
+ select {
+ case <-taskCh:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("new task timeout")
+ }
+ }
+ b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}})
+
+ select {
+ case <-taskCh:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("new task timeout")
+ }
+}