aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-04-13 15:32:28 +0800
committerGitHub <noreply@github.com>2017-04-13 15:32:28 +0800
commitd5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1 (patch)
treeed0e0307d30878371d3cd390c64f3a0a703ef515
parent5e29f4be935ff227bbf07a0c6e80e8809f5e0202 (diff)
parentb27589517a5bf7f88944603b201fa1f7c0b33abf (diff)
downloadgo-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar.gz
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar.bz2
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar.lz
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar.xz
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.tar.zst
go-tangerine-d5d910e8b68f6c6b29ca85f5a9fa1b72b2cc08c1.zip
Merge pull request #14323 from fjl/ethash-verify-headers-fix
consensus/ethash: simplify concurrency in VerifyHeaders
-rw-r--r--consensus/ethash/consensus.go129
-rw-r--r--core/dao_test.go8
2 files changed, 52 insertions, 85 deletions
diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go
index 4b6e779d5..4f1ab8702 100644
--- a/consensus/ethash/consensus.go
+++ b/consensus/ethash/consensus.go
@@ -22,7 +22,6 @@ import (
"fmt"
"math/big"
"runtime"
- "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -46,7 +45,6 @@ var (
// codebase, inherently breaking if the engine is swapped out. Please put common
// error types into the consensus package.
var (
- errInvalidChain = errors.New("invalid header chain")
errLargeBlockTime = errors.New("timestamp too big")
errZeroBlockTime = errors.New("timestamp equals parent's")
errTooManyUncles = errors.New("too many uncles")
@@ -90,111 +88,80 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He
// a results channel to retrieve the async verifications.
func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
// If we're running a full engine faking, accept any input as valid
- if ethash.fakeFull {
+ if ethash.fakeFull || len(headers) == 0 {
abort, results := make(chan struct{}), make(chan error, len(headers))
for i := 0; i < len(headers); i++ {
results <- nil
}
return abort, results
}
+
// Spawn as many workers as allowed threads
workers := runtime.GOMAXPROCS(0)
if len(headers) < workers {
workers = len(headers)
}
- // Create a task channel and spawn the verifiers
- type result struct {
- index int
- err error
- }
- inputs := make(chan int, workers)
- outputs := make(chan result, len(headers))
- var badblock uint64
+ // Create a task channel and spawn the verifiers
+ var (
+ inputs = make(chan int)
+ done = make(chan int, workers)
+ errors = make([]error, len(headers))
+ abort = make(chan struct{})
+ )
for i := 0; i < workers; i++ {
go func() {
for index := range inputs {
- // If we've found a bad block already before this, stop validating
- if bad := atomic.LoadUint64(&badblock); bad != 0 && bad <= headers[index].Number.Uint64() {
- outputs <- result{index: index, err: errInvalidChain}
- continue
- }
- // We need to look up the first parent
- var parent *types.Header
- if index == 0 {
- parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
- } else if headers[index-1].Hash() == headers[index].ParentHash {
- parent = headers[index-1]
- }
- // Ensure the validation is useful and execute it
- var failure error
- switch {
- case chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()-1) != nil:
- outputs <- result{index: index, err: nil}
- case parent == nil:
- failure = consensus.ErrUnknownAncestor
- outputs <- result{index: index, err: failure}
- default:
- failure = ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
- outputs <- result{index: index, err: failure}
- }
- // If a validation failure occurred, mark subsequent blocks invalid
- if failure != nil {
- number := headers[index].Number.Uint64()
- if prev := atomic.LoadUint64(&badblock); prev == 0 || prev > number {
- // This two step atomic op isn't thread-safe in that `badblock` might end
- // up slightly higher than the block number of the first failure (if many
- // workers try to write at the same time), but it's fine as we're mostly
- // interested to avoid large useless work, we don't care about 1-2 extra
- // runs. Doing "full thread safety" would involve mutexes, which would be
- // a noticeable sync overhead on the fast spinning worker routines.
- atomic.StoreUint64(&badblock, number)
- }
- }
+ errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index)
+ done <- index
}
}()
}
- // Feed item indices to the workers until done, sorting and feeding the results to the caller
- dones := make([]bool, len(headers))
- errors := make([]error, len(headers))
-
- abort := make(chan struct{})
- returns := make(chan error, len(headers))
+ errorsOut := make(chan error, len(headers))
go func() {
defer close(inputs)
-
- input, output := 0, 0
- for i := 0; i < len(headers)*2; i++ {
- var res result
-
- // If there are tasks left, push to workers
- if input < len(headers) {
- select {
- case inputs <- input:
- input++
- continue
- case <-abort:
- return
- case res = <-outputs:
+ var (
+ in, out = 0, 0
+ checked = make([]bool, len(headers))
+ inputs = inputs
+ )
+ for {
+ select {
+ case inputs <- in:
+ if in++; in == len(headers) {
+ // Reached end of headers. Stop sending to workers.
+ inputs = nil
}
- } else {
- // Otherwise keep waiting for results
- select {
- case <-abort:
- return
- case res = <-outputs:
+ case index := <-done:
+ for checked[index] = true; checked[out]; out++ {
+ errorsOut <- errors[out]
+ if out == len(headers)-1 {
+ return
+ }
}
- }
- // A result arrived, save and propagate if next
- dones[res.index], errors[res.index] = true, res.err
- for output < len(headers) && dones[output] {
- returns <- errors[output]
- output++
+ case <-abort:
+ return
}
}
}()
- return abort, returns
+ return abort, errorsOut
+}
+
+func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error {
+ var parent *types.Header
+ if index == 0 {
+ parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
+ } else if headers[index-1].Hash() == headers[index].ParentHash {
+ parent = headers[index-1]
+ }
+ if parent == nil {
+ return consensus.ErrUnknownAncestor
+ }
+ if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
+ return nil // known block
+ }
+ return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
}
// VerifyUncles verifies that the given block's uncles conform to the consensus
diff --git a/core/dao_test.go b/core/dao_test.go
index cb6e54f8f..bc9f3f394 100644
--- a/core/dao_test.go
+++ b/core/dao_test.go
@@ -62,7 +62,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
gspec.MustCommit(db)
bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
- blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
+ blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
for j := 0; j < len(blocks)/2; j++ {
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
}
@@ -83,7 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
gspec.MustCommit(db)
bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
- blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
+ blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
for j := 0; j < len(blocks)/2; j++ {
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
}
@@ -105,7 +105,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
gspec.MustCommit(db)
bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
- blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
+ blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
for j := 0; j < len(blocks)/2; j++ {
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
}
@@ -121,7 +121,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
gspec.MustCommit(db)
bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
- blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
+ blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
for j := 0; j < len(blocks)/2; j++ {
blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
}