diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/chain_manager.go | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 070b6b1d0..a0f945020 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -11,15 +11,18 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" - "github.com/rcrowley/go-metrics" + "github.com/golang/groupcache/lru" + "github.com/syndtr/goleveldb/leveldb" ) var ( @@ -105,8 +108,9 @@ type ChainManager struct { transState *state.StateDB txState *state.ManagedState - cache *BlockCache - futureBlocks *BlockCache + cache *lru.Cache // cache is the LRU caching + futureBlocks *BlockCache // future blocks are blocks added for later processing + nextBlocks *BlockCache // next blocks is used during large inserts quit chan struct{} // procInterrupt must be atomically called @@ -123,7 +127,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow genesisBlock: GenesisBlock(42, stateDb), eventMux: mux, quit: make(chan struct{}), - cache: NewBlockCache(blockCacheLimit), + cache: lru.New(blockCacheLimit), pow: pow, } // Check the genesis block given to the chain manager. If the genesis block mismatches block number 0 @@ -168,7 +172,7 @@ func (bc *ChainManager) SetHead(head *types.Block) { bc.removeBlock(block) } - bc.cache = NewBlockCache(blockCacheLimit) + bc.cache = lru.New(blockCacheLimit) bc.currentBlock = head bc.makeCache() @@ -257,11 +261,13 @@ func (bc *ChainManager) setLastState() { func (bc *ChainManager) makeCache() { if bc.cache == nil { - bc.cache = NewBlockCache(blockCacheLimit) + bc.cache = lru.New(blockCacheLimit) } // load in last `blockCacheLimit` - 1 blocks. Last block is the current. - for _, block := range bc.GetBlocksFromHash(bc.currentBlock.Hash(), blockCacheLimit) { - bc.cache.Push(block) + ancestors := bc.GetAncestors(bc.currentBlock, blockCacheLimit-1) + ancestors = append(ancestors, bc.currentBlock) + for _, block := range ancestors { + bc.cache.Add(block.Hash(), block) } } @@ -274,7 +280,7 @@ func (bc *ChainManager) Reset() { } if bc.cache == nil { - bc.cache = NewBlockCache(blockCacheLimit) + bc.cache = lru.New(blockCacheLimit) } // Prepare the genesis block @@ -359,15 +365,20 @@ func (bc *ChainManager) insert(block *types.Block) { } func (bc *ChainManager) write(block *types.Block) { - enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) - key := append(blockHashPre, block.Hash().Bytes()...) - err := bc.blockDb.Put(key, enc) - if err != nil { - glog.Fatal("db write fail:", err) - } + tstart := time.Now() + + go func() { + enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) + key := append(blockHashPre, block.Hash().Bytes()...) + err := bc.blockDb.Put(key, enc) + if err != nil { + glog.Fatal("db write fail:", err) + } + }() - // Push block to cache - bc.cache.Push(block) + if glog.V(logger.Debug) { + glog.Infof("wrote block #%v %s. Took %v\n", block.Number(), common.PP(block.Hash().Bytes()), time.Since(tstart)) + } } // Accessors @@ -377,6 +388,16 @@ func (bc *ChainManager) Genesis() *types.Block { // Block fetching methods func (bc *ChainManager) HasBlock(hash common.Hash) bool { + if bc.cache.Contains(hash) { + return true + } + + if bc.nextBlocks != nil { + if block := bc.nextBlocks.Get(hash); block != nil { + return true + } + } + data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...)) return len(data) != 0 } @@ -403,11 +424,15 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) ( } func (self *ChainManager) GetBlock(hash common.Hash) *types.Block { - /* - if block := self.cache.Get(hash); block != nil { + if block, ok := self.cache.Get(hash); ok { + return block.(*types.Block) + } + + if self.nextBlocks != nil { + if block := self.nextBlocks.Get(hash); block != nil { return block } - */ + } data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...)) if len(data) == 0 { @@ -418,6 +443,10 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block { glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err) return nil } + + // Add the block to the cache + self.cache.Add(hash, (*types.Block)(&block)) + return (*types.Block)(&block) } @@ -494,6 +523,31 @@ func (self *ChainManager) procFutureBlocks() { } } +func (self *ChainManager) enqueueForWrite(block *types.Block) { + self.nextBlocks.Push(block) +} + +func (self *ChainManager) flushQueuedBlocks() { + db, batchWrite := self.blockDb.(*ethdb.LDBDatabase) + batch := new(leveldb.Batch) + self.nextBlocks.Each(func(i int, block *types.Block) { + enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) + key := append(blockHashPre, block.Hash().Bytes()...) + if batchWrite { + batch.Put(key, rle.Compress(enc)) + } else { + self.blockDb.Put(key, enc) + } + }) + if batchWrite { + db.LDB().Write(batch, nil) + } + + // reset the next blocks cache + self.nextBlocks = nil + +} + // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned // it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { @@ -503,6 +557,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() + self.nextBlocks = NewBlockCache(len(chain)) + // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -520,6 +576,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Start the parallel nonce verifier. go verifyNonces(self.pow, chain, nonceQuit, nonceDone) defer close(nonceQuit) + defer self.flushQueuedBlocks() + + defer func() { + }() txcount := 0 for i, block := range chain { @@ -632,7 +692,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { } // Write block to database. Eventually we'll have to improve on this and throw away blocks that are // not in the canonical chain. - self.write(block) + self.enqueueForWrite(block) // Delete from future blocks self.futureBlocks.Delete(block.Hash()) |