aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-06-19 22:21:20 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2015-06-30 00:51:49 +0800
commit4460dc9d1a7e12f1f0583538c7df852cd64c4fed (patch)
tree5c37059a9ce8ad8dd00407afa9de966481658704
parent2a5a55efafa39a21a5c70afcc85dfb00c6002028 (diff)
downloadgo-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar.gz
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar.bz2
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar.lz
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar.xz
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.tar.zst
go-tangerine-4460dc9d1a7e12f1f0583538c7df852cd64c4fed.zip
core: added LRU caching and added batch writing when LDB is used
-rw-r--r--core/chain_manager.go102
1 files changed, 81 insertions, 21 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 070b6b1d0..a0f945020 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -11,15 +11,18 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
- "github.com/rcrowley/go-metrics"
+ "github.com/golang/groupcache/lru"
+ "github.com/syndtr/goleveldb/leveldb"
)
var (
@@ -105,8 +108,9 @@ type ChainManager struct {
transState *state.StateDB
txState *state.ManagedState
- cache *BlockCache
- futureBlocks *BlockCache
+ cache *lru.Cache // cache is the LRU caching
+ futureBlocks *BlockCache // future blocks are blocks added for later processing
+ nextBlocks *BlockCache // next blocks is used during large inserts
quit chan struct{}
// procInterrupt must be atomically called
@@ -123,7 +127,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
genesisBlock: GenesisBlock(42, stateDb),
eventMux: mux,
quit: make(chan struct{}),
- cache: NewBlockCache(blockCacheLimit),
+ cache: lru.New(blockCacheLimit),
pow: pow,
}
// Check the genesis block given to the chain manager. If the genesis block mismatches block number 0
@@ -168,7 +172,7 @@ func (bc *ChainManager) SetHead(head *types.Block) {
bc.removeBlock(block)
}
- bc.cache = NewBlockCache(blockCacheLimit)
+ bc.cache = lru.New(blockCacheLimit)
bc.currentBlock = head
bc.makeCache()
@@ -257,11 +261,13 @@ func (bc *ChainManager) setLastState() {
func (bc *ChainManager) makeCache() {
if bc.cache == nil {
- bc.cache = NewBlockCache(blockCacheLimit)
+ bc.cache = lru.New(blockCacheLimit)
}
// load in last `blockCacheLimit` - 1 blocks. Last block is the current.
- for _, block := range bc.GetBlocksFromHash(bc.currentBlock.Hash(), blockCacheLimit) {
- bc.cache.Push(block)
+ ancestors := bc.GetAncestors(bc.currentBlock, blockCacheLimit-1)
+ ancestors = append(ancestors, bc.currentBlock)
+ for _, block := range ancestors {
+ bc.cache.Add(block.Hash(), block)
}
}
@@ -274,7 +280,7 @@ func (bc *ChainManager) Reset() {
}
if bc.cache == nil {
- bc.cache = NewBlockCache(blockCacheLimit)
+ bc.cache = lru.New(blockCacheLimit)
}
// Prepare the genesis block
@@ -359,15 +365,20 @@ func (bc *ChainManager) insert(block *types.Block) {
}
func (bc *ChainManager) write(block *types.Block) {
- enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
- key := append(blockHashPre, block.Hash().Bytes()...)
- err := bc.blockDb.Put(key, enc)
- if err != nil {
- glog.Fatal("db write fail:", err)
- }
+ tstart := time.Now()
+
+ go func() {
+ enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+ key := append(blockHashPre, block.Hash().Bytes()...)
+ err := bc.blockDb.Put(key, enc)
+ if err != nil {
+ glog.Fatal("db write fail:", err)
+ }
+ }()
- // Push block to cache
- bc.cache.Push(block)
+ if glog.V(logger.Debug) {
+ glog.Infof("wrote block #%v %s. Took %v\n", block.Number(), common.PP(block.Hash().Bytes()), time.Since(tstart))
+ }
}
// Accessors
@@ -377,6 +388,16 @@ func (bc *ChainManager) Genesis() *types.Block {
// Block fetching methods
func (bc *ChainManager) HasBlock(hash common.Hash) bool {
+ if bc.cache.Contains(hash) {
+ return true
+ }
+
+ if bc.nextBlocks != nil {
+ if block := bc.nextBlocks.Get(hash); block != nil {
+ return true
+ }
+ }
+
data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...))
return len(data) != 0
}
@@ -403,11 +424,15 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) (
}
func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
- /*
- if block := self.cache.Get(hash); block != nil {
+ if block, ok := self.cache.Get(hash); ok {
+ return block.(*types.Block)
+ }
+
+ if self.nextBlocks != nil {
+ if block := self.nextBlocks.Get(hash); block != nil {
return block
}
- */
+ }
data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
if len(data) == 0 {
@@ -418,6 +443,10 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err)
return nil
}
+
+ // Add the block to the cache
+ self.cache.Add(hash, (*types.Block)(&block))
+
return (*types.Block)(&block)
}
@@ -494,6 +523,31 @@ func (self *ChainManager) procFutureBlocks() {
}
}
+func (self *ChainManager) enqueueForWrite(block *types.Block) {
+ self.nextBlocks.Push(block)
+}
+
+func (self *ChainManager) flushQueuedBlocks() {
+ db, batchWrite := self.blockDb.(*ethdb.LDBDatabase)
+ batch := new(leveldb.Batch)
+ self.nextBlocks.Each(func(i int, block *types.Block) {
+ enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+ key := append(blockHashPre, block.Hash().Bytes()...)
+ if batchWrite {
+ batch.Put(key, rle.Compress(enc))
+ } else {
+ self.blockDb.Put(key, enc)
+ }
+ })
+ if batchWrite {
+ db.LDB().Write(batch, nil)
+ }
+
+ // reset the next blocks cache
+ self.nextBlocks = nil
+
+}
+
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
@@ -503,6 +557,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock()
defer self.chainmu.Unlock()
+ self.nextBlocks = NewBlockCache(len(chain))
+
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
@@ -520,6 +576,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// Start the parallel nonce verifier.
go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
defer close(nonceQuit)
+ defer self.flushQueuedBlocks()
+
+ defer func() {
+ }()
txcount := 0
for i, block := range chain {
@@ -632,7 +692,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
}
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
- self.write(block)
+ self.enqueueForWrite(block)
// Delete from future blocks
self.futureBlocks.Delete(block.Hash())