aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/chain_manager.go97
-rw-r--r--core/chain_manager_test.go3
-rw-r--r--core/transaction_pool.go4
3 files changed, 44 insertions, 60 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 808ccd201..c89aae3f0 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -11,10 +11,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@@ -23,7 +21,6 @@ import (
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/hashicorp/golang-lru"
- "github.com/syndtr/goleveldb/leveldb"
)
var (
@@ -40,6 +37,7 @@ const (
blockCacheLimit = 256
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
+ checkpointLimit = 200
)
// CalcDifficulty is the difficulty adjustment algorithm. It returns
@@ -101,6 +99,7 @@ type ChainManager struct {
chainmu sync.RWMutex
tsmu sync.RWMutex
+ checkpoint int // checkpoint counts towards the new checkpoint
td *big.Int
currentBlock *types.Block
lastBlockHash common.Hash
@@ -109,9 +108,8 @@ type ChainManager struct {
transState *state.StateDB
txState *state.ManagedState
- cache *lru.Cache // cache is the LRU caching
- futureBlocks *lru.Cache // future blocks are blocks added for later processing
- pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db
+ cache *lru.Cache // cache is the LRU caching
+ futureBlocks *lru.Cache // future blocks are blocks added for later processing
quit chan struct{}
// procInterrupt must be atomically called
@@ -240,15 +238,40 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb
}
+func (bc *ChainManager) recover() bool {
+ data, _ := bc.blockDb.Get([]byte("checkpoint"))
+ if len(data) != 0 {
+ block := bc.GetBlock(common.BytesToHash(data))
+ if block != nil {
+ err := bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+ if err != nil {
+ glog.Fatalln("db write err:", err)
+ }
+
+ bc.currentBlock = block
+ bc.lastBlockHash = block.Hash()
+ return true
+ }
+ }
+ return false
+}
+
func (bc *ChainManager) setLastState() {
data, _ := bc.blockDb.Get([]byte("LastBlock"))
if len(data) != 0 {
block := bc.GetBlock(common.BytesToHash(data))
if block != nil {
+ bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes())
+
bc.currentBlock = block
bc.lastBlockHash = block.Hash()
} else {
- glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync")
+ glog.Infof("LastBlock (%x) not found. Recovering...\n", data)
+ if bc.recover() {
+ glog.Infof("Recover successful")
+ } else {
+ glog.Fatalf("Recover failed. Please report")
+ }
}
} else {
bc.Reset()
@@ -357,6 +380,16 @@ func (bc *ChainManager) insert(block *types.Block) {
glog.Fatal("db write fail:", err)
}
+ bc.checkpoint++
+ if bc.checkpoint > checkpointLimit {
+ err = bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes())
+ if err != nil {
+ glog.Fatal("db write fail:", err)
+ }
+
+ bc.checkpoint = 0
+ }
+
bc.currentBlock = block
bc.lastBlockHash = block.Hash()
}
@@ -387,12 +420,6 @@ func (bc *ChainManager) HasBlock(hash common.Hash) bool {
return true
}
- if bc.pendingBlocks != nil {
- if _, exist := bc.pendingBlocks.Get(hash); exist {
- return true
- }
- }
-
data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...))
return len(data) != 0
}
@@ -423,12 +450,6 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
return block.(*types.Block)
}
- if self.pendingBlocks != nil {
- if block, _ := self.pendingBlocks.Get(hash); block != nil {
- return block.(*types.Block)
- }
- }
-
data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
if len(data) == 0 {
return nil
@@ -519,31 +540,6 @@ func (self *ChainManager) procFutureBlocks() {
}
}
-func (self *ChainManager) enqueueForWrite(block *types.Block) {
- self.pendingBlocks.Add(block.Hash(), block)
-}
-
-func (self *ChainManager) flushQueuedBlocks() {
- db, batchWrite := self.blockDb.(*ethdb.LDBDatabase)
- batch := new(leveldb.Batch)
- for _, key := range self.pendingBlocks.Keys() {
- b, _ := self.pendingBlocks.Get(key)
- block := b.(*types.Block)
-
- enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
- key := append(blockHashPre, block.Hash().Bytes()...)
- if batchWrite {
- batch.Put(key, rle.Compress(enc))
- } else {
- self.blockDb.Put(key, enc)
- }
- }
-
- if batchWrite {
- db.LDB().Write(batch, nil)
- }
-}
-
type writeStatus byte
const (
@@ -586,15 +582,7 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr
status = sideStatTy
}
- if queued {
- // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
- // not in the canonical chain.
- self.mu.Lock()
- self.enqueueForWrite(block)
- self.mu.Unlock()
- } else {
- self.write(block)
- }
+ self.write(block)
// Delete from future blocks
self.futureBlocks.Remove(block.Hash())
@@ -610,8 +598,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock()
defer self.chainmu.Unlock()
- self.pendingBlocks, _ = lru.New(len(chain))
-
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
@@ -629,7 +615,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// Start the parallel nonce verifier.
go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
defer close(nonceQuit)
- defer self.flushQueuedBlocks()
txcount := 0
for i, block := range chain {
diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go
index 8b3ea9e85..6869bc746 100644
--- a/core/chain_manager_test.go
+++ b/core/chain_manager_test.go
@@ -109,8 +109,7 @@ func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) {
bman.bc.mu.Lock()
{
- bman.bc.enqueueForWrite(block)
- //bman.bc.write(block)
+ bman.bc.write(block)
}
bman.bc.mu.Unlock()
}
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 6a7012c65..ac9027755 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -65,7 +65,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
gasLimit: gasLimitFn,
minGasPrice: new(big.Int),
pendingState: state.ManageState(currentStateFn()),
- events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}),
+ events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}),
}
go pool.eventLoop()
@@ -80,7 +80,7 @@ func (pool *TxPool) eventLoop() {
pool.mu.Lock()
switch ev := ev.(type) {
- case ChainEvent:
+ case ChainHeadEvent:
pool.resetState()
case GasPriceChanged:
pool.minGasPrice = ev.Price