diff options
-rw-r--r-- | core/chain_manager.go | 97 | ||||
-rw-r--r-- | core/chain_manager_test.go | 3 | ||||
-rw-r--r-- | core/transaction_pool.go | 4 |
3 files changed, 44 insertions, 60 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 808ccd201..c89aae3f0 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -11,10 +11,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -23,7 +21,6 @@ import ( "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" "github.com/hashicorp/golang-lru" - "github.com/syndtr/goleveldb/leveldb" ) var ( @@ -40,6 +37,7 @@ const ( blockCacheLimit = 256 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 + checkpointLimit = 200 ) // CalcDifficulty is the difficulty adjustment algorithm. It returns @@ -101,6 +99,7 @@ type ChainManager struct { chainmu sync.RWMutex tsmu sync.RWMutex + checkpoint int // checkpoint counts towards the new checkpoint td *big.Int currentBlock *types.Block lastBlockHash common.Hash @@ -109,9 +108,8 @@ type ChainManager struct { transState *state.StateDB txState *state.ManagedState - cache *lru.Cache // cache is the LRU caching - futureBlocks *lru.Cache // future blocks are blocks added for later processing - pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db + cache *lru.Cache // cache is the LRU caching + futureBlocks *lru.Cache // future blocks are blocks added for later processing quit chan struct{} // procInterrupt must be atomically called @@ -240,15 +238,40 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } +func (bc *ChainManager) recover() bool { + data, _ := bc.blockDb.Get([]byte("checkpoint")) + if len(data) != 0 { + block := bc.GetBlock(common.BytesToHash(data)) + if block != nil { + err := bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) + if err != nil { + glog.Fatalln("db write err:", err) + } + + bc.currentBlock = block + bc.lastBlockHash = block.Hash() + return true + } + } + return false +} + func (bc *ChainManager) setLastState() { data, _ := bc.blockDb.Get([]byte("LastBlock")) if len(data) != 0 { block := bc.GetBlock(common.BytesToHash(data)) if block != nil { + bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes()) + bc.currentBlock = block bc.lastBlockHash = block.Hash() } else { - glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync") + glog.Infof("LastBlock (%x) not found. Recovering...\n", data) + if bc.recover() { + glog.Infof("Recover successful") + } else { + glog.Fatalf("Recover failed. Please report") + } } } else { bc.Reset() @@ -357,6 +380,16 @@ func (bc *ChainManager) insert(block *types.Block) { glog.Fatal("db write fail:", err) } + bc.checkpoint++ + if bc.checkpoint > checkpointLimit { + err = bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes()) + if err != nil { + glog.Fatal("db write fail:", err) + } + + bc.checkpoint = 0 + } + bc.currentBlock = block bc.lastBlockHash = block.Hash() } @@ -387,12 +420,6 @@ func (bc *ChainManager) HasBlock(hash common.Hash) bool { return true } - if bc.pendingBlocks != nil { - if _, exist := bc.pendingBlocks.Get(hash); exist { - return true - } - } - data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...)) return len(data) != 0 } @@ -423,12 +450,6 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block { return block.(*types.Block) } - if self.pendingBlocks != nil { - if block, _ := self.pendingBlocks.Get(hash); block != nil { - return block.(*types.Block) - } - } - data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...)) if len(data) == 0 { return nil @@ -519,31 +540,6 @@ func (self *ChainManager) procFutureBlocks() { } } -func (self *ChainManager) enqueueForWrite(block *types.Block) { - self.pendingBlocks.Add(block.Hash(), block) -} - -func (self *ChainManager) flushQueuedBlocks() { - db, batchWrite := self.blockDb.(*ethdb.LDBDatabase) - batch := new(leveldb.Batch) - for _, key := range self.pendingBlocks.Keys() { - b, _ := self.pendingBlocks.Get(key) - block := b.(*types.Block) - - enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) - key := append(blockHashPre, block.Hash().Bytes()...) - if batchWrite { - batch.Put(key, rle.Compress(enc)) - } else { - self.blockDb.Put(key, enc) - } - } - - if batchWrite { - db.LDB().Write(batch, nil) - } -} - type writeStatus byte const ( @@ -586,15 +582,7 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr status = sideStatTy } - if queued { - // Write block to database. Eventually we'll have to improve on this and throw away blocks that are - // not in the canonical chain. - self.mu.Lock() - self.enqueueForWrite(block) - self.mu.Unlock() - } else { - self.write(block) - } + self.write(block) // Delete from future blocks self.futureBlocks.Remove(block.Hash()) @@ -610,8 +598,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - self.pendingBlocks, _ = lru.New(len(chain)) - // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -629,7 +615,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Start the parallel nonce verifier. go verifyNonces(self.pow, chain, nonceQuit, nonceDone) defer close(nonceQuit) - defer self.flushQueuedBlocks() txcount := 0 for i, block := range chain { diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 8b3ea9e85..6869bc746 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -109,8 +109,7 @@ func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) { bman.bc.mu.Lock() { - bman.bc.enqueueForWrite(block) - //bman.bc.write(block) + bman.bc.write(block) } bman.bc.mu.Unlock() } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 6a7012c65..ac9027755 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -65,7 +65,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( gasLimit: gasLimitFn, minGasPrice: new(big.Int), pendingState: state.ManageState(currentStateFn()), - events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}), + events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}), } go pool.eventLoop() @@ -80,7 +80,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Lock() switch ev := ev.(type) { - case ChainEvent: + case ChainHeadEvent: pool.resetState() case GasPriceChanged: pool.minGasPrice = ev.Price |