Diffstat (limited to 'core/chain_manager.go')
-rw-r--r--  core/chain_manager.go  186
1 file changed, 132 insertions(+), 54 deletions(-)
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 2e8eb927d..4fb7506e5 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math/big"
+ "runtime"
"sync"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -83,8 +85,9 @@ type ChainManager struct {
eventMux *event.TypeMux
genesisBlock *types.Block
// Last known total difficulty
- mu sync.RWMutex
- tsmu sync.RWMutex
+ mu sync.RWMutex
+ chainmu sync.RWMutex
+ tsmu sync.RWMutex
td *big.Int
currentBlock *types.Block
@@ -99,9 +102,11 @@ type ChainManager struct {
quit chan struct{}
wg sync.WaitGroup
+
+ pow pow.PoW
}
-func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
+func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
@@ -109,6 +114,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
eventMux: mux,
quit: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
+ pow: pow,
}
bc.setLastState()
@@ -342,7 +348,7 @@ func (self *ChainManager) Export(w io.Writer) error {
last := self.currentBlock.NumberU64()
- for nr := uint64(0); nr <= last; nr++ {
+ for nr := uint64(1); nr <= last; nr++ {
block := self.GetBlockByNumber(nr)
if block == nil {
return fmt.Errorf("export failed on #%d: not found", nr)
@@ -406,9 +412,11 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) (
}
func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
- if block := self.cache.Get(hash); block != nil {
- return block
- }
+ /*
+ if block := self.cache.Get(hash); block != nil {
+ return block
+ }
+ */
data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
if len(data) == 0 {
@@ -518,6 +526,9 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.wg.Add(1)
defer self.wg.Done()
+ self.chainmu.Lock()
+ defer self.chainmu.Unlock()
+
// A queued approach to delivering events. This is generally faster than direct delivery and requires far fewer mutex acquisitions.
var (
queue = make([]interface{}, len(chain))
@@ -525,10 +536,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
stats struct{ queued, processed, ignored int }
tstart = time.Now()
)
+
+ // Check the block nonces in parallel with the block processing;
+ // this speeds up catching up significantly.
+ nonceErrCh := make(chan error)
+ go func() {
+ nonceErrCh <- verifyNonces(self.pow, chain)
+ }()
+
for i, block := range chain {
if block == nil {
continue
}
+
// Setting block.Td regardless of error (e.g. a known block) prevents errors
// down the line in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@@ -542,7 +562,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
continue
}
- block.Td = new(big.Int)
// Do not penalise for a future block. We'll eventually need a block queue
// that holds future blocks for later processing.
if err == BlockFutureErr {
@@ -559,68 +578,67 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
continue
}
- h := block.Header()
-
- glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
- glog.V(logger.Error).Infoln(err)
- glog.V(logger.Debug).Infoln(block)
+ blockErr(block, err)
return i, err
}
- self.mu.Lock()
- {
- cblock := self.currentBlock
- // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
- // not in the canonical chain.
- self.write(block)
- // Compare the TD of the last known block in the canonical chain to make sure it's greater.
- // At this point it's possible that a different chain (fork) becomes the new canonical chain.
- if block.Td.Cmp(self.td) > 0 {
- // chain fork
- if block.ParentHash() != cblock.Hash() {
- // during split we merge two different chains and create the new canonical chain
- self.merge(cblock, block)
-
- queue[i] = ChainSplitEvent{block, logs}
- queueEvent.splitCount++
- }
-
- self.setTotalDifficulty(block.Td)
- self.insert(block)
+ cblock := self.currentBlock
+ // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
+ // not in the canonical chain.
+ self.write(block)
+ // Compare the TD of the last known block in the canonical chain to make sure it's greater.
+ // At this point it's possible that a different chain (fork) becomes the new canonical chain.
+ if block.Td.Cmp(self.td) > 0 {
+ // chain fork
+ if block.ParentHash() != cblock.Hash() {
+ // during split we merge two different chains and create the new canonical chain
+ self.merge(cblock, block)
+
+ queue[i] = ChainSplitEvent{block, logs}
+ queueEvent.splitCount++
+ }
- jsonlogger.LogJson(&logger.EthChainNewHead{
- BlockHash: block.Hash().Hex(),
- BlockNumber: block.Number(),
- ChainHeadHash: cblock.Hash().Hex(),
- BlockPrevHash: block.ParentHash().Hex(),
- })
+ self.setTotalDifficulty(block.Td)
+ self.insert(block)
- self.setTransState(state.New(block.Root(), self.stateDb))
- self.txState.SetState(state.New(block.Root(), self.stateDb))
+ jsonlogger.LogJson(&logger.EthChainNewHead{
+ BlockHash: block.Hash().Hex(),
+ BlockNumber: block.Number(),
+ ChainHeadHash: cblock.Hash().Hex(),
+ BlockPrevHash: block.ParentHash().Hex(),
+ })
- queue[i] = ChainEvent{block, block.Hash(), logs}
- queueEvent.canonicalCount++
+ self.setTransState(state.New(block.Root(), self.stateDb))
+ self.txState.SetState(state.New(block.Root(), self.stateDb))
- if glog.V(logger.Debug) {
- glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
- }
- } else {
- if glog.V(logger.Detail) {
- glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
- }
+ queue[i] = ChainEvent{block, block.Hash(), logs}
+ queueEvent.canonicalCount++
- queue[i] = ChainSideEvent{block, logs}
- queueEvent.sideCount++
+ if glog.V(logger.Debug) {
+ glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
+ }
+ } else {
+ if glog.V(logger.Detail) {
+ glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
}
- self.futureBlocks.Delete(block.Hash())
+
+ queue[i] = ChainSideEvent{block, logs}
+ queueEvent.sideCount++
}
- self.mu.Unlock()
+ self.futureBlocks.Delete(block.Hash())
stats.processed++
}
+ // Wait on the nonce error channel and make sure no nonce error
+ // was encountered during processing.
+ err := <-nonceErrCh
+ if err != nil {
+ return 0, err
+ }
+
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
tend := time.Since(tstart)
start, end := chain[0], chain[len(chain)-1]
@@ -719,3 +737,63 @@ out:
}
}
}
+
+func blockErr(block *types.Block, err error) {
+ h := block.Header()
+ glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
+ glog.V(logger.Error).Infoln(err)
+ glog.V(logger.Debug).Infoln(block)
+}
+
+// verifyNonces verifies the nonces of the given blocks in parallel and
+// returns an error if any of the nonce verifications fails.
+func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
+ // Spawn a few workers. They listen for blocks on the in channel
+ // and send results on done. The workers will exit in the
+ // background when in is closed.
+ var (
+ in = make(chan *types.Block)
+ done = make(chan error, runtime.GOMAXPROCS(0))
+ )
+ defer close(in)
+ for i := 0; i < cap(done); i++ {
+ go verifyNonce(pow, in, done)
+ }
+ // Feed blocks to the workers, aborting at the first invalid nonce.
+ var (
+ running, i int
+ block *types.Block
+ sendin = in
+ )
+ for i < len(blocks) || running > 0 {
+ if i == len(blocks) {
+ // Disable sending to in.
+ sendin = nil
+ } else {
+ block = blocks[i]
+ i++
+ }
+ select {
+ case sendin <- block:
+ running++
+ case err := <-done:
+ running--
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// verifyNonce is a worker for the verifyNonces function. It runs until
+// in is closed.
+func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
+ for block := range in {
+ if !pow.Verify(block) {
+ done <- ValidationError("Block(#%v) nonce is invalid (= %x)", block.Number(), block.Nonce())
+ } else {
+ done <- nil
+ }
+ }
+}
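
For reference, below is a minimal, self-contained sketch of the bounded worker-pool idea used by verifyNonces above. The Verifier interface, evenVerifier, and the uint64 work items are illustrative placeholders, not the go-ethereum pow.PoW API; unlike the diff, the sketch advances the input index only when a send actually succeeds and drains the outstanding results in a separate loop.

// Minimal, simplified sketch of the bounded worker-pool idea used by
// verifyNonces above. Verifier and the uint64 work items are placeholder
// assumptions for illustration, not the go-ethereum pow.PoW API.
package main

import (
	"fmt"
	"runtime"
)

type Verifier interface {
	Verify(nonce uint64) bool
}

// evenVerifier accepts even nonces only; it stands in for a real PoW check.
type evenVerifier struct{}

func (evenVerifier) Verify(nonce uint64) bool { return nonce%2 == 0 }

// verifyAll checks all nonces in parallel and returns the first error seen.
func verifyAll(v Verifier, nonces []uint64) error {
	workers := runtime.GOMAXPROCS(0)
	in := make(chan uint64)
	done := make(chan error, workers) // buffered so workers never block on exit
	defer close(in)                   // lets the workers return when we do

	for w := 0; w < workers; w++ {
		go func() {
			for n := range in {
				if !v.Verify(n) {
					done <- fmt.Errorf("nonce %d is invalid", n)
				} else {
					done <- nil
				}
			}
		}()
	}

	// Dispatch work and collect results concurrently, advancing the index
	// only when a send actually happens.
	running := 0
	for i := 0; i < len(nonces); {
		select {
		case in <- nonces[i]:
			i++
			running++
		case err := <-done:
			running--
			if err != nil {
				return err
			}
		}
	}
	// Everything has been dispatched; drain the outstanding results.
	for ; running > 0; running-- {
		if err := <-done; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := verifyAll(evenVerifier{}, []uint64{2, 4, 6, 7, 8})
	fmt.Println("verification result:", err)
}

Buffering done with one slot per worker is what allows an early return on the first error without leaving any worker blocked; the deferred close(in) then lets the remaining workers exit on their own.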