aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/blockchain.go455
-rw-r--r--core/blockchain_test.go258
-rw-r--r--core/genesis.go17
-rw-r--r--core/headerchain.go75
-rw-r--r--core/rawdb/accessors_chain.go198
-rw-r--r--core/rawdb/accessors_indexes.go8
-rw-r--r--core/rawdb/accessors_metadata.go12
-rw-r--r--core/rawdb/database.go311
-rw-r--r--core/rawdb/freezer.go382
-rw-r--r--core/rawdb/freezer_table.go561
-rw-r--r--core/rawdb/freezer_table_test.go609
-rw-r--r--core/rawdb/schema.go27
-rw-r--r--core/rawdb/table.go44
-rw-r--r--core/state/database.go2
-rw-r--r--core/state/sync.go2
15 files changed, 2753 insertions, 208 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 61b809319..7ab6806c2 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -63,6 +63,8 @@ var (
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
+
+ errInsertionInterrupted = errors.New("insertion is interrupted")
)
const (
@@ -91,7 +93,10 @@ const (
// - Version 6
// The following incompatible database changes were added:
// * Transaction lookup information stores the corresponding block number instead of block hash
- BlockChainVersion uint64 = 6
+ // - Version 7
+ // The following incompatible database changes were added:
+ // * Use freezer as the ancient database to maintain all ancient data
+ BlockChainVersion uint64 = 7
)
// CacheConfig contains the configuration values for the trie caching/pruning
@@ -138,7 +143,6 @@ type BlockChain struct {
chainmu sync.RWMutex // blockchain insertion lock
- checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
@@ -161,8 +165,9 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config
- badBlocks *lru.Cache // Bad block cache
- shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
+ badBlocks *lru.Cache // Bad block cache
+ shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
+ terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
}
// NewBlockChain returns a fully initialised block chain using information
@@ -213,9 +218,82 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
+ // Initialize the chain with ancient data if it isn't empty.
+ if bc.empty() {
+ if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
+ var (
+ start = time.Now()
+ logged time.Time
+ )
+ for i := uint64(0); i < frozen; i++ {
+ // Inject hash<->number mapping.
+ hash := rawdb.ReadCanonicalHash(bc.db, i)
+ if hash == (common.Hash{}) {
+ return nil, errors.New("broken ancient database")
+ }
+ rawdb.WriteHeaderNumber(bc.db, hash, i)
+
+ // Inject txlookup indexes.
+ block := rawdb.ReadBlock(bc.db, hash, i)
+ if block == nil {
+ return nil, errors.New("broken ancient database")
+ }
+ rawdb.WriteTxLookupEntries(bc.db, block)
+
+ // If we've spent too much time already, notify the user of what we're doing
+ if time.Since(logged) > 8*time.Second {
+ log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
+ logged = time.Now()
+ }
+ }
+ hash := rawdb.ReadCanonicalHash(bc.db, frozen-1)
+ rawdb.WriteHeadHeaderHash(bc.db, hash)
+ rawdb.WriteHeadFastBlockHash(bc.db, hash)
+
+ // The first thing the node will do is reconstruct the verification data for
+ // the head block (ethash cache or clique voting snapshot). Might as well do
+ // it in advance.
+ bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true)
+
+ log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
+ }
+ }
if err := bc.loadLastState(); err != nil {
return nil, err
}
+ if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
+ var (
+ needRewind bool
+ low uint64
+ )
+ // The head full block may be rolled back to a very low height due to
+ // blockchain repair. If the head full block is even lower than the ancient
+ // chain, truncate the ancient store.
+ fullBlock := bc.CurrentBlock()
+ if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
+ needRewind = true
+ low = fullBlock.NumberU64()
+ }
+ // In fast sync, it may happen that ancient data has been written to the
+ // ancient store, but the LastFastBlock has not been updated, truncate the
+ // extra data here.
+ fastBlock := bc.CurrentFastBlock()
+ if fastBlock != nil && fastBlock.NumberU64() < frozen-1 {
+ needRewind = true
+ if fastBlock.NumberU64() < low || low == 0 {
+ low = fastBlock.NumberU64()
+ }
+ }
+ if needRewind {
+ var hashes []common.Hash
+ previous := bc.CurrentHeader().Number.Uint64()
+ for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
+ hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
+ }
+ bc.Rollback(hashes)
+ log.Warn("Truncate ancient chain", "from", previous, "to", low)
+ }
+ }
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
@@ -243,6 +321,20 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}
+// empty returns an indicator whether the blockchain is empty.
+// Note, it's a special case that we connect a non-empty ancient
+// database with an empty node, so that we can plugin the ancient
+// into node seamlessly.
+func (bc *BlockChain) empty() bool {
+ genesis := bc.genesisBlock.Hash()
+ for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} {
+ if hash != genesis {
+ return false
+ }
+ }
+ return true
+}
+
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState() error {
@@ -267,6 +359,7 @@ func (bc *BlockChain) loadLastState() error {
if err := bc.repair(&currentBlock); err != nil {
return err
}
+ rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
@@ -312,12 +405,57 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
+ updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
+ // Rewind the block chain, ensuring we don't end up with a stateless head block
+ if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
+ newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ if newHeadBlock == nil {
+ newHeadBlock = bc.genesisBlock
+ } else {
+ if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
+ // Rewound state missing, rolled back to before pivot, reset to genesis
+ newHeadBlock = bc.genesisBlock
+ }
+ }
+ rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
+ bc.currentBlock.Store(newHeadBlock)
+ }
+
+ // Rewind the fast block in a simpleton way to the target head
+ if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
+ newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
+ // If either blocks reached nil, reset to the genesis state
+ if newHeadFastBlock == nil {
+ newHeadFastBlock = bc.genesisBlock
+ }
+ rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
+ bc.currentFastBlock.Store(newHeadFastBlock)
+ }
+ }
+
// Rewind the header chain, deleting all block bodies until then
- delFn := func(db ethdb.Writer, hash common.Hash, num uint64) {
- rawdb.DeleteBody(db, hash, num)
+ delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
+ // Ignore the error here since light client won't hit this path
+ frozen, _ := bc.db.Ancients()
+ if num+1 <= frozen {
+ // Truncate all relative data(header, total difficulty, body, receipt
+ // and canonical hash) from ancient store.
+ if err := bc.db.TruncateAncients(num + 1); err != nil {
+ log.Crit("Failed to truncate ancient data", "number", num, "err", err)
+ }
+
+ // Remove the hash <-> number mapping from the active store.
+ rawdb.DeleteHeaderNumber(db, hash)
+ } else {
+ // Remove relative body and receipts from the active store.
+ // The header, total difficulty and canonical hash will be
+ // removed in the hc.SetHead function.
+ rawdb.DeleteBody(db, hash, num)
+ rawdb.DeleteReceipts(db, hash, num)
+ }
+ // Todo(rjl493456442) txlookup, bloombits, etc
}
- bc.hc.SetHead(head, delFn)
- currentHeader := bc.hc.CurrentHeader()
+ bc.hc.SetHead(head, updateFn, delFn)
// Clear out any stale content from the caches
bc.bodyCache.Purge()
@@ -326,33 +464,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.blockCache.Purge()
bc.futureBlocks.Purge()
- // Rewind the block chain, ensuring we don't end up with a stateless head block
- if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
- bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
- }
- if currentBlock := bc.CurrentBlock(); currentBlock != nil {
- if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
- // Rewound state missing, rolled back to before pivot, reset to genesis
- bc.currentBlock.Store(bc.genesisBlock)
- }
- }
- // Rewind the fast block in a simpleton way to the target head
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
- bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
- }
- // If either blocks reached nil, reset to the genesis state
- if currentBlock := bc.CurrentBlock(); currentBlock == nil {
- bc.currentBlock.Store(bc.genesisBlock)
- }
- if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
- bc.currentFastBlock.Store(bc.genesisBlock)
- }
- currentBlock := bc.CurrentBlock()
- currentFastBlock := bc.CurrentFastBlock()
-
- rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
- rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash())
-
return bc.loadLastState()
}
@@ -780,96 +891,272 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
- bc.currentFastBlock.Store(newFastBlock)
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
+ bc.currentFastBlock.Store(newFastBlock)
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
- bc.currentBlock.Store(newBlock)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
+ bc.currentBlock.Store(newBlock)
}
}
+ // Truncate ancient data which exceeds the current header.
+ //
+ // Notably, it can happen that system crashes without truncating the ancient data
+ // but the head indicator has been updated in the active store. Regarding this issue,
+ // system will self recovery by truncating the extra data during the setup phase.
+ if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
+ log.Crit("Truncate ancient store failed", "err", err)
+ }
+}
+
+// truncateAncient rewinds the blockchain to the specified header and deletes all
+// data in the ancient store that exceeds the specified header.
+func (bc *BlockChain) truncateAncient(head uint64) error {
+ frozen, err := bc.db.Ancients()
+ if err != nil {
+ return err
+ }
+ // Short circuit if there is no data to truncate in ancient store.
+ if frozen <= head+1 {
+ return nil
+ }
+ // Truncate all the data in the freezer beyond the specified head
+ if err := bc.db.TruncateAncients(head + 1); err != nil {
+ return err
+ }
+ // Clear out any stale content from the caches
+ bc.hc.headerCache.Purge()
+ bc.hc.tdCache.Purge()
+ bc.hc.numberCache.Purge()
+
+ // Clear out any stale content from the caches
+ bc.bodyCache.Purge()
+ bc.bodyRLPCache.Purge()
+ bc.receiptsCache.Purge()
+ bc.blockCache.Purge()
+ bc.futureBlocks.Purge()
+
+ log.Info("Rewind ancient data", "number", head)
+ return nil
}
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
-func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
bc.wg.Add(1)
defer bc.wg.Done()
+ var (
+ ancientBlocks, liveBlocks types.Blocks
+ ancientReceipts, liveReceipts []types.Receipts
+ )
// Do a sanity check that the provided chain is actually ordered and linked
- for i := 1; i < len(blockChain); i++ {
- if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
- log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
- "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
- return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
- blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
+ for i := 0; i < len(blockChain); i++ {
+ if i != 0 {
+ if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
+ log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
+ "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
+ return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
+ blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
+ }
+ }
+ if blockChain[i].NumberU64() <= ancientLimit {
+ ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i])
+ } else {
+ liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i])
}
}
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
- bytes = 0
- batch = bc.db.NewBatch()
+ size = 0
)
- for i, block := range blockChain {
- receipts := receiptChain[i]
- // Short circuit insertion if shutting down or processing failed
- if atomic.LoadInt32(&bc.procInterrupt) == 1 {
- return 0, nil
+ // updateHead updates the head fast sync block if the inserted blocks are better
+ // and returns a indicator whether the inserted blocks are canonical.
+ updateHead := func(head *types.Block) bool {
+ var isCanonical bool
+ bc.chainmu.Lock()
+ if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
+ currentFastBlock := bc.CurrentFastBlock()
+ if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
+ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
+ bc.currentFastBlock.Store(head)
+ isCanonical = true
+ }
+ }
+ bc.chainmu.Unlock()
+ return isCanonical
+ }
+ // writeAncient writes blockchain and corresponding receipt chain into ancient store.
+ //
+ // this function only accepts canonical chain data. All side chain will be reverted
+ // eventually.
+ writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ var (
+ previous = bc.CurrentFastBlock()
+ batch = bc.db.NewBatch()
+ )
+ // If any error occurs before updating the head or we are inserting a side chain,
+ // all the data written this time wll be rolled back.
+ defer func() {
+ if previous != nil {
+ if err := bc.truncateAncient(previous.NumberU64()); err != nil {
+ log.Crit("Truncate ancient store failed", "err", err)
+ }
+ }
+ }()
+ var deleted types.Blocks
+ for i, block := range blockChain {
+ // Short circuit insertion if shutting down or processing failed
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ return 0, errInsertionInterrupted
+ }
+ // Short circuit insertion if it is required(used in testing only)
+ if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
+ return i, errors.New("insertion is terminated for testing purpose")
+ }
+ // Short circuit if the owner header is unknown
+ if !bc.HasHeader(block.Hash(), block.NumberU64()) {
+ return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ }
+ var (
+ start = time.Now()
+ logged = time.Now()
+ count int
+ )
+ // Migrate all ancient blocks. This can happen if someone upgrades from Geth
+ // 1.8.x to 1.9.x mid-fast-sync. Perhaps we can get rid of this path in the
+ // long term.
+ for {
+ // We can ignore the error here since light client won't hit this code path.
+ frozen, _ := bc.db.Ancients()
+ if frozen >= block.NumberU64() {
+ break
+ }
+ h := rawdb.ReadCanonicalHash(bc.db, frozen)
+ b := rawdb.ReadBlock(bc.db, h, frozen)
+ size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, frozen, bc.chainConfig), rawdb.ReadTd(bc.db, h, frozen))
+ count += 1
+
+ // Always keep genesis block in active database.
+ if b.NumberU64() != 0 {
+ deleted = append(deleted, b)
+ }
+ if time.Since(logged) > 8*time.Second {
+ log.Info("Migrating ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
+ logged = time.Now()
+ }
+ }
+ if count > 0 {
+ log.Info("Migrated ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
+ }
+ // Flush data into ancient database.
+ size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
+ rawdb.WriteTxLookupEntries(batch, block)
+
+ stats.processed++
}
- // Short circuit if the owner header is unknown
- if !bc.HasHeader(block.Hash(), block.NumberU64()) {
- return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ // Flush all tx-lookup index data.
+ size += batch.ValueSize()
+ if err := batch.Write(); err != nil {
+ return 0, err
}
- // Skip if the entire data is already known
- if bc.HasBlock(block.Hash(), block.NumberU64()) {
- stats.ignored++
- continue
+ batch.Reset()
+
+ // Sync the ancient store explicitly to ensure all data has been flushed to disk.
+ if err := bc.db.Sync(); err != nil {
+ return 0, err
}
- // Compute all the non-consensus fields of the receipts
- if err := receipts.DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
- return i, fmt.Errorf("failed to derive receipts data: %v", err)
+ if !updateHead(blockChain[len(blockChain)-1]) {
+ return 0, errors.New("side blocks can't be accepted as the ancient chain data")
}
- // Write all the data out into the database
- rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
- rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
- rawdb.WriteTxLookupEntries(batch, block)
+ previous = nil // disable rollback explicitly
- stats.processed++
+ // Wipe out canonical block data.
+ for _, block := range append(deleted, blockChain...) {
+ rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
+ rawdb.DeleteCanonicalHash(batch, block.NumberU64())
+ }
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ batch.Reset()
- if batch.ValueSize() >= ethdb.IdealBatchSize {
- if err := batch.Write(); err != nil {
- return 0, err
+ // Wipe out side chain too.
+ for _, block := range append(deleted, blockChain...) {
+ for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
+ rawdb.DeleteBlock(batch, hash, block.NumberU64())
}
- bytes += batch.ValueSize()
- batch.Reset()
}
- }
- if batch.ValueSize() > 0 {
- bytes += batch.ValueSize()
if err := batch.Write(); err != nil {
return 0, err
}
+ return 0, nil
}
+ // writeLive writes blockchain and corresponding receipt chain into active store.
+ writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ batch := bc.db.NewBatch()
+ for i, block := range blockChain {
+ // Short circuit insertion if shutting down or processing failed
+ if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+ return 0, errInsertionInterrupted
+ }
+ // Short circuit if the owner header is unknown
+ if !bc.HasHeader(block.Hash(), block.NumberU64()) {
+ return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ }
+ if bc.HasBlock(block.Hash(), block.NumberU64()) {
+ stats.ignored++
+ continue
+ }
+ // Write all the data out into the database
+ rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
+ rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
+ rawdb.WriteTxLookupEntries(batch, block)
- // Update the head fast sync block if better
- bc.chainmu.Lock()
- head := blockChain[len(blockChain)-1]
- if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
- currentFastBlock := bc.CurrentFastBlock()
- if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
- rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
- bc.currentFastBlock.Store(head)
+ stats.processed++
+ if batch.ValueSize() >= ethdb.IdealBatchSize {
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ size += batch.ValueSize()
+ batch.Reset()
+ }
+ }
+ if batch.ValueSize() > 0 {
+ size += batch.ValueSize()
+ if err := batch.Write(); err != nil {
+ return 0, err
+ }
+ }
+ updateHead(blockChain[len(blockChain)-1])
+ return 0, nil
+ }
+ // Write downloaded chain data and corresponding receipt chain data.
+ if len(ancientBlocks) > 0 {
+ if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
+ if err == errInsertionInterrupted {
+ return 0, nil
+ }
+ return n, err
+ }
+ }
+ if len(liveBlocks) > 0 {
+ if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
+ if err == errInsertionInterrupted {
+ return 0, nil
+ }
+ return n, err
}
}
- bc.chainmu.Unlock()
+ head := blockChain[len(blockChain)-1]
context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
- "size", common.StorageSize(bytes),
+ "size", common.StorageSize(size),
}
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 70e3207f5..8dfcda6d4 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -18,8 +18,10 @@ package core
import (
"fmt"
+ "io/ioutil"
"math/big"
"math/rand"
+ "os"
"sync"
"testing"
"time"
@@ -33,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/params"
)
@@ -639,7 +640,27 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
- if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
+ if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ // Freezer style fast import the chain.
+ frdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.Remove(frdir)
+ ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer db: %v", err)
+ }
+ gspec.MustCommit(ancientDb)
+ ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer ancient.Stop()
+
+ if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
// Iterate over all chain data components, and cross reference
@@ -647,26 +668,35 @@ func TestFastVsFullChains(t *testing.T) {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
if ftd, atd := fast.GetTdByHash(hash), archive.GetTdByHash(hash); ftd.Cmp(atd) != 0 {
- t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", num, hash, ftd, atd)
+ t.Errorf("block #%d [%x]: td mismatch: fastdb %v, archivedb %v", num, hash, ftd, atd)
+ }
+ if antd, artd := ancient.GetTdByHash(hash), archive.GetTdByHash(hash); antd.Cmp(artd) != 0 {
+ t.Errorf("block #%d [%x]: td mismatch: ancientdb %v, archivedb %v", num, hash, antd, artd)
}
if fheader, aheader := fast.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); fheader.Hash() != aheader.Hash() {
- t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader)
+ t.Errorf("block #%d [%x]: header mismatch: fastdb %v, archivedb %v", num, hash, fheader, aheader)
}
- if fblock, ablock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash); fblock.Hash() != ablock.Hash() {
- t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock)
- } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) {
- t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions())
- } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) {
- t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles())
+ if anheader, arheader := ancient.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); anheader.Hash() != arheader.Hash() {
+ t.Errorf("block #%d [%x]: header mismatch: ancientdb %v, archivedb %v", num, hash, anheader, arheader)
}
- if freceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), archive.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
- t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts)
+ if fblock, arblock, anblock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash), ancient.GetBlockByHash(hash); fblock.Hash() != arblock.Hash() || anblock.Hash() != arblock.Hash() {
+ t.Errorf("block #%d [%x]: block mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock, anblock, arblock)
+ } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(arblock.Transactions()) || types.DeriveSha(anblock.Transactions()) != types.DeriveSha(arblock.Transactions()) {
+ t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions())
+ } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) {
+ t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles())
+ }
+ if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
+ t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}
}
// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
- t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash)
+ t.Errorf("block #%d: canonical hash mismatch: fastdb %v, archivedb %v", i, fhash, ahash)
+ }
+ if anhash, arhash := rawdb.ReadCanonicalHash(ancientDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); anhash != arhash {
+ t.Errorf("block #%d: canonical hash mismatch: ancientdb %v, archivedb %v", i, anhash, arhash)
}
}
}
@@ -686,6 +716,20 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)
+ // makeDb creates a db instance for testing.
+ makeDb := func() (ethdb.Database, func()) {
+ dir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.Remove(dir)
+ db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer db: %v", err)
+ }
+ gspec.MustCommit(db)
+ return db, func() { os.RemoveAll(dir) }
+ }
// Configure a subchain to roll back
remove := []common.Hash{}
for _, block := range blocks[height/2:] {
@@ -704,9 +748,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
}
}
// Import the chain as an archive node and ensure all pointers are updated
- archiveDb := rawdb.NewMemoryDatabase()
- gspec.MustCommit(archiveDb)
-
+ archiveDb, delfn := makeDb()
+ defer delfn()
archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
@@ -718,8 +761,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
assert(t, "archive", archive, height/2, height/2, height/2)
// Import the chain as a non-archive node and ensure all pointers are updated
- fastDb := rawdb.NewMemoryDatabase()
- gspec.MustCommit(fastDb)
+ fastDb, delfn := makeDb()
+ defer delfn()
fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer fast.Stop()
@@ -730,17 +773,35 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
- if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
+ if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
assert(t, "fast", fast, height, height, 0)
fast.Rollback(remove)
assert(t, "fast", fast, height/2, height/2, 0)
- // Import the chain as a light node and ensure all pointers are updated
- lightDb := rawdb.NewMemoryDatabase()
- gspec.MustCommit(lightDb)
+ // Import the chain as a ancient-first node and ensure all pointers are updated
+ ancientDb, delfn := makeDb()
+ defer delfn()
+ ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer ancient.Stop()
+
+ if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ assert(t, "ancient", ancient, height, height, 0)
+ ancient.Rollback(remove)
+ assert(t, "ancient", ancient, height/2, height/2, 0)
+ if frozen, err := ancientDb.Ancients(); err != nil || frozen != height/2+1 {
+ t.Fatalf("failed to truncate ancient store, want %v, have %v", height/2+1, frozen)
+ }
+ // Import the chain as a light node and ensure all pointers are updated
+ lightDb, delfn := makeDb()
+ defer delfn()
light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
if n, err := light.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
@@ -918,7 +979,7 @@ func TestLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- db = memorydb.New()
+ db = rawdb.NewMemoryDatabase()
// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
@@ -1040,7 +1101,7 @@ func TestSideLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- db = memorydb.New()
+ db = rawdb.NewMemoryDatabase()
// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
@@ -1564,6 +1625,119 @@ func TestLargeReorgTrieGC(t *testing.T) {
}
}
+func TestBlockchainRecovery(t *testing.T) {
+ // Configure and generate a sample block chain
+ var (
+ gendb = rawdb.NewMemoryDatabase()
+ key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ funds = big.NewInt(1000000000)
+ gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
+ genesis = gspec.MustCommit(gendb)
+ )
+ height := uint64(1024)
+ blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)
+
+ // Import the chain as a ancient-first node and ensure all pointers are updated
+ frdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.Remove(frdir)
+ ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer db: %v", err)
+ }
+ gspec.MustCommit(ancientDb)
+ ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+
+ headers := make([]*types.Header, len(blocks))
+ for i, block := range blocks {
+ headers[i] = block.Header()
+ }
+ if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ ancient.Stop()
+
+ // Destroy head fast block manually
+ midBlock := blocks[len(blocks)/2]
+ rawdb.WriteHeadFastBlockHash(ancientDb, midBlock.Hash())
+
+ // Reopen broken blockchain again
+ ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer ancient.Stop()
+ if num := ancient.CurrentBlock().NumberU64(); num != 0 {
+ t.Errorf("head block mismatch: have #%v, want #%v", num, 0)
+ }
+ if num := ancient.CurrentFastBlock().NumberU64(); num != midBlock.NumberU64() {
+ t.Errorf("head fast-block mismatch: have #%v, want #%v", num, midBlock.NumberU64())
+ }
+ if num := ancient.CurrentHeader().Number.Uint64(); num != midBlock.NumberU64() {
+ t.Errorf("head header mismatch: have #%v, want #%v", num, midBlock.NumberU64())
+ }
+}
+
+func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
+ // Configure and generate a sample block chain
+ var (
+ gendb = rawdb.NewMemoryDatabase()
+ key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ funds = big.NewInt(1000000000)
+ gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
+ genesis = gspec.MustCommit(gendb)
+ )
+ height := uint64(1024)
+ blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)
+
+ // Import the chain as a ancient-first node and ensure all pointers are updated
+ frdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.Remove(frdir)
+ ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer db: %v", err)
+ }
+ gspec.MustCommit(ancientDb)
+ ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer ancient.Stop()
+
+ headers := make([]*types.Header, len(blocks))
+ for i, block := range blocks {
+ headers[i] = block.Header()
+ }
+ if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ // Abort ancient receipt chain insertion deliberately
+ ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
+ return number == blocks[len(blocks)/2].NumberU64()
+ }
+ previousFastBlock := ancient.CurrentFastBlock()
+ if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
+ t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
+ }
+ if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
+ t.Fatalf("failed to truncate ancient data")
+ }
+ ancient.terminateInsert = nil
+ if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
+ t.Fatalf("failed to insert ancient recept chain after rollback")
+ }
+}
+
// Tests that importing a very large side fork, which is larger than the canon chain,
// but where the difficulty per block is kept low: this means that it will not
// overtake the 'canon' chain until after it's passed canon by about 200 blocks.
@@ -1719,10 +1893,18 @@ func testInsertKnownChainData(t *testing.T, typ string) {
b.SetCoinbase(common.Address{1})
b.OffsetTime(-9) // A higher difficulty
})
-
// Import the shared chain and the original canonical one
- chaindb := rawdb.NewMemoryDatabase()
+ dir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.Remove(dir)
+ chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "")
+ if err != nil {
+ t.Fatalf("failed to create temp freezer db: %v", err)
+ }
new(Genesis).MustCommit(chaindb)
+ defer os.RemoveAll(dir)
chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
if err != nil {
@@ -1764,7 +1946,7 @@ func testInsertKnownChainData(t *testing.T, typ string) {
if err != nil {
return err
}
- _, err = chain.InsertReceiptChain(blocks, receipts)
+ _, err = chain.InsertReceiptChain(blocks, receipts, 0)
return err
}
asserter = func(t *testing.T, block *types.Block) {
@@ -1819,18 +2001,16 @@ func testInsertKnownChainData(t *testing.T, typ string) {
// The head shouldn't change.
asserter(t, blocks3[len(blocks3)-1])
- if typ != "headers" {
- // Rollback the heavier chain and re-insert the longer chain again
- for i := 0; i < len(blocks3); i++ {
- rollback = append(rollback, blocks3[i].Hash())
- }
- chain.Rollback(rollback)
+ // Rollback the heavier chain and re-insert the longer chain again
+ for i := 0; i < len(blocks3); i++ {
+ rollback = append(rollback, blocks3[i].Hash())
+ }
+ chain.Rollback(rollback)
- if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
- t.Fatalf("failed to insert chain data: %v", err)
- }
- asserter(t, blocks2[len(blocks2)-1])
+ if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
+ t.Fatalf("failed to insert chain data: %v", err)
}
+ asserter(t, blocks2[len(blocks2)-1])
}
// getLongAndShortChains returns two chains,
@@ -2019,14 +2199,12 @@ func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) {
numTxs = 1000
numBlocks = 1
)
-
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce))
}
dataFn := func(nonce uint64) []byte {
return nil
}
-
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
@@ -2044,7 +2222,6 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
dataFn := func(nonce uint64) []byte {
return nil
}
-
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
@@ -2062,6 +2239,5 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
dataFn := func(nonce uint64) []byte {
return nil
}
-
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
diff --git a/core/genesis.go b/core/genesis.go
index 1f34a3a9e..830fb033b 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -170,6 +170,22 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, genesis *Genesis, constant
return genesis.Config, block.Hash(), err
}
+ // We have the genesis block in database(perhaps in ancient database)
+ // but the corresponding state is missing.
+ header := rawdb.ReadHeader(db, stored, 0)
+ if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0)); err != nil {
+ if genesis == nil {
+ genesis = DefaultGenesisBlock()
+ }
+ // Ensure the stored genesis matches with the given one.
+ hash := genesis.ToBlock(nil).Hash()
+ if hash != stored {
+ return genesis.Config, hash, &GenesisMismatchError{stored, hash}
+ }
+ block, err := genesis.Commit(db)
+ return genesis.Config, block.Hash(), err
+ }
+
// Check whether the genesis block is already written.
if genesis != nil {
hash := genesis.ToBlock(nil).Hash()
@@ -277,6 +293,7 @@ func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) {
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil)
rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db, block.Hash())
+ rawdb.WriteHeadFastBlockHash(db, block.Hash())
rawdb.WriteHeadHeaderHash(db, block.Hash())
config := g.Config
diff --git a/core/headerchain.go b/core/headerchain.go
index d0c1987fb..cdd64bb50 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -274,9 +274,14 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCa
return i, errors.New("aborted")
}
// If the header's already known, skip it, otherwise store
- if hc.HasHeader(header.Hash(), header.Number.Uint64()) {
- stats.ignored++
- continue
+ hash := header.Hash()
+ if hc.HasHeader(hash, header.Number.Uint64()) {
+ externTd := hc.GetTd(hash, header.Number.Uint64())
+ localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
+ if externTd == nil || externTd.Cmp(localTd) <= 0 {
+ stats.ignored++
+ continue
+ }
}
if err := writeHeader(header); err != nil {
return i, err
@@ -453,33 +458,56 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
hc.currentHeaderHash = head.Hash()
}
-// DeleteCallback is a callback function that is called by SetHead before
-// each header is deleted.
-type DeleteCallback func(ethdb.Writer, common.Hash, uint64)
+type (
+ // UpdateHeadBlocksCallback is a callback function that is called by SetHead
+ // before head header is updated.
+ UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header)
+
+ // DeleteBlockContentCallback is a callback function that is called by SetHead
+ // before each header is deleted.
+ DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64)
+)
// SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set.
-func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
- height := uint64(0)
-
- if hdr := hc.CurrentHeader(); hdr != nil {
- height = hdr.Number.Uint64()
- }
- batch := hc.chainDb.NewBatch()
+func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) {
+ var (
+ parentHash common.Hash
+ batch = hc.chainDb.NewBatch()
+ )
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
- hash := hdr.Hash()
- num := hdr.Number.Uint64()
+ hash, num := hdr.Hash(), hdr.Number.Uint64()
+
+ // Rewind block chain to new head.
+ parent := hc.GetHeader(hdr.ParentHash, num-1)
+ if parent == nil {
+ parent = hc.genesisHeader
+ }
+ parentHash = hdr.ParentHash
+ // Notably, since geth has the possibility for setting the head to a low
+ // height which is even lower than ancient head.
+ // In order to ensure that the head is always no higher than the data in
+ // the database(ancient store or active store), we need to update head
+ // first then remove the relative data from the database.
+ //
+ // Update head first(head fast block, head full block) before deleting the data.
+ if updateFn != nil {
+ updateFn(hc.chainDb, parent)
+ }
+ // Update head header then.
+ rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash)
+
+ // Remove the relative data from the database.
if delFn != nil {
delFn(batch, hash, num)
}
+ // Rewind header chain to new head.
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
+ rawdb.DeleteCanonicalHash(batch, num)
- hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
- }
- // Roll back the canonical chain numbering
- for i := height; i > head; i-- {
- rawdb.DeleteCanonicalHash(batch, i)
+ hc.currentHeader.Store(parent)
+ hc.currentHeaderHash = parentHash
}
batch.Write()
@@ -487,13 +515,6 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
hc.headerCache.Purge()
hc.tdCache.Purge()
hc.numberCache.Purge()
-
- if hc.CurrentHeader() == nil {
- hc.currentHeader.Store(hc.genesisHeader)
- }
- hc.currentHeaderHash = hc.CurrentHeader().Hash()
-
- rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash)
}
// SetGenesis sets a new genesis block header for the chain
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index cc0591a4c..fab7ca56c 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -31,7 +31,17 @@ import (
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
- data, _ := db.Get(headerHashKey(number))
+ data, _ := db.Ancient(freezerHashTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(headerHashKey(number))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerHashTable, number)
+ }
+ }
if len(data) == 0 {
return common.Hash{}
}
@@ -39,21 +49,38 @@ func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
}
// WriteCanonicalHash stores the hash assigned to a canonical block number.
-func WriteCanonicalHash(db ethdb.Writer, hash common.Hash, number uint64) {
+func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil {
log.Crit("Failed to store number to hash mapping", "err", err)
}
}
// DeleteCanonicalHash removes the number to hash canonical mapping.
-func DeleteCanonicalHash(db ethdb.Writer, number uint64) {
+func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) {
if err := db.Delete(headerHashKey(number)); err != nil {
log.Crit("Failed to delete number to hash mapping", "err", err)
}
}
+// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
+// both canonical and reorged forks included.
+func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash {
+ prefix := headerKeyPrefix(number)
+
+ hashes := make([]common.Hash, 0, 1)
+ it := db.NewIteratorWithPrefix(prefix)
+ defer it.Release()
+
+ for it.Next() {
+ if key := it.Key(); len(key) == len(prefix)+32 {
+ hashes = append(hashes, common.BytesToHash(key[len(key)-32:]))
+ }
+ }
+ return hashes
+}
+
// ReadHeaderNumber returns the header number assigned to a hash.
-func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 {
+func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
data, _ := db.Get(headerNumberKey(hash))
if len(data) != 8 {
return nil
@@ -62,8 +89,24 @@ func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 {
return &number
}
+// WriteHeaderNumber stores the hash->number mapping.
+func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+ key := headerNumberKey(hash)
+ enc := encodeBlockNumber(number)
+ if err := db.Put(key, enc); err != nil {
+ log.Crit("Failed to store hash to number mapping", "err", err)
+ }
+}
+
+// DeleteHeaderNumber removes hash->number mapping.
+func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) {
+ if err := db.Delete(headerNumberKey(hash)); err != nil {
+ log.Crit("Failed to delete hash to number mapping", "err", err)
+ }
+}
+
// ReadHeadHeaderHash retrieves the hash of the current canonical head header.
-func ReadHeadHeaderHash(db ethdb.Reader) common.Hash {
+func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash {
data, _ := db.Get(headHeaderKey)
if len(data) == 0 {
return common.Hash{}
@@ -72,14 +115,14 @@ func ReadHeadHeaderHash(db ethdb.Reader) common.Hash {
}
// WriteHeadHeaderHash stores the hash of the current canonical head header.
-func WriteHeadHeaderHash(db ethdb.Writer, hash common.Hash) {
+func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headHeaderKey, hash.Bytes()); err != nil {
log.Crit("Failed to store last header's hash", "err", err)
}
}
// ReadHeadBlockHash retrieves the hash of the current canonical head block.
-func ReadHeadBlockHash(db ethdb.Reader) common.Hash {
+func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash {
data, _ := db.Get(headBlockKey)
if len(data) == 0 {
return common.Hash{}
@@ -88,14 +131,14 @@ func ReadHeadBlockHash(db ethdb.Reader) common.Hash {
}
// WriteHeadBlockHash stores the head block's hash.
-func WriteHeadBlockHash(db ethdb.Writer, hash common.Hash) {
+func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headBlockKey, hash.Bytes()); err != nil {
log.Crit("Failed to store last block's hash", "err", err)
}
}
// ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block.
-func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash {
+func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash {
data, _ := db.Get(headFastBlockKey)
if len(data) == 0 {
return common.Hash{}
@@ -104,7 +147,7 @@ func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash {
}
// WriteHeadFastBlockHash stores the hash of the current fast-sync head block.
-func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) {
+func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil {
log.Crit("Failed to store last fast block's hash", "err", err)
}
@@ -112,7 +155,7 @@ func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) {
// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow
// reporting correct numbers across restarts.
-func ReadFastTrieProgress(db ethdb.Reader) uint64 {
+func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 {
data, _ := db.Get(fastTrieProgressKey)
if len(data) == 0 {
return 0
@@ -122,7 +165,7 @@ func ReadFastTrieProgress(db ethdb.Reader) uint64 {
// WriteFastTrieProgress stores the fast sync trie process counter to support
// retrieving it across restarts.
-func WriteFastTrieProgress(db ethdb.Writer, count uint64) {
+func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) {
if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil {
log.Crit("Failed to store fast sync trie progress", "err", err)
}
@@ -130,12 +173,25 @@ func WriteFastTrieProgress(db ethdb.Writer, count uint64) {
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
- data, _ := db.Get(headerKey(number, hash))
+ data, _ := db.Ancient(freezerHeaderTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(headerKey(number, hash))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerHeaderTable, number)
+ }
+ }
return data
}
// HasHeader verifies the existence of a block header corresponding to the hash.
func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool {
+ if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
+ return true
+ }
if has, err := db.Has(headerKey(number, hash)); !has || err != nil {
return false
}
@@ -158,30 +214,27 @@ func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header
// WriteHeader stores a block header into the database and also stores the hash-
// to-number mapping.
-func WriteHeader(db ethdb.Writer, header *types.Header) {
- // Write the hash -> number mapping
+func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) {
var (
- hash = header.Hash()
- number = header.Number.Uint64()
- encoded = encodeBlockNumber(number)
+ hash = header.Hash()
+ number = header.Number.Uint64()
)
- key := headerNumberKey(hash)
- if err := db.Put(key, encoded); err != nil {
- log.Crit("Failed to store hash to number mapping", "err", err)
- }
+ // Write the hash -> number mapping
+ WriteHeaderNumber(db, hash, number)
+
// Write the encoded header
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
}
- key = headerKey(number, hash)
+ key := headerKey(number, hash)
if err := db.Put(key, data); err != nil {
log.Crit("Failed to store header", "err", err)
}
}
// DeleteHeader removes all block header data associated with a hash.
-func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
deleteHeaderWithoutNumber(db, hash, number)
if err := db.Delete(headerNumberKey(hash)); err != nil {
log.Crit("Failed to delete hash to number mapping", "err", err)
@@ -190,7 +243,7 @@ func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) {
// deleteHeaderWithoutNumber removes only the block header but does not remove
// the hash to number mapping.
-func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
+func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(headerKey(number, hash)); err != nil {
log.Crit("Failed to delete header", "err", err)
}
@@ -198,12 +251,22 @@ func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64)
// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
- data, _ := db.Get(blockBodyKey(number, hash))
+ data, _ := db.Ancient(freezerBodiesTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(blockBodyKey(number, hash))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerBodiesTable, number)
+ }
+ }
return data
}
// WriteBodyRLP stores an RLP encoded block body into the database.
-func WriteBodyRLP(db ethdb.Writer, hash common.Hash, number uint64, rlp rlp.RawValue) {
+func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp rlp.RawValue) {
if err := db.Put(blockBodyKey(number, hash), rlp); err != nil {
log.Crit("Failed to store block body", "err", err)
}
@@ -211,6 +274,9 @@ func WriteBodyRLP(db ethdb.Writer, hash common.Hash, number uint64, rlp rlp.RawV
// HasBody verifies the existence of a block body corresponding to the hash.
func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool {
+ if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
+ return true
+ }
if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil {
return false
}
@@ -232,7 +298,7 @@ func ReadBody(db ethdb.Reader, hash common.Hash, number uint64) *types.Body {
}
// WriteBody stores a block body into the database.
-func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Body) {
+func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *types.Body) {
data, err := rlp.EncodeToBytes(body)
if err != nil {
log.Crit("Failed to RLP encode body", "err", err)
@@ -241,7 +307,7 @@ func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Bod
}
// DeleteBody removes all block body data associated with a hash.
-func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
log.Crit("Failed to delete block body", "err", err)
}
@@ -249,7 +315,17 @@ func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) {
// ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
- data, _ := db.Get(headerTDKey(number, hash))
+ data, _ := db.Ancient(freezerDifficultyTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(headerTDKey(number, hash))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerDifficultyTable, number)
+ }
+ }
return data
}
@@ -268,7 +344,7 @@ func ReadTd(db ethdb.Reader, hash common.Hash, number uint64) *big.Int {
}
// WriteTd stores the total difficulty of a block into the database.
-func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) {
+func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) {
data, err := rlp.EncodeToBytes(td)
if err != nil {
log.Crit("Failed to RLP encode block total difficulty", "err", err)
@@ -279,7 +355,7 @@ func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) {
}
// DeleteTd removes all block total difficulty data associated with a hash.
-func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(headerTDKey(number, hash)); err != nil {
log.Crit("Failed to delete block total difficulty", "err", err)
}
@@ -288,6 +364,9 @@ func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) {
// HasReceipts verifies the existence of all the transaction receipts belonging
// to a block.
func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
+ if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
+ return true
+ }
if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil {
return false
}
@@ -296,7 +375,17 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
- data, _ := db.Get(blockReceiptsKey(number, hash))
+ data, _ := db.Ancient(freezerReceiptTable, number)
+ if len(data) == 0 {
+ data, _ = db.Get(blockReceiptsKey(number, hash))
+ // In the background freezer is moving data from leveldb to flatten files.
+ // So during the first check for ancient db, the data is not yet in there,
+ // but when we reach into leveldb, the data was already moved. That would
+ // result in a not found error.
+ if len(data) == 0 {
+ data, _ = db.Ancient(freezerReceiptTable, number)
+ }
+ }
return data
}
@@ -348,7 +437,7 @@ func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *para
}
// WriteReceipts stores all the transaction receipts belonging to a block.
-func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts types.Receipts) {
+func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts types.Receipts) {
// Convert the receipts into their storage form and serialize them
storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
for i, receipt := range receipts {
@@ -365,7 +454,7 @@ func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts ty
}
// DeleteReceipts removes all receipt data associated with a block hash.
-func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
log.Crit("Failed to delete block receipts", "err", err)
}
@@ -390,22 +479,53 @@ func ReadBlock(db ethdb.Reader, hash common.Hash, number uint64) *types.Block {
}
// WriteBlock serializes a block into the database, header and body separately.
-func WriteBlock(db ethdb.Writer, block *types.Block) {
+func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteBody(db, block.Hash(), block.NumberU64(), block.Body())
WriteHeader(db, block.Header())
}
+// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
+func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int {
+ // Encode all block components to RLP format.
+ headerBlob, err := rlp.EncodeToBytes(block.Header())
+ if err != nil {
+ log.Crit("Failed to RLP encode block header", "err", err)
+ }
+ bodyBlob, err := rlp.EncodeToBytes(block.Body())
+ if err != nil {
+ log.Crit("Failed to RLP encode body", "err", err)
+ }
+ storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
+ for i, receipt := range receipts {
+ storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
+ }
+ receiptBlob, err := rlp.EncodeToBytes(storageReceipts)
+ if err != nil {
+ log.Crit("Failed to RLP encode block receipts", "err", err)
+ }
+ tdBlob, err := rlp.EncodeToBytes(td)
+ if err != nil {
+ log.Crit("Failed to RLP encode block total difficulty", "err", err)
+ }
+ // Write all blob to flatten files.
+ err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob)
+ if err != nil {
+ log.Crit("Failed to write block data to ancient store", "err", err)
+ }
+ return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength
+}
+
// DeleteBlock removes all block data associated with a hash.
-func DeleteBlock(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
DeleteReceipts(db, hash, number)
DeleteHeader(db, hash, number)
DeleteBody(db, hash, number)
DeleteTd(db, hash, number)
}
-// deleteBlockWithoutNumber removes all block data associated with a hash, except
+// DeleteBlockWithoutNumber removes all block data associated with a hash, except
// the hash to number mapping.
-func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
+func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
DeleteReceipts(db, hash, number)
deleteHeaderWithoutNumber(db, hash, number)
DeleteBody(db, hash, number)
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
index 423145a76..ed1f1bca6 100644
--- a/core/rawdb/accessors_indexes.go
+++ b/core/rawdb/accessors_indexes.go
@@ -54,7 +54,7 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
// WriteTxLookupEntries stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
-func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) {
+func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) {
for _, tx := range block.Transactions() {
if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil {
log.Crit("Failed to store transaction lookup entry", "err", err)
@@ -63,7 +63,7 @@ func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) {
}
// DeleteTxLookupEntry removes all transaction data associated with a hash.
-func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) {
+func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) {
db.Delete(txLookupKey(hash))
}
@@ -117,13 +117,13 @@ func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig)
// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the.
-func ReadBloomBits(db ethdb.Reader, bit uint, section uint64, head common.Hash) ([]byte, error) {
+func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) {
return db.Get(bloomBitsKey(bit, section, head))
}
// WriteBloomBits stores the compressed bloom bits vector belonging to the given
// section and bit index.
-func WriteBloomBits(db ethdb.Writer, bit uint, section uint64, head common.Hash, bits []byte) {
+func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) {
if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil {
log.Crit("Failed to store bloom bits", "err", err)
}
diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go
index 1361b0d73..f8d09fbdd 100644
--- a/core/rawdb/accessors_metadata.go
+++ b/core/rawdb/accessors_metadata.go
@@ -27,7 +27,7 @@ import (
)
// ReadDatabaseVersion retrieves the version number of the database.
-func ReadDatabaseVersion(db ethdb.Reader) *uint64 {
+func ReadDatabaseVersion(db ethdb.KeyValueReader) *uint64 {
var version uint64
enc, _ := db.Get(databaseVerisionKey)
@@ -42,7 +42,7 @@ func ReadDatabaseVersion(db ethdb.Reader) *uint64 {
}
// WriteDatabaseVersion stores the version number of the database
-func WriteDatabaseVersion(db ethdb.Writer, version uint64) {
+func WriteDatabaseVersion(db ethdb.KeyValueWriter, version uint64) {
enc, err := rlp.EncodeToBytes(version)
if err != nil {
log.Crit("Failed to encode database version", "err", err)
@@ -53,7 +53,7 @@ func WriteDatabaseVersion(db ethdb.Writer, version uint64) {
}
// ReadChainConfig retrieves the consensus settings based on the given genesis hash.
-func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig {
+func ReadChainConfig(db ethdb.KeyValueReader, hash common.Hash) *params.ChainConfig {
data, _ := db.Get(configKey(hash))
if len(data) == 0 {
return nil
@@ -67,7 +67,7 @@ func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig {
}
// WriteChainConfig writes the chain config settings to the database.
-func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig) {
+func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.ChainConfig) {
if cfg == nil {
return
}
@@ -81,13 +81,13 @@ func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig
}
// ReadPreimage retrieves a single preimage of the provided hash.
-func ReadPreimage(db ethdb.Reader, hash common.Hash) []byte {
+func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte {
data, _ := db.Get(preimageKey(hash))
return data
}
// WritePreimages writes the provided set of preimages to the database.
-func WritePreimages(db ethdb.Writer, preimages map[common.Hash][]byte) {
+func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) {
for hash, preimage := range preimages {
if err := db.Put(preimageKey(hash), preimage); err != nil {
log.Crit("Failed to store trie preimage", "err", err)
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index b4c5dea70..353b7dce6 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -17,15 +17,168 @@
package rawdb
import (
+ "bytes"
+ "errors"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/olekukonko/tablewriter"
)
+// freezerdb is a database wrapper that enabled freezer data retrievals.
+type freezerdb struct {
+ ethdb.KeyValueStore
+ ethdb.AncientStore
+}
+
+// Close implements io.Closer, closing both the fast key-value store as well as
+// the slow ancient tables.
+func (frdb *freezerdb) Close() error {
+ var errs []error
+ if err := frdb.KeyValueStore.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ if err := frdb.AncientStore.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ if len(errs) != 0 {
+ return fmt.Errorf("%v", errs)
+ }
+ return nil
+}
+
+// nofreezedb is a database wrapper that disables freezer data retrievals.
+type nofreezedb struct {
+ ethdb.KeyValueStore
+}
+
+// HasAncient returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) HasAncient(kind string, number uint64) (bool, error) {
+ return false, errNotSupported
+}
+
+// Ancient returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) {
+ return nil, errNotSupported
+}
+
+// Ancients returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) Ancients() (uint64, error) {
+ return 0, errNotSupported
+}
+
+// AncientSize returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
+ return 0, errNotSupported
+}
+
+// AppendAncient returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
+ return errNotSupported
+}
+
+// TruncateAncients returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) TruncateAncients(items uint64) error {
+ return errNotSupported
+}
+
+// Sync returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) Sync() error {
+ return errNotSupported
+}
+
// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
- return db
+ return &nofreezedb{
+ KeyValueStore: db,
+ }
+}
+
+// NewDatabaseWithFreezer creates a high level database on top of a given key-
+// value data store with a freezer moving immutable chain segments into cold
+// storage.
+func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) {
+ // Create the idle freezer instance
+ frdb, err := newFreezer(freezer, namespace)
+ if err != nil {
+ return nil, err
+ }
+ // Since the freezer can be stored separately from the user's key-value database,
+ // there's a fairly high probability that the user requests invalid combinations
+ // of the freezer and database. Ensure that we don't shoot ourselves in the foot
+ // by serving up conflicting data, leading to both datastores getting corrupted.
+ //
+ // - If both the freezer and key-value store is empty (no genesis), we just
+ // initialized a new empty freezer, so everything's fine.
+ // - If the key-value store is empty, but the freezer is not, we need to make
+ // sure the user's genesis matches the freezer. That will be checked in the
+ // blockchain, since we don't have the genesis block here (nor should we at
+ // this point care, the key-value/freezer combo is valid).
+ // - If neither the key-value store nor the freezer is empty, cross validate
+ // the genesis hashes to make sure they are compatible. If they are, also
+ // ensure that there's no gap between the freezer and sunsequently leveldb.
+ // - If the key-value store is not empty, but the freezer is we might just be
+ // upgrading to the freezer release, or we might have had a small chain and
+ // not frozen anything yet. Ensure that no blocks are missing yet from the
+ // key-value store, since that would mean we already had an old freezer.
+
+ // If the genesis hash is empty, we have a new key-value store, so nothing to
+ // validate in this method. If, however, the genesis hash is not nil, compare
+ // it to the freezer content.
+ if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 {
+ if frozen, _ := frdb.Ancients(); frozen > 0 {
+ // If the freezer already contains something, ensure that the genesis blocks
+ // match, otherwise we might mix up freezers across chains and destroy both
+ // the freezer and the key-value store.
+ if frgenesis, _ := frdb.Ancient(freezerHashTable, 0); !bytes.Equal(kvgenesis, frgenesis) {
+ return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis)
+ }
+ // Key-value store and freezer belong to the same network. Ensure that they
+ // are contiguous, otherwise we might end up with a non-functional freezer.
+ if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 {
+ // Subsequent header after the freezer limit is missing from the database.
+ // Reject startup is the database has a more recent head.
+ if *ReadHeaderNumber(db, ReadHeadHeaderHash(db)) > frozen-1 {
+ return nil, fmt.Errorf("gap (#%d) in the chain between ancients and leveldb", frozen)
+ }
+ // Database contains only older data than the freezer, this happens if the
+ // state was wiped and reinited from an existing freezer.
+ } else {
+ // Key-value store continues where the freezer left off, all is fine. We might
+ // have duplicate blocks (crash after freezer write but before kay-value store
+ // deletion, but that's fine).
+ }
+ } else {
+ // If the freezer is empty, ensure nothing was moved yet from the key-value
+ // store, otherwise we'll end up missing data. We check block #1 to decide
+ // if we froze anything previously or not, but do take care of databases with
+ // only the genesis block.
+ if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) {
+ // Key-value store contains more data than the genesis block, make sure we
+ // didn't freeze anything yet.
+ if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 {
+ return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path")
+ }
+ // Block #1 is still in the database, we're allowed to init a new feezer
+ } else {
+ // The head header is still the genesis, we're allowed to init a new feezer
+ }
+ }
+ }
+ // Freezer is consistent with the key-value database, permit combining the two
+ go frdb.freeze(db)
+
+ return &freezerdb{
+ KeyValueStore: db,
+ AncientStore: frdb,
+ }, nil
}
// NewMemoryDatabase creates an ephemeral in-memory key-value database without a
@@ -34,9 +187,9 @@ func NewMemoryDatabase() ethdb.Database {
return NewDatabase(memorydb.New())
}
-// NewMemoryDatabaseWithCap creates an ephemeral in-memory key-value database with
-// an initial starting capacity, but without a freezer moving immutable chain
-// segments into cold storage.
+// NewMemoryDatabaseWithCap creates an ephemeral in-memory key-value database
+// with an initial starting capacity, but without a freezer moving immutable
+// chain segments into cold storage.
func NewMemoryDatabaseWithCap(size int) ethdb.Database {
return NewDatabase(memorydb.NewWithCap(size))
}
@@ -50,3 +203,153 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string) (
}
return NewDatabase(db), nil
}
+
+// NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a
+// freezer moving immutable chain segments into cold storage.
+func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string) (ethdb.Database, error) {
+ kvdb, err := leveldb.New(file, cache, handles, namespace)
+ if err != nil {
+ return nil, err
+ }
+ frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace)
+ if err != nil {
+ kvdb.Close()
+ return nil, err
+ }
+ return frdb, nil
+}
+
+// InspectDatabase traverses the entire database and checks the size
+// of all different categories of data.
+func InspectDatabase(db ethdb.Database) error {
+ it := db.NewIterator()
+ defer it.Release()
+
+ var (
+ count int64
+ start = time.Now()
+ logged = time.Now()
+
+ // Key-value store statistics
+ total common.StorageSize
+ headerSize common.StorageSize
+ bodySize common.StorageSize
+ receiptSize common.StorageSize
+ tdSize common.StorageSize
+ numHashPairing common.StorageSize
+ hashNumPairing common.StorageSize
+ trieSize common.StorageSize
+ txlookupSize common.StorageSize
+ preimageSize common.StorageSize
+ bloomBitsSize common.StorageSize
+ cliqueSnapsSize common.StorageSize
+
+ // Ancient store statistics
+ ancientHeaders common.StorageSize
+ ancientBodies common.StorageSize
+ ancientReceipts common.StorageSize
+ ancientHashes common.StorageSize
+ ancientTds common.StorageSize
+
+ // Les statistic
+ chtTrieNodes common.StorageSize
+ bloomTrieNodes common.StorageSize
+
+ // Meta- and unaccounted data
+ metadata common.StorageSize
+ unaccounted common.StorageSize
+ )
+ // Inspect key-value database first.
+ for it.Next() {
+ var (
+ key = it.Key()
+ size = common.StorageSize(len(key) + len(it.Value()))
+ )
+ total += size
+ switch {
+ case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix):
+ tdSize += size
+ case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix):
+ numHashPairing += size
+ case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength):
+ headerSize += size
+ case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength):
+ hashNumPairing += size
+ case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength):
+ bodySize += size
+ case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength):
+ receiptSize += size
+ case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
+ txlookupSize += size
+ case bytes.HasPrefix(key, preimagePrefix) && len(key) == (len(preimagePrefix)+common.HashLength):
+ preimageSize += size
+ case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
+ bloomBitsSize += size
+ case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength:
+ cliqueSnapsSize += size
+ case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength:
+ chtTrieNodes += size
+ case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength:
+ bloomTrieNodes += size
+ case len(key) == common.HashLength:
+ trieSize += size
+ default:
+ var accounted bool
+ for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey} {
+ if bytes.Equal(key, meta) {
+ metadata += size
+ accounted = true
+ break
+ }
+ }
+ if !accounted {
+ unaccounted += size
+ }
+ }
+ count += 1
+ if count%1000 == 0 && time.Since(logged) > 8*time.Second {
+ log.Info("Inspecting database", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
+ logged = time.Now()
+ }
+ }
+ // Inspect append-only file store then.
+ ancients := []*common.StorageSize{&ancientHeaders, &ancientBodies, &ancientReceipts, &ancientHashes, &ancientTds}
+ for i, category := range []string{freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerHashTable, freezerDifficultyTable} {
+ if size, err := db.AncientSize(category); err == nil {
+ *ancients[i] += common.StorageSize(size)
+ total += common.StorageSize(size)
+ }
+ }
+ // Display the database statistic.
+ stats := [][]string{
+ {"Key-Value store", "Headers", headerSize.String()},
+ {"Key-Value store", "Bodies", bodySize.String()},
+ {"Key-Value store", "Receipts", receiptSize.String()},
+ {"Key-Value store", "Difficulties", tdSize.String()},
+ {"Key-Value store", "Block number->hash", numHashPairing.String()},
+ {"Key-Value store", "Block hash->number", hashNumPairing.String()},
+ {"Key-Value store", "Transaction index", txlookupSize.String()},
+ {"Key-Value store", "Bloombit index", bloomBitsSize.String()},
+ {"Key-Value store", "Trie nodes", trieSize.String()},
+ {"Key-Value store", "Trie preimages", preimageSize.String()},
+ {"Key-Value store", "Clique snapshots", cliqueSnapsSize.String()},
+ {"Key-Value store", "Singleton metadata", metadata.String()},
+ {"Ancient store", "Headers", ancientHeaders.String()},
+ {"Ancient store", "Bodies", ancientBodies.String()},
+ {"Ancient store", "Receipts", ancientReceipts.String()},
+ {"Ancient store", "Difficulties", ancientTds.String()},
+ {"Ancient store", "Block number->hash", ancientHashes.String()},
+ {"Light client", "CHT trie nodes", chtTrieNodes.String()},
+ {"Light client", "Bloom trie nodes", bloomTrieNodes.String()},
+ }
+ table := tablewriter.NewWriter(os.Stdout)
+ table.SetHeader([]string{"Database", "Category", "Size"})
+ table.SetFooter([]string{"", "Total", total.String()})
+ table.AppendBulk(stats)
+ table.Render()
+
+ if unaccounted > 0 {
+ log.Error("Database contains unaccounted data", "size", unaccounted)
+ }
+ return nil
+}
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
new file mode 100644
index 000000000..67ed87d66
--- /dev/null
+++ b/core/rawdb/freezer.go
@@ -0,0 +1,382 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "os"
+ "path/filepath"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/prometheus/tsdb/fileutil"
+)
+
+var (
+ // errUnknownTable is returned if the user attempts to read from a table that is
+ // not tracked by the freezer.
+ errUnknownTable = errors.New("unknown table")
+
+ // errOutOrderInsertion is returned if the user attempts to inject out-of-order
+ // binary blobs into the freezer.
+ errOutOrderInsertion = errors.New("the append operation is out-order")
+
+ // errSymlinkDatadir is returned if the ancient directory specified by user
+ // is a symbolic link.
+ errSymlinkDatadir = errors.New("symbolic link datadir is not supported")
+)
+
+const (
+ // freezerRecheckInterval is the frequency to check the key-value database for
+ // chain progression that might permit new blocks to be frozen into immutable
+ // storage.
+ freezerRecheckInterval = time.Minute
+
+ // freezerBatchLimit is the maximum number of blocks to freeze in one batch
+ // before doing an fsync and deleting it from the key-value store.
+ freezerBatchLimit = 30000
+)
+
+// freezer is an memory mapped append-only database to store immutable chain data
+// into flat files:
+//
+// - The append only nature ensures that disk writes are minimized.
+// - The memory mapping ensures we can max out system memory for caching without
+// reserving it for go-ethereum. This would also reduce the memory requirements
+// of Geth, and thus also GC overhead.
+type freezer struct {
+ tables map[string]*freezerTable // Data tables for storing everything
+ frozen uint64 // Number of blocks already frozen
+ instanceLock fileutil.Releaser // File-system lock to prevent double opens
+}
+
+// newFreezer creates a chain freezer that moves ancient chain data into
+// append-only flat file containers.
+func newFreezer(datadir string, namespace string) (*freezer, error) {
+ // Create the initial freezer object
+ var (
+ readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
+ writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
+ )
+ // Ensure the datadir is not a symbolic link if it exists.
+ if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
+ if info.Mode()&os.ModeSymlink != 0 {
+ log.Warn("Symbolic link ancient database is not supported", "path", datadir)
+ return nil, errSymlinkDatadir
+ }
+ }
+ // Leveldb uses LOCK as the filelock filename. To prevent the
+ // name collision, we use FLOCK as the lock name.
+ lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK"))
+ if err != nil {
+ return nil, err
+ }
+ // Open all the supported data tables
+ freezer := &freezer{
+ tables: make(map[string]*freezerTable),
+ instanceLock: lock,
+ }
+ for name, disableSnappy := range freezerNoSnappy {
+ table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
+ if err != nil {
+ for _, table := range freezer.tables {
+ table.Close()
+ }
+ lock.Release()
+ return nil, err
+ }
+ freezer.tables[name] = table
+ }
+ if err := freezer.repair(); err != nil {
+ for _, table := range freezer.tables {
+ table.Close()
+ }
+ lock.Release()
+ return nil, err
+ }
+ log.Info("Opened ancient database", "database", datadir)
+ return freezer, nil
+}
+
+// Close terminates the chain freezer, unmapping all the data files.
+func (f *freezer) Close() error {
+ var errs []error
+ for _, table := range f.tables {
+ if err := table.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ if err := f.instanceLock.Release(); err != nil {
+ errs = append(errs, err)
+ }
+ if errs != nil {
+ return fmt.Errorf("%v", errs)
+ }
+ return nil
+}
+
+// HasAncient returns an indicator whether the specified ancient data exists
+// in the freezer.
+func (f *freezer) HasAncient(kind string, number uint64) (bool, error) {
+ if table := f.tables[kind]; table != nil {
+ return table.has(number), nil
+ }
+ return false, nil
+}
+
+// Ancient retrieves an ancient binary blob from the append-only immutable files.
+func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
+ if table := f.tables[kind]; table != nil {
+ return table.Retrieve(number)
+ }
+ return nil, errUnknownTable
+}
+
+// Ancients returns the length of the frozen items.
+func (f *freezer) Ancients() (uint64, error) {
+ return atomic.LoadUint64(&f.frozen), nil
+}
+
+// AncientSize returns the ancient size of the specified category.
+func (f *freezer) AncientSize(kind string) (uint64, error) {
+ if table := f.tables[kind]; table != nil {
+ return table.size()
+ }
+ return 0, errUnknownTable
+}
+
+// AppendAncient injects all binary blobs belong to block at the end of the
+// append-only immutable table files.
+//
+// Notably, this function is lock free but kind of thread-safe. All out-of-order
+// injection will be rejected. But if two injections with same number happen at
+// the same time, we can get into the trouble.
+func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
+ // Ensure the binary blobs we are appending is continuous with freezer.
+ if atomic.LoadUint64(&f.frozen) != number {
+ return errOutOrderInsertion
+ }
+ // Rollback all inserted data if any insertion below failed to ensure
+ // the tables won't out of sync.
+ defer func() {
+ if err != nil {
+ rerr := f.repair()
+ if rerr != nil {
+ log.Crit("Failed to repair freezer", "err", rerr)
+ }
+ log.Info("Append ancient failed", "number", number, "err", err)
+ }
+ }()
+ // Inject all the components into the relevant data tables
+ if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
+ log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
+ return err
+ }
+ if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
+ log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
+ return err
+ }
+ if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil {
+ log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err)
+ return err
+ }
+ if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
+ log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
+ return err
+ }
+ if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
+ log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
+ return err
+ }
+ atomic.AddUint64(&f.frozen, 1) // Only modify atomically
+ return nil
+}
+
+// Truncate discards any recent data above the provided threshold number.
+func (f *freezer) TruncateAncients(items uint64) error {
+ if atomic.LoadUint64(&f.frozen) <= items {
+ return nil
+ }
+ for _, table := range f.tables {
+ if err := table.truncate(items); err != nil {
+ return err
+ }
+ }
+ atomic.StoreUint64(&f.frozen, items)
+ return nil
+}
+
+// sync flushes all data tables to disk.
+func (f *freezer) Sync() error {
+ var errs []error
+ for _, table := range f.tables {
+ if err := table.Sync(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ if errs != nil {
+ return fmt.Errorf("%v", errs)
+ }
+ return nil
+}
+
+// freeze is a background thread that periodically checks the blockchain for any
+// import progress and moves ancient data from the fast database into the freezer.
+//
+// This functionality is deliberately broken off from block importing to avoid
+// incurring additional data shuffling delays on block propagation.
+func (f *freezer) freeze(db ethdb.KeyValueStore) {
+ nfdb := &nofreezedb{KeyValueStore: db}
+
+ for {
+ // Retrieve the freezing threshold.
+ hash := ReadHeadBlockHash(nfdb)
+ if hash == (common.Hash{}) {
+ log.Debug("Current full block hash unavailable") // new chain, empty database
+ time.Sleep(freezerRecheckInterval)
+ continue
+ }
+ number := ReadHeaderNumber(nfdb, hash)
+ switch {
+ case number == nil:
+ log.Error("Current full block number unavailable", "hash", hash)
+ time.Sleep(freezerRecheckInterval)
+ continue
+
+ case *number < params.ImmutabilityThreshold:
+ log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
+ time.Sleep(freezerRecheckInterval)
+ continue
+
+ case *number-params.ImmutabilityThreshold <= f.frozen:
+ log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
+ time.Sleep(freezerRecheckInterval)
+ continue
+ }
+ head := ReadHeader(nfdb, hash, *number)
+ if head == nil {
+ log.Error("Current full block unavailable", "number", *number, "hash", hash)
+ time.Sleep(freezerRecheckInterval)
+ continue
+ }
+ // Seems we have data ready to be frozen, process in usable batches
+ limit := *number - params.ImmutabilityThreshold
+ if limit-f.frozen > freezerBatchLimit {
+ limit = f.frozen + freezerBatchLimit
+ }
+ var (
+ start = time.Now()
+ first = f.frozen
+ ancients = make([]common.Hash, 0, limit)
+ )
+ for f.frozen < limit {
+ // Retrieves all the components of the canonical block
+ hash := ReadCanonicalHash(nfdb, f.frozen)
+ if hash == (common.Hash{}) {
+ log.Error("Canonical hash missing, can't freeze", "number", f.frozen)
+ break
+ }
+ header := ReadHeaderRLP(nfdb, hash, f.frozen)
+ if len(header) == 0 {
+ log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash)
+ break
+ }
+ body := ReadBodyRLP(nfdb, hash, f.frozen)
+ if len(body) == 0 {
+ log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash)
+ break
+ }
+ receipts := ReadReceiptsRLP(nfdb, hash, f.frozen)
+ if len(receipts) == 0 {
+ log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash)
+ break
+ }
+ td := ReadTdRLP(nfdb, hash, f.frozen)
+ if len(td) == 0 {
+ log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash)
+ break
+ }
+ log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash)
+ // Inject all the components into the relevant data tables
+ if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil {
+ break
+ }
+ ancients = append(ancients, hash)
+ }
+ // Batch of blocks have been frozen, flush them before wiping from leveldb
+ if err := f.Sync(); err != nil {
+ log.Crit("Failed to flush frozen tables", "err", err)
+ }
+ // Wipe out all data from the active database
+ batch := db.NewBatch()
+ for i := 0; i < len(ancients); i++ {
+ DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i))
+ DeleteCanonicalHash(batch, first+uint64(i))
+ }
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to delete frozen canonical blocks", "err", err)
+ }
+ batch.Reset()
+ // Wipe out side chain also.
+ for number := first; number < f.frozen; number++ {
+ for _, hash := range ReadAllHashes(db, number) {
+ DeleteBlock(batch, hash, number)
+ }
+ }
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to delete frozen side blocks", "err", err)
+ }
+ // Log something friendly for the user
+ context := []interface{}{
+ "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,
+ }
+ if n := len(ancients); n > 0 {
+ context = append(context, []interface{}{"hash", ancients[n-1]}...)
+ }
+ log.Info("Deep froze chain segment", context...)
+
+ // Avoid database thrashing with tiny writes
+ if f.frozen-first < freezerBatchLimit {
+ time.Sleep(freezerRecheckInterval)
+ }
+ }
+}
+
+// repair truncates all data tables to the same length.
+func (f *freezer) repair() error {
+ min := uint64(math.MaxUint64)
+ for _, table := range f.tables {
+ items := atomic.LoadUint64(&table.items)
+ if min > items {
+ min = items
+ }
+ }
+ for _, table := range f.tables {
+ if err := table.truncate(min); err != nil {
+ return err
+ }
+ }
+ atomic.StoreUint64(&f.frozen, min)
+ return nil
+}
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
new file mode 100644
index 000000000..673a181e4
--- /dev/null
+++ b/core/rawdb/freezer_table.go
@@ -0,0 +1,561 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sync"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/golang/snappy"
+)
+
+var (
+ // errClosed is returned if an operation attempts to read from or write to the
+ // freezer table after it has already been closed.
+ errClosed = errors.New("closed")
+
+ // errOutOfBounds is returned if the item requested is not contained within the
+ // freezer table.
+ errOutOfBounds = errors.New("out of bounds")
+
+ // errNotSupported is returned if the database doesn't support the required operation.
+ errNotSupported = errors.New("this operation is not supported")
+)
+
+// indexEntry contains the number/id of the file that the data resides in, aswell as the
+// offset within the file to the end of the data
+// In serialized form, the filenum is stored as uint16.
+type indexEntry struct {
+ filenum uint32 // stored as uint16 ( 2 bytes)
+ offset uint32 // stored as uint32 ( 4 bytes)
+}
+
+const indexEntrySize = 6
+
+// unmarshallBinary deserializes binary b into the rawIndex entry.
+func (i *indexEntry) unmarshalBinary(b []byte) error {
+ i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
+ i.offset = binary.BigEndian.Uint32(b[2:6])
+ return nil
+}
+
+// marshallBinary serializes the rawIndex entry into binary.
+func (i *indexEntry) marshallBinary() []byte {
+ b := make([]byte, indexEntrySize)
+ binary.BigEndian.PutUint16(b[:2], uint16(i.filenum))
+ binary.BigEndian.PutUint32(b[2:6], i.offset)
+ return b
+}
+
+// freezerTable represents a single chained data table within the freezer (e.g. blocks).
+// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
+// file (uncompressed 64 bit indices into the data file).
+type freezerTable struct {
+ noCompression bool // if true, disables snappy compression. Note: does not work retroactively
+ maxFileSize uint32 // Max file size for data-files
+ name string
+ path string
+
+ head *os.File // File descriptor for the data head of the table
+ files map[uint32]*os.File // open files
+ headId uint32 // number of the currently active head file
+ tailId uint32 // number of the earliest file
+ index *os.File // File descriptor for the indexEntry file of the table
+
+ // In the case that old items are deleted (from the tail), we use itemOffset
+ // to count how many historic items have gone missing.
+ items uint64 // Number of items stored in the table (including items removed from tail)
+ itemOffset uint32 // Offset (number of discarded items)
+
+ headBytes uint32 // Number of bytes written to the head file
+ readMeter metrics.Meter // Meter for measuring the effective amount of data read
+ writeMeter metrics.Meter // Meter for measuring the effective amount of data written
+
+ logger log.Logger // Logger with database path and table name ambedded
+ lock sync.RWMutex // Mutex protecting the data file descriptors
+}
+
+// newTable opens a freezer table with default settings - 2G files
+func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
+ return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
+}
+
+// newCustomTable opens a freezer table, creating the data and index files if they are
+// non existent. Both files are truncated to the shortest common length to ensure
+// they don't go out of sync.
+func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
+ // Ensure the containing directory exists and open the indexEntry file
+ if err := os.MkdirAll(path, 0755); err != nil {
+ return nil, err
+ }
+ var idxName string
+ if noCompression {
+ // raw idx
+ idxName = fmt.Sprintf("%s.ridx", name)
+ } else {
+ // compressed idx
+ idxName = fmt.Sprintf("%s.cidx", name)
+ }
+ offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
+ if err != nil {
+ return nil, err
+ }
+ // Create the table and repair any past inconsistency
+ tab := &freezerTable{
+ index: offsets,
+ files: make(map[uint32]*os.File),
+ readMeter: readMeter,
+ writeMeter: writeMeter,
+ name: name,
+ path: path,
+ logger: log.New("database", path, "table", name),
+ noCompression: noCompression,
+ maxFileSize: maxFilesize,
+ }
+ if err := tab.repair(); err != nil {
+ tab.Close()
+ return nil, err
+ }
+ return tab, nil
+}
+
+// repair cross checks the head and the index file and truncates them to
+// be in sync with each other after a potential crash / data loss.
+func (t *freezerTable) repair() error {
+ // Create a temporary offset buffer to init files with and read indexEntry into
+ buffer := make([]byte, indexEntrySize)
+
+ // If we've just created the files, initialize the index with the 0 indexEntry
+ stat, err := t.index.Stat()
+ if err != nil {
+ return err
+ }
+ if stat.Size() == 0 {
+ if _, err := t.index.Write(buffer); err != nil {
+ return err
+ }
+ }
+ // Ensure the index is a multiple of indexEntrySize bytes
+ if overflow := stat.Size() % indexEntrySize; overflow != 0 {
+ t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path
+ }
+ // Retrieve the file sizes and prepare for truncation
+ if stat, err = t.index.Stat(); err != nil {
+ return err
+ }
+ offsetsSize := stat.Size()
+
+ // Open the head file
+ var (
+ firstIndex indexEntry
+ lastIndex indexEntry
+ contentSize int64
+ contentExp int64
+ )
+ // Read index zero, determine what file is the earliest
+ // and what item offset to use
+ t.index.ReadAt(buffer, 0)
+ firstIndex.unmarshalBinary(buffer)
+
+ t.tailId = firstIndex.offset
+ t.itemOffset = firstIndex.filenum
+
+ t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
+ lastIndex.unmarshalBinary(buffer)
+ t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ if err != nil {
+ return err
+ }
+ if stat, err = t.head.Stat(); err != nil {
+ return err
+ }
+ contentSize = stat.Size()
+
+ // Keep truncating both files until they come in sync
+ contentExp = int64(lastIndex.offset)
+
+ for contentExp != contentSize {
+ // Truncate the head file to the last offset pointer
+ if contentExp < contentSize {
+ t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
+ if err := t.head.Truncate(contentExp); err != nil {
+ return err
+ }
+ contentSize = contentExp
+ }
+ // Truncate the index to point within the head file
+ if contentExp > contentSize {
+ t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
+ if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil {
+ return err
+ }
+ offsetsSize -= indexEntrySize
+ t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
+ var newLastIndex indexEntry
+ newLastIndex.unmarshalBinary(buffer)
+ // We might have slipped back into an earlier head-file here
+ if newLastIndex.filenum != lastIndex.filenum {
+ // release earlier opened file
+ t.releaseFile(lastIndex.filenum)
+ t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ if stat, err = t.head.Stat(); err != nil {
+ // TODO, anything more we can do here?
+ // A data file has gone missing...
+ return err
+ }
+ contentSize = stat.Size()
+ }
+ lastIndex = newLastIndex
+ contentExp = int64(lastIndex.offset)
+ }
+ }
+ // Ensure all reparation changes have been written to disk
+ if err := t.index.Sync(); err != nil {
+ return err
+ }
+ if err := t.head.Sync(); err != nil {
+ return err
+ }
+ // Update the item and byte counters and return
+ t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
+ t.headBytes = uint32(contentSize)
+ t.headId = lastIndex.filenum
+
+ // Close opened files and preopen all files
+ if err := t.preopen(); err != nil {
+ return err
+ }
+ t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.headBytes))
+ return nil
+}
+
+// preopen opens all files that the freezer will need. This method should be called from an init-context,
+// since it assumes that it doesn't have to bother with locking
+// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
+// obtain a write-lock within Retrieve.
+func (t *freezerTable) preopen() (err error) {
+ // The repair might have already opened (some) files
+ t.releaseFilesAfter(0, false)
+ // Open all except head in RDONLY
+ for i := t.tailId; i < t.headId; i++ {
+ if _, err = t.openFile(i, os.O_RDONLY); err != nil {
+ return err
+ }
+ }
+ // Open head in read/write
+ t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ return err
+}
+
+// truncate discards any recent data above the provided threashold number.
+func (t *freezerTable) truncate(items uint64) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // If our item count is correct, don't do anything
+ if atomic.LoadUint64(&t.items) <= items {
+ return nil
+ }
+ // Something's out of sync, truncate the table's offset index
+ t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
+ if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil {
+ return err
+ }
+ // Calculate the new expected size of the data file and truncate it
+ buffer := make([]byte, indexEntrySize)
+ if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil {
+ return err
+ }
+ var expected indexEntry
+ expected.unmarshalBinary(buffer)
+
+ // We might need to truncate back to older files
+ if expected.filenum != t.headId {
+ // If already open for reading, force-reopen for writing
+ t.releaseFile(expected.filenum)
+ newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ if err != nil {
+ return err
+ }
+ // release any files _after the current head -- both the previous head
+ // and any files which may have been opened for reading
+ t.releaseFilesAfter(expected.filenum, true)
+ // set back the historic head
+ t.head = newHead
+ atomic.StoreUint32(&t.headId, expected.filenum)
+ }
+ if err := t.head.Truncate(int64(expected.offset)); err != nil {
+ return err
+ }
+ // All data files truncated, set internal counters and return
+ atomic.StoreUint64(&t.items, items)
+ atomic.StoreUint32(&t.headBytes, expected.offset)
+ return nil
+}
+
+// Close closes all opened files.
+func (t *freezerTable) Close() error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ var errs []error
+ if err := t.index.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ t.index = nil
+
+ for _, f := range t.files {
+ if err := f.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ t.head = nil
+
+ if errs != nil {
+ return fmt.Errorf("%v", errs)
+ }
+ return nil
+}
+
+// openFile assumes that the write-lock is held by the caller
+func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
+ var exist bool
+ if f, exist = t.files[num]; !exist {
+ var name string
+ if t.noCompression {
+ name = fmt.Sprintf("%s.%04d.rdat", t.name, num)
+ } else {
+ name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
+ }
+ f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644)
+ if err != nil {
+ return nil, err
+ }
+ t.files[num] = f
+ }
+ return f, err
+}
+
+// releaseFile closes a file, and removes it from the open file cache.
+// Assumes that the caller holds the write lock
+func (t *freezerTable) releaseFile(num uint32) {
+ if f, exist := t.files[num]; exist {
+ delete(t.files, num)
+ f.Close()
+ }
+}
+
+// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files
+func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
+ for fnum, f := range t.files {
+ if fnum > num {
+ delete(t.files, fnum)
+ f.Close()
+ if remove {
+ os.Remove(f.Name())
+ }
+ }
+ }
+}
+
+// Append injects a binary blob at the end of the freezer table. The item number
+// is a precautionary parameter to ensure data correctness, but the table will
+// reject already existing data.
+//
+// Note, this method will *not* flush any data to disk so be sure to explicitly
+// fsync before irreversibly deleting data from the database.
+func (t *freezerTable) Append(item uint64, blob []byte) error {
+ // Read lock prevents competition with truncate
+ t.lock.RLock()
+ // Ensure the table is still accessible
+ if t.index == nil || t.head == nil {
+ t.lock.RUnlock()
+ return errClosed
+ }
+ // Ensure only the next item can be written, nothing else
+ if atomic.LoadUint64(&t.items) != item {
+ t.lock.RUnlock()
+ return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
+ }
+ // Encode the blob and write it into the data file
+ if !t.noCompression {
+ blob = snappy.Encode(nil, blob)
+ }
+ bLen := uint32(len(blob))
+ if t.headBytes+bLen < bLen ||
+ t.headBytes+bLen > t.maxFileSize {
+ // we need a new file, writing would overflow
+ t.lock.RUnlock()
+ t.lock.Lock()
+ nextId := atomic.LoadUint32(&t.headId) + 1
+ // We open the next file in truncated mode -- if this file already
+ // exists, we need to start over from scratch on it
+ newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC)
+ if err != nil {
+ t.lock.Unlock()
+ return err
+ }
+ // Close old file, and reopen in RDONLY mode
+ t.releaseFile(t.headId)
+ t.openFile(t.headId, os.O_RDONLY)
+
+ // Swap out the current head
+ t.head = newHead
+ atomic.StoreUint32(&t.headBytes, 0)
+ atomic.StoreUint32(&t.headId, nextId)
+ t.lock.Unlock()
+ t.lock.RLock()
+ }
+
+ defer t.lock.RUnlock()
+
+ if _, err := t.head.Write(blob); err != nil {
+ return err
+ }
+ newOffset := atomic.AddUint32(&t.headBytes, bLen)
+ idx := indexEntry{
+ filenum: atomic.LoadUint32(&t.headId),
+ offset: newOffset,
+ }
+ // Write indexEntry
+ t.index.Write(idx.marshallBinary())
+ t.writeMeter.Mark(int64(bLen + indexEntrySize))
+ atomic.AddUint64(&t.items, 1)
+ return nil
+}
+
+// getBounds returns the indexes for the item
+// returns start, end, filenumber and error
+func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
+ var startIdx, endIdx indexEntry
+ buffer := make([]byte, indexEntrySize)
+ if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
+ return 0, 0, 0, err
+ }
+ startIdx.unmarshalBinary(buffer)
+ if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
+ return 0, 0, 0, err
+ }
+ endIdx.unmarshalBinary(buffer)
+ if startIdx.filenum != endIdx.filenum {
+ // If a piece of data 'crosses' a data-file,
+ // it's actually in one piece on the second data-file.
+ // We return a zero-indexEntry for the second file as start
+ return 0, endIdx.offset, endIdx.filenum, nil
+ }
+ return startIdx.offset, endIdx.offset, endIdx.filenum, nil
+}
+
+// Retrieve looks up the data offset of an item with the given number and retrieves
+// the raw binary blob from the data file.
+func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
+ // Ensure the table and the item is accessible
+ if t.index == nil || t.head == nil {
+ return nil, errClosed
+ }
+ if atomic.LoadUint64(&t.items) <= item {
+ return nil, errOutOfBounds
+ }
+ // Ensure the item was not deleted from the tail either
+ offset := atomic.LoadUint32(&t.itemOffset)
+ if uint64(offset) > item {
+ return nil, errOutOfBounds
+ }
+ t.lock.RLock()
+ startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset))
+ if err != nil {
+ t.lock.RUnlock()
+ return nil, err
+ }
+ dataFile, exist := t.files[filenum]
+ if !exist {
+ t.lock.RUnlock()
+ return nil, fmt.Errorf("missing data file %d", filenum)
+ }
+ // Retrieve the data itself, decompress and return
+ blob := make([]byte, endOffset-startOffset)
+ if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
+ t.lock.RUnlock()
+ return nil, err
+ }
+ t.lock.RUnlock()
+ t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
+
+ if t.noCompression {
+ return blob, nil
+ }
+ return snappy.Decode(nil, blob)
+}
+
+// has returns an indicator whether the specified number data
+// exists in the freezer table.
+func (t *freezerTable) has(number uint64) bool {
+ return atomic.LoadUint64(&t.items) > number
+}
+
+// size returns the total data size in the freezer table.
+func (t *freezerTable) size() (uint64, error) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ stat, err := t.index.Stat()
+ if err != nil {
+ return 0, err
+ }
+ total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size())
+ return total, nil
+}
+
+// Sync pushes any pending data from memory out to disk. This is an expensive
+// operation, so use it with care.
+func (t *freezerTable) Sync() error {
+ if err := t.index.Sync(); err != nil {
+ return err
+ }
+ return t.head.Sync()
+}
+
+// printIndex is a debug print utility function for testing
+func (t *freezerTable) printIndex() {
+ buf := make([]byte, indexEntrySize)
+
+ fmt.Printf("|-----------------|\n")
+ fmt.Printf("| fileno | offset |\n")
+ fmt.Printf("|--------+--------|\n")
+
+ for i := uint64(0); ; i++ {
+ if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
+ break
+ }
+ var entry indexEntry
+ entry.unmarshalBinary(buf)
+ fmt.Printf("| %03d | %03d | \n", entry.filenum, entry.offset)
+ if i > 100 {
+ fmt.Printf(" ... \n")
+ break
+ }
+ }
+ fmt.Printf("|-----------------|\n")
+}
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
new file mode 100644
index 000000000..e63fb63a3
--- /dev/null
+++ b/core/rawdb/freezer_table_test.go
@@ -0,0 +1,609 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "bytes"
+ "fmt"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
+)
+
+func init() {
+ rand.Seed(time.Now().Unix())
+}
+
+// Gets a chunk of data, filled with 'b'
+func getChunk(size int, b int) []byte {
+ data := make([]byte, size)
+ for i := range data {
+ data[i] = byte(b)
+ }
+ return data
+}
+
+func print(t *testing.T, f *freezerTable, item uint64) {
+ a, err := f.Retrieve(item)
+ if err != nil {
+ t.Fatal(err)
+ }
+ fmt.Printf("db[%d] = %x\n", item, a)
+}
+
+// TestFreezerBasics test initializing a freezertable from scratch, writing to the table,
+// and reading it back.
+func TestFreezerBasics(t *testing.T) {
+ t.Parallel()
+ // set cutoff at 50 bytes
+ f, err := newCustomTable(os.TempDir(),
+ fmt.Sprintf("unittest-%d", rand.Uint64()),
+ metrics.NewMeter(), metrics.NewMeter(), 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ // Write 15 bytes 255 times, results in 85 files
+ for x := 0; x < 255; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+
+ //print(t, f, 0)
+ //print(t, f, 1)
+ //print(t, f, 2)
+ //
+ //db[0] = 000000000000000000000000000000
+ //db[1] = 010101010101010101010101010101
+ //db[2] = 020202020202020202020202020202
+
+ for y := 0; y < 255; y++ {
+ exp := getChunk(15, y)
+ got, err := f.Retrieve(uint64(y))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, exp) {
+ t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
+ }
+ }
+ // Check that we cannot read too far
+ _, err = f.Retrieve(uint64(255))
+ if err != errOutOfBounds {
+ t.Fatal(err)
+ }
+}
+
+// TestFreezerBasicsClosing tests same as TestFreezerBasics, but also closes and reopens the freezer between
+// every operation
+func TestFreezerBasicsClosing(t *testing.T) {
+ t.Parallel()
+ // set cutoff at 50 bytes
+ var (
+ fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
+ m1, m2 = metrics.NewMeter(), metrics.NewMeter()
+ f *freezerTable
+ err error
+ )
+ f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 255 times, results in 85 files
+ for x := 0; x < 255; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ f.Close()
+ f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ }
+ defer f.Close()
+
+ for y := 0; y < 255; y++ {
+ exp := getChunk(15, y)
+ got, err := f.Retrieve(uint64(y))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, exp) {
+ t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
+ }
+ f.Close()
+ f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed
+func TestFreezerRepairDanglingHead(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
+
+ { // Fill table
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 255 times
+ for x := 0; x < 255; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ // The last item should be there
+ if _, err = f.Retrieve(0xfe); err != nil {
+ t.Fatal(err)
+ }
+ f.Close()
+ }
+ // open the index
+ idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
+ if err != nil {
+ t.Fatalf("Failed to open index file: %v", err)
+ }
+ // Remove 4 bytes
+ stat, err := idxFile.Stat()
+ if err != nil {
+ t.Fatalf("Failed to stat index file: %v", err)
+ }
+ idxFile.Truncate(stat.Size() - 4)
+ idxFile.Close()
+ // Now open it again
+ {
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ // The last item should be missing
+ if _, err = f.Retrieve(0xff); err == nil {
+ t.Errorf("Expected error for missing index entry")
+ }
+ // The one before should still be there
+ if _, err = f.Retrieve(0xfd); err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+ }
+}
+
+// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed
+func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
+
+ { // Fill a table and close it
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 255 times
+ for x := 0; x < 0xff; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ // The last item should be there
+ if _, err = f.Retrieve(f.items - 1); err == nil {
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ f.Close()
+ }
+ // open the index
+ idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
+ if err != nil {
+ t.Fatalf("Failed to open index file: %v", err)
+ }
+ // Remove everything but the first item, and leave data unaligned
+ // 0-indexEntry, 1-indexEntry, corrupt-indexEntry
+ idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2)
+ idxFile.Close()
+ // Now open it again
+ {
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ // The first item should be there
+ if _, err = f.Retrieve(0); err != nil {
+ t.Fatal(err)
+ }
+ // The second item should be missing
+ if _, err = f.Retrieve(1); err == nil {
+ t.Errorf("Expected error for missing index entry")
+ }
+ // We should now be able to store items again, from item = 1
+ for x := 1; x < 0xff; x++ {
+ data := getChunk(15, ^x)
+ f.Append(uint64(x), data)
+ }
+ f.Close()
+ }
+ // And if we open it, we should now be able to read all of them (new values)
+ {
+ f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ for y := 1; y < 255; y++ {
+ exp := getChunk(15, ^y)
+ got, err := f.Retrieve(uint64(y))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, exp) {
+ t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
+ }
+ }
+ }
+}
+
+// TestSnappyDetection tests that we fail to open a snappy database and vice versa
+func TestSnappyDetection(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
+ // Open with snappy
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 255 times
+ for x := 0; x < 0xff; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ f.Close()
+ }
+ // Open without snappy
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false)
+ if _, err = f.Retrieve(0); err == nil {
+ f.Close()
+ t.Fatalf("expected empty table")
+ }
+ }
+
+ // Open with snappy
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ // There should be 255 items
+ if _, err = f.Retrieve(0xfe); err != nil {
+ f.Close()
+ t.Fatalf("expected no error, got %v", err)
+ }
+ }
+
+}
+func assertFileSize(f string, size int64) error {
+ stat, err := os.Stat(f)
+ if err != nil {
+ return err
+ }
+ if stat.Size() != size {
+ return fmt.Errorf("error, expected size %d, got %d", size, stat.Size())
+ }
+ return nil
+
+}
+
+// TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data,
+// the index is repaired
+func TestFreezerRepairDanglingIndex(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
+
+ { // Fill a table and close it
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 9 times : 150 bytes
+ for x := 0; x < 9; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ // The last item should be there
+ if _, err = f.Retrieve(f.items - 1); err != nil {
+ f.Close()
+ t.Fatal(err)
+ }
+ f.Close()
+ // File sizes should be 45, 45, 45 : items[3, 3, 3)
+ }
+ // Crop third file
+ fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname))
+ // Truncate third file: 45 ,45, 20
+ {
+ if err := assertFileSize(fileToCrop, 45); err != nil {
+ t.Fatal(err)
+ }
+ file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644)
+ if err != nil {
+ t.Fatal(err)
+ }
+ file.Truncate(20)
+ file.Close()
+ }
+ // Open db it again
+ // It should restore the file(s) to
+ // 45, 45, 15
+ // with 3+3+1 items
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if f.items != 7 {
+ f.Close()
+ t.Fatalf("expected %d items, got %d", 7, f.items)
+ }
+ if err := assertFileSize(fileToCrop, 15); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func TestFreezerTruncate(t *testing.T) {
+
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("truncation-%d", rand.Uint64())
+
+ { // Fill table
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 30 times
+ for x := 0; x < 30; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ // The last item should be there
+ if _, err = f.Retrieve(f.items - 1); err != nil {
+ t.Fatal(err)
+ }
+ f.Close()
+ }
+ // Reopen, truncate
+ {
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ f.truncate(10) // 150 bytes
+ if f.items != 10 {
+ t.Fatalf("expected %d items, got %d", 10, f.items)
+ }
+ // 45, 45, 45, 15 -- bytes should be 15
+ if f.headBytes != 15 {
+ t.Fatalf("expected %d bytes, got %d", 15, f.headBytes)
+ }
+
+ }
+
+}
+
+// TestFreezerRepairFirstFile tests a head file with the very first item only half-written.
+// That will rewind the index, and _should_ truncate the head file
+func TestFreezerRepairFirstFile(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
+ { // Fill table
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 80 bytes, splitting out into two files
+ f.Append(0, getChunk(40, 0xFF))
+ f.Append(1, getChunk(40, 0xEE))
+ // The last item should be there
+ if _, err = f.Retrieve(f.items - 1); err != nil {
+ t.Fatal(err)
+ }
+ f.Close()
+ }
+ // Truncate the file in half
+ fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname))
+ {
+ if err := assertFileSize(fileToCrop, 40); err != nil {
+ t.Fatal(err)
+ }
+ file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644)
+ if err != nil {
+ t.Fatal(err)
+ }
+ file.Truncate(20)
+ file.Close()
+ }
+ // Reopen
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if f.items != 1 {
+ f.Close()
+ t.Fatalf("expected %d items, got %d", 0, f.items)
+ }
+ // Write 40 bytes
+ f.Append(1, getChunk(40, 0xDD))
+ f.Close()
+ // Should have been truncated down to zero and then 40 written
+ if err := assertFileSize(fileToCrop, 40); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// TestFreezerReadAndTruncate tests:
+// - we have a table open
+// - do some reads, so files are open in readonly
+// - truncate so those files are 'removed'
+// - check that we did not keep the rdonly file descriptors
+func TestFreezerReadAndTruncate(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
+ { // Fill table
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 15 bytes 30 times
+ for x := 0; x < 30; x++ {
+ data := getChunk(15, x)
+ f.Append(uint64(x), data)
+ }
+ // The last item should be there
+ if _, err = f.Retrieve(f.items - 1); err != nil {
+ t.Fatal(err)
+ }
+ f.Close()
+ }
+ // Reopen and read all files
+ {
+ f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if f.items != 30 {
+ f.Close()
+ t.Fatalf("expected %d items, got %d", 0, f.items)
+ }
+ for y := byte(0); y < 30; y++ {
+ f.Retrieve(uint64(y))
+ }
+ // Now, truncate back to zero
+ f.truncate(0)
+ // Write the data again
+ for x := 0; x < 30; x++ {
+ data := getChunk(15, ^x)
+ if err := f.Append(uint64(x), data); err != nil {
+ t.Fatalf("error %v", err)
+ }
+ }
+ f.Close()
+ }
+}
+
+func TestOffset(t *testing.T) {
+ t.Parallel()
+ wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ fname := fmt.Sprintf("offset-%d", rand.Uint64())
+ { // Fill table
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write 6 x 20 bytes, splitting out into three files
+ f.Append(0, getChunk(20, 0xFF))
+ f.Append(1, getChunk(20, 0xEE))
+
+ f.Append(2, getChunk(20, 0xdd))
+ f.Append(3, getChunk(20, 0xcc))
+
+ f.Append(4, getChunk(20, 0xbb))
+ f.Append(5, getChunk(20, 0xaa))
+ f.printIndex()
+ f.Close()
+ }
+ // Now crop it.
+ {
+ // delete files 0 and 1
+ for i := 0; i < 2; i++ {
+ p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i))
+ if err := os.Remove(p); err != nil {
+ t.Fatal(err)
+ }
+ }
+ // Read the index file
+ p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
+ indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
+ if err != nil {
+ t.Fatal(err)
+ }
+ indexBuf := make([]byte, 7*indexEntrySize)
+ indexFile.Read(indexBuf)
+
+ // Update the index file, so that we store
+ // [ file = 2, offset = 4 ] at index zero
+
+ tailId := uint32(2) // First file is 2
+ itemOffset := uint32(4) // We have removed four items
+ zeroIndex := indexEntry{
+ offset: tailId,
+ filenum: itemOffset,
+ }
+ buf := zeroIndex.marshallBinary()
+ // Overwrite index zero
+ copy(indexBuf, buf)
+ // Remove the four next indices by overwriting
+ copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
+ indexFile.WriteAt(indexBuf, 0)
+ // Need to truncate the moved index items
+ indexFile.Truncate(indexEntrySize * (1 + 2))
+ indexFile.Close()
+
+ }
+ // Now open again
+ {
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ f.printIndex()
+ // It should allow writing item 6
+ f.Append(6, getChunk(20, 0x99))
+
+ // It should be fine to fetch 4,5,6
+ if got, err := f.Retrieve(4); err != nil {
+ t.Fatal(err)
+ } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
+ t.Fatalf("expected %x got %x", exp, got)
+ }
+ if got, err := f.Retrieve(5); err != nil {
+ t.Fatal(err)
+ } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
+ t.Fatalf("expected %x got %x", exp, got)
+ }
+ if got, err := f.Retrieve(6); err != nil {
+ t.Fatal(err)
+ } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
+ t.Fatalf("expected %x got %x", exp, got)
+ }
+
+ // It should error at 0, 1,2,3
+ for i := 0; i < 4; i++ {
+ if _, err := f.Retrieve(uint64(i)); err == nil {
+ t.Fatal("expected err")
+ }
+ }
+ }
+}
+
+// TODO (?)
+// - test that if we remove several head-files, aswell as data last data-file,
+// the index is truncated accordingly
+// Right now, the freezer would fail on these conditions:
+// 1. have data files d0, d1, d2, d3
+// 2. remove d2,d3
+//
+// However, all 'normal' failure modes arising due to failing to sync() or save a file should be
+// handled already, and the case described above can only (?) happen if an external process/user
+// deletes files from the filesystem.
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index 62b60e2f3..a44a2c99f 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -63,6 +63,33 @@ var (
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
)
+const (
+ // freezerHeaderTable indicates the name of the freezer header table.
+ freezerHeaderTable = "headers"
+
+ // freezerHashTable indicates the name of the freezer canonical hash table.
+ freezerHashTable = "hashes"
+
+ // freezerBodiesTable indicates the name of the freezer block body table.
+ freezerBodiesTable = "bodies"
+
+ // freezerReceiptTable indicates the name of the freezer receipts table.
+ freezerReceiptTable = "receipts"
+
+ // freezerDifficultyTable indicates the name of the freezer total difficulty table.
+ freezerDifficultyTable = "diffs"
+)
+
+// freezerNoSnappy configures whether compression is disabled for the ancient-tables.
+// Hashes and difficulties don't compress well.
+var freezerNoSnappy = map[string]bool{
+ freezerHeaderTable: false,
+ freezerHashTable: true,
+ freezerBodiesTable: false,
+ freezerReceiptTable: false,
+ freezerDifficultyTable: true,
+}
+
// LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary
// fields.
type LegacyTxLookupEntry struct {
diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 0e50db7c9..6610b7f5a 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -50,6 +50,48 @@ func (t *table) Get(key []byte) ([]byte, error) {
return t.db.Get(append([]byte(t.prefix), key...))
}
+// HasAncient is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) HasAncient(kind string, number uint64) (bool, error) {
+ return t.db.HasAncient(kind, number)
+}
+
+// Ancient is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) Ancient(kind string, number uint64) ([]byte, error) {
+ return t.db.Ancient(kind, number)
+}
+
+// Ancients is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) Ancients() (uint64, error) {
+ return t.db.Ancients()
+}
+
+// AncientSize is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) AncientSize(kind string) (uint64, error) {
+ return t.db.AncientSize(kind)
+}
+
+// AppendAncient is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
+ return t.db.AppendAncient(number, hash, header, body, receipts, td)
+}
+
+// TruncateAncients is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) TruncateAncients(items uint64) error {
+ return t.db.TruncateAncients(items)
+}
+
+// Sync is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) Sync() error {
+ return t.db.Sync()
+}
+
// Put inserts the given value into the database at a prefixed version of the
// provided key.
func (t *table) Put(key []byte, value []byte) error {
@@ -157,6 +199,6 @@ func (b *tableBatch) Reset() {
}
// Replay replays the batch contents.
-func (b *tableBatch) Replay(w ethdb.Writer) error {
+func (b *tableBatch) Replay(w ethdb.KeyValueWriter) error {
return b.batch.Replay(w)
}
diff --git a/core/state/database.go b/core/state/database.go
index 8798b7380..ecc2c134d 100644
--- a/core/state/database.go
+++ b/core/state/database.go
@@ -93,7 +93,7 @@ type Trie interface {
// If the trie does not contain a value for key, the returned proof contains all
// nodes of the longest existing prefix of the key (at least the root), ending
// with the node that proves the absence of the key.
- Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error
+ Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error
}
// NewDatabase creates a backing store for state. The returned database is safe for
diff --git a/core/state/sync.go b/core/state/sync.go
index e4a08d293..ef7930527 100644
--- a/core/state/sync.go
+++ b/core/state/sync.go
@@ -26,7 +26,7 @@ import (
)
// NewStateSync create a new state trie download scheduler.
-func NewStateSync(root common.Hash, database ethdb.Reader, bloom *trie.SyncBloom) *trie.Sync {
+func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync {
var syncer *trie.Sync
callback := func(leaf []byte, parent common.Hash) error {
var obj Account