From 006c21efc7af8bdf04d003ef256d8e2eb30006bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 8 Mar 2019 15:56:20 +0200 Subject: cmd, core, eth, les, node: chain freezer on top of db rework --- core/rawdb/accessors_chain.go | 82 +++++++++--- core/rawdb/accessors_indexes.go | 4 +- core/rawdb/database.go | 57 +++++++- core/rawdb/freezer.go | 276 ++++++++++++++++++++++++++++++++++++++ core/rawdb/freezer_table.go | 284 ++++++++++++++++++++++++++++++++++++++++ core/rawdb/table.go | 6 + 6 files changed, 683 insertions(+), 26 deletions(-) create mode 100644 core/rawdb/freezer.go create mode 100644 core/rawdb/freezer_table.go (limited to 'core') diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index cc0591a4c..103f18f78 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -30,8 +30,11 @@ import ( ) // ReadCanonicalHash retrieves the hash assigned to a canonical block number. -func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { - data, _ := db.Get(headerHashKey(number)) +func ReadCanonicalHash(db ethdb.AncientReader, number uint64) common.Hash { + data, _ := db.Ancient("hashes", number) + if len(data) == 0 { + data, _ = db.Get(headerHashKey(number)) + } if len(data) == 0 { return common.Hash{} } @@ -52,6 +55,24 @@ func DeleteCanonicalHash(db ethdb.Writer, number uint64) { } } +// readAllHashes retrieves all the hashes assigned to blocks at a certain heights, +// both canonical and reorged forks included. +// +// This method is a helper for the chain reader. It should never be exposed to the +// outside world. 
+func readAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { + prefix := headerKeyPrefix(number) + + hashes := make([]common.Hash, 0, 1) + it := db.NewIteratorWithPrefix(prefix) + for it.Next() { + if key := it.Key(); len(key) == len(prefix)+32 { + hashes = append(hashes, common.BytesToHash(key[len(key)-32:])) + } + } + return hashes +} + // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) @@ -129,13 +150,19 @@ func WriteFastTrieProgress(db ethdb.Writer, count uint64) { } // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. -func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Get(headerKey(number, hash)) +func ReadHeaderRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient("headers", number) + if len(data) == 0 { + data, _ = db.Get(headerKey(number, hash)) + } return data } // HasHeader verifies the existence of a block header corresponding to the hash. -func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { +func HasHeader(db ethdb.AncientReader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { + return true + } if has, err := db.Has(headerKey(number, hash)); !has || err != nil { return false } @@ -143,7 +170,7 @@ func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { } // ReadHeader retrieves the block header corresponding to the hash. 
-func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header { +func ReadHeader(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Header { data := ReadHeaderRLP(db, hash, number) if len(data) == 0 { return nil @@ -197,8 +224,11 @@ func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) } // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. -func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Get(blockBodyKey(number, hash)) +func ReadBodyRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient("bodies", number) + if len(data) == 0 { + data, _ = db.Get(blockBodyKey(number, hash)) + } return data } @@ -210,7 +240,10 @@ func WriteBodyRLP(db ethdb.Writer, hash common.Hash, number uint64, rlp rlp.RawV } // HasBody verifies the existence of a block body corresponding to the hash. -func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { +func HasBody(db ethdb.AncientReader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { + return true + } if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil { return false } @@ -218,7 +251,7 @@ func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { } // ReadBody retrieves the block body corresponding to the hash. -func ReadBody(db ethdb.Reader, hash common.Hash, number uint64) *types.Body { +func ReadBody(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Body { data := ReadBodyRLP(db, hash, number) if len(data) == 0 { return nil @@ -248,13 +281,16 @@ func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) { } // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. 
-func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Get(headerTDKey(number, hash)) +func ReadTdRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient("diffs", number) + if len(data) == 0 { + data, _ = db.Get(headerTDKey(number, hash)) + } return data } // ReadTd retrieves a block's total difficulty corresponding to the hash. -func ReadTd(db ethdb.Reader, hash common.Hash, number uint64) *big.Int { +func ReadTd(db ethdb.AncientReader, hash common.Hash, number uint64) *big.Int { data := ReadTdRLP(db, hash, number) if len(data) == 0 { return nil @@ -287,7 +323,10 @@ func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) { // HasReceipts verifies the existence of all the transaction receipts belonging // to a block. -func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { +func HasReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { + return true + } if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { return false } @@ -295,15 +334,18 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { } // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. -func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Get(blockReceiptsKey(number, hash)) +func ReadReceiptsRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient("receipts", number) + if len(data) == 0 { + data, _ = db.Get(blockReceiptsKey(number, hash)) + } return data } // ReadRawReceipts retrieves all the transaction receipts belonging to a block. // The receipt metadata fields are not guaranteed to be populated, so they // should not be used. Use ReadReceipts instead if the metadata is needed. 
-func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts { +func ReadRawReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) types.Receipts { // Retrieve the flattened receipt slice data := ReadReceiptsRLP(db, hash, number) if len(data) == 0 { @@ -329,7 +371,7 @@ func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Rec // The current implementation populates these metadata fields by reading the receipts' // corresponding block body, so if the block body is not found it will return nil even // if the receipt itself is stored. -func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { +func ReadReceipts(db ethdb.AncientReader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { // We're deriving many fields from the block body, retrieve beside the receipt receipts := ReadRawReceipts(db, hash, number) if receipts == nil { @@ -377,7 +419,7 @@ func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) { // // Note, due to concurrent download of header and block body the header and thus // canonical hash can be stored in the database but the body data not (yet). 
-func ReadBlock(db ethdb.Reader, hash common.Hash, number uint64) *types.Block { +func ReadBlock(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Block { header := ReadHeader(db, hash, number) if header == nil { return nil @@ -413,7 +455,7 @@ func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) } // FindCommonAncestor returns the last common ancestor of two block headers -func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header { +func FindCommonAncestor(db ethdb.AncientReader, a, b *types.Header) *types.Header { for bn := b.Number.Uint64(); a.Number.Uint64() > bn; { a = ReadHeader(db, a.ParentHash, a.Number.Uint64()-1) if a == nil { diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 423145a76..666e3edff 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -69,7 +69,7 @@ func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) { // ReadTransaction retrieves a specific transaction from the database, along with // its added positional metadata. -func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { +func ReadTransaction(db ethdb.AncientReader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { return nil, common.Hash{}, 0, 0 @@ -94,7 +94,7 @@ func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, com // ReadReceipt retrieves a specific transaction receipt from the database, along with // its added positional metadata. 
-func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { +func ReadReceipt(db ethdb.AncientReader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { // Retrieve the context of the receipt based on the transaction hash blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index b4c5dea70..0f994c3fd 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -22,10 +22,44 @@ import ( "github.com/ethereum/go-ethereum/ethdb/memorydb" ) +// freezerdb is a databse wrapper that enabled freezer data retrievals. +type freezerdb struct { + ethdb.KeyValueStore + ethdb.Ancienter +} + +// nofreezedb is a database wrapper that disables freezer data retrievals. +type nofreezedb struct { + ethdb.KeyValueStore +} + +// Frozen returns nil as we don't have a backing chain freezer. +func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { + return nil, errOutOfBounds +} + // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { - return db + return &nofreezedb{ + KeyValueStore: db, + } +} + +// NewDatabaseWithFreezer creates a high level database on top of a given key- +// value data store with a freezer moving immutable chain segments into cold +// storage. 
+func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) { + frdb, err := newFreezer(freezer, namespace) + if err != nil { + return nil, err + } + go frdb.freeze(db) + + return &freezerdb{ + KeyValueStore: db, + Ancienter: frdb, + }, nil } // NewMemoryDatabase creates an ephemeral in-memory key-value database without a @@ -34,9 +68,9 @@ func NewMemoryDatabase() ethdb.Database { return NewDatabase(memorydb.New()) } -// NewMemoryDatabaseWithCap creates an ephemeral in-memory key-value database with -// an initial starting capacity, but without a freezer moving immutable chain -// segments into cold storage. +// NewMemoryDatabaseWithCap creates an ephemeral in-memory key-value database +// with an initial starting capacity, but without a freezer moving immutable +// chain segments into cold storage. func NewMemoryDatabaseWithCap(size int) ethdb.Database { return NewDatabase(memorydb.NewWithCap(size)) } @@ -50,3 +84,18 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string) ( } return NewDatabase(db), nil } + +// NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a +// freezer moving immutable chain segments into cold storage. +func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string) (ethdb.Database, error) { + kvdb, err := leveldb.New(file, cache, handles, namespace) + if err != nil { + return nil, err + } + frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace) + if err != nil { + kvdb.Close() + return nil, err + } + return frdb, nil +} diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go new file mode 100644 index 000000000..4f227e3b7 --- /dev/null +++ b/core/rawdb/freezer.go @@ -0,0 +1,276 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "errors" + "fmt" + "math" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +// errUnknownTable is returned if the user attempts to read from a table that is +// not tracked by the freezer. +var errUnknownTable = errors.New("unknown table") + +const ( + // freezerRecheckInterval is the frequency to check the key-value database for + // chain progression that might permit new blocks to be frozen into immutable + // storage. + freezerRecheckInterval = time.Minute + + // freezerBlockGraduation is the number of confirmations a block must achieve + // before it becomes elligible for chain freezing. This must exceed any chain + // reorg depth, since the freezer also deletes all block siblings. + freezerBlockGraduation = 60000 + + // freezerBatchLimit is the maximum number of blocks to freeze in one batch + // before doing an fsync and deleting it from the key-value store. + freezerBatchLimit = 30000 +) + +// freezer is an memory mapped append-only database to store immutable chain data +// into flat files: +// +// - The append only nature ensures that disk writes are minimized. 
+// - The memory mapping ensures we can max out system memory for caching without +// reserving it for go-ethereum. This would also reduce the memory requirements +// of Geth, and thus also GC overhead. +type freezer struct { + tables map[string]*freezerTable // Data tables for storing everything + frozen uint64 // Number of blocks already frozen +} + +// newFreezer creates a chain freezer that moves ancient chain data into +// append-only flat file containers. +func newFreezer(datadir string, namespace string) (*freezer, error) { + // Create the initial freezer object + var ( + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + ) + // Open all the supported data tables + freezer := &freezer{ + tables: make(map[string]*freezerTable), + } + for _, name := range []string{"hashes", "headers", "bodies", "receipts", "diffs"} { + table, err := newTable(datadir, name, readMeter, writeMeter) + if err != nil { + for _, table := range freezer.tables { + table.Close() + } + return nil, err + } + freezer.tables[name] = table + } + // Truncate all data tables to the same length + freezer.frozen = math.MaxUint64 + for _, table := range freezer.tables { + if freezer.frozen > table.items { + freezer.frozen = table.items + } + } + for _, table := range freezer.tables { + if err := table.truncate(freezer.frozen); err != nil { + for _, table := range freezer.tables { + table.Close() + } + return nil, err + } + } + return freezer, nil +} + +// Close terminates the chain freezer, unmapping all the data files. +func (f *freezer) Close() error { + var errs []error + for _, table := range f.tables { + if err := table.Close(); err != nil { + errs = append(errs, err) + } + } + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// sync flushes all data tables to disk. 
+func (f *freezer) sync() error { + var errs []error + for _, table := range f.tables { + if err := table.Sync(); err != nil { + errs = append(errs, err) + } + } + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { + if table := f.tables[kind]; table != nil { + return table.Retrieve(number) + } + return nil, errUnknownTable +} + +// freeze is a background thread that periodically checks the blockchain for any +// import progress and moves ancient data from the fast database into the freezer. +// +// This functionality is deliberately broken off from block importing to avoid +// incurring additional data shuffling delays on block propagation. +func (f *freezer) freeze(db ethdb.KeyValueStore) { + nfdb := &nofreezedb{KeyValueStore: db} + + for { + // Retrieve the freezing threshold. In theory we're interested only in full + // blocks post-sync, but that would keep the live database enormous during + // dast sync. By picking the fast block, we still get to deep freeze all the + // final immutable data without having to wait for sync to finish. 
+ hash := ReadHeadFastBlockHash(nfdb) + if hash == (common.Hash{}) { + log.Debug("Current fast block hash unavailable") // new chain, empty database + time.Sleep(freezerRecheckInterval) + continue + } + number := ReadHeaderNumber(nfdb, hash) + switch { + case number == nil: + log.Error("Current fast block number unavailable", "hash", hash) + time.Sleep(freezerRecheckInterval) + continue + + case *number < freezerBlockGraduation: + log.Debug("Current fast block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) + time.Sleep(freezerRecheckInterval) + continue + + case *number-freezerBlockGraduation <= f.frozen: + log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) + time.Sleep(freezerRecheckInterval) + continue + } + head := ReadHeader(nfdb, hash, *number) + if head == nil { + log.Error("Current fast block unavailable", "number", *number, "hash", hash) + time.Sleep(freezerRecheckInterval) + continue + } + // Seems we have data ready to be frozen, process in usable batches + limit := *number - freezerBlockGraduation + if limit-f.frozen > freezerBatchLimit { + limit = f.frozen + freezerBatchLimit + } + var ( + start = time.Now() + first = f.frozen + ancients = make([]common.Hash, 0, limit) + ) + for f.frozen < limit { + // Retrieves all the components of the canonical block + hash := ReadCanonicalHash(nfdb, f.frozen) + if hash == (common.Hash{}) { + log.Error("Canonical hash missing, can't freeze", "number", f.frozen) + break + } + header := ReadHeaderRLP(nfdb, hash, f.frozen) + if len(header) == 0 { + log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash) + break + } + body := ReadBodyRLP(nfdb, hash, f.frozen) + if len(body) == 0 { + log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash) + break + } + receipts := ReadReceiptsRLP(nfdb, hash, f.frozen) + if len(receipts) == 0 { + log.Error("Block receipts missing, can't freeze", "number", 
f.frozen, "hash", hash) + break + } + td := ReadTdRLP(nfdb, hash, f.frozen) + if len(td) == 0 { + log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash) + break + } + // Inject all the components into the relevant data tables + if err := f.tables["hashes"].Append(f.frozen, hash[:]); err != nil { + log.Error("Failed to deep freeze hash", "number", f.frozen, "hash", hash, "err", err) + break + } + if err := f.tables["headers"].Append(f.frozen, header); err != nil { + log.Error("Failed to deep freeze header", "number", f.frozen, "hash", hash, "err", err) + break + } + if err := f.tables["bodies"].Append(f.frozen, body); err != nil { + log.Error("Failed to deep freeze body", "number", f.frozen, "hash", hash, "err", err) + break + } + if err := f.tables["receipts"].Append(f.frozen, receipts); err != nil { + log.Error("Failed to deep freeze receipts", "number", f.frozen, "hash", hash, "err", err) + break + } + if err := f.tables["diffs"].Append(f.frozen, td); err != nil { + log.Error("Failed to deep freeze difficulty", "number", f.frozen, "hash", hash, "err", err) + break + } + log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) + atomic.AddUint64(&f.frozen, 1) // Only modify atomically + ancients = append(ancients, hash) + } + // Batch of blocks have been frozen, flush them before wiping from leveldb + if err := f.sync(); err != nil { + log.Crit("Failed to flush frozen tables", "err", err) + } + // Wipe out all data from the active database + batch := db.NewBatch() + for number := first; number < f.frozen; number++ { + for _, hash := range readAllHashes(db, number) { + if hash == ancients[number-first] { + deleteBlockWithoutNumber(batch, hash, number) + } else { + DeleteBlock(batch, hash, number) + } + } + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete frozen items", "err", err) + } + // Log something friendly for the user + context := []interface{}{ + "blocks", f.frozen - first, "elapsed", 
common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, + } + if n := len(ancients); n > 0 { + context = append(context, []interface{}{"hash", ancients[n-1]}...) + } + log.Info("Deep froze chain segment", context...) + + // Avoid database thrashing with tiny writes + if f.frozen-first < freezerBatchLimit { + time.Sleep(freezerRecheckInterval) + } + } +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go new file mode 100644 index 000000000..546db0c65 --- /dev/null +++ b/core/rawdb/freezer_table.go @@ -0,0 +1,284 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "encoding/binary" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/golang/snappy" +) + +var ( + // errClosed is returned if an operation attempts to read from or write to the + // freezer table after it has already been closed. + errClosed = errors.New("closed") + + // errOutOfBounds is returned if the item requested is not contained within the + // freezer table. 
+ errOutOfBounds = errors.New("out of bounds") +) + +// freezerTable represents a single chained data table within the freezer (e.g. blocks). +// It consists of a data file (snappy encoded arbitrary data blobs) and an index +// file (uncompressed 64 bit indices into the data file). +type freezerTable struct { + content *os.File // File descriptor for the data content of the table + offsets *os.File // File descriptor for the index file of the table + + items uint64 // Number of items stored in the table + bytes uint64 // Number of content bytes stored in the table + + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + + logger log.Logger // Logger with database path and table name ambedded + lock sync.RWMutex // Mutex protecting the data file descriptors +} + +// newTable opens a freezer table, creating the data and index files if they are +// non existent. Both files are truncated to the shortest common length to ensure +// they don't go out of sync. 
+func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter) (*freezerTable, error) { + // Ensure the containing directory exists and open the two data files + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + content, err := os.OpenFile(filepath.Join(path, name+".dat"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + offsets, err := os.OpenFile(filepath.Join(path, name+".idx"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + content.Close() + return nil, err + } + // Create the table and repair any past inconsistency + tab := &freezerTable{ + content: content, + offsets: offsets, + readMeter: readMeter, + writeMeter: writeMeter, + logger: log.New("database", path, "table", name), + } + if err := tab.repair(); err != nil { + offsets.Close() + content.Close() + return nil, err + } + return tab, nil +} + +// repair cross checks the content and the offsets file and truncates them to +// be in sync with each other after a potential crash / data loss. 
+func (t *freezerTable) repair() error { + // Create a temporary offset buffer to init files with and read offsts into + offset := make([]byte, 8) + + // If we've just created the files, initialize the offsets with the 0 index + stat, err := t.offsets.Stat() + if err != nil { + return err + } + if stat.Size() == 0 { + if _, err := t.offsets.Write(offset); err != nil { + return err + } + } + // Ensure the offsets are a multiple of 8 bytes + if overflow := stat.Size() % 8; overflow != 0 { + t.offsets.Truncate(stat.Size() - overflow) // New file can't trigger this path + } + // Retrieve the file sizes and prepare for truncation + if stat, err = t.offsets.Stat(); err != nil { + return err + } + offsetsSize := stat.Size() + + if stat, err = t.content.Stat(); err != nil { + return err + } + contentSize := stat.Size() + + // Keep truncating both files until they come in sync + t.offsets.ReadAt(offset, offsetsSize-8) + contentExp := int64(binary.LittleEndian.Uint64(offset)) + + for contentExp != contentSize { + // Truncate the content file to the last offset pointer + if contentExp < contentSize { + t.logger.Warn("Truncating dangling content", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) + if err := t.content.Truncate(contentExp); err != nil { + return err + } + contentSize = contentExp + } + // Truncate the offsets to point within the content file + if contentExp > contentSize { + t.logger.Warn("Truncating dangling offsets", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) + if err := t.offsets.Truncate(offsetsSize - 8); err != nil { + return err + } + offsetsSize -= 8 + + t.offsets.ReadAt(offset, offsetsSize-8) + contentExp = int64(binary.LittleEndian.Uint64(offset)) + } + } + // Ensure all reparation changes have been written to disk + if err := t.offsets.Sync(); err != nil { + return err + } + if err := t.content.Sync(); err != nil { + return err + } + // Update the item and byte counters and 
return + t.items = uint64(offsetsSize/8 - 1) // last index points to the end of the data file + t.bytes = uint64(contentSize) + + t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.bytes)) + return nil +} + +// truncate discards any recent data above the provided threashold number. +func (t *freezerTable) truncate(items uint64) error { + // If out item count is corrent, don't do anything + if t.items <= items { + return nil + } + // Something's out of sync, truncate the table's offset index + t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) + if err := t.offsets.Truncate(int64(items+1) * 8); err != nil { + return err + } + // Calculate the new expected size of the data file and truncate it + offset := make([]byte, 8) + t.offsets.ReadAt(offset, int64(items)*8) + expected := binary.LittleEndian.Uint64(offset) + + if err := t.content.Truncate(int64(expected)); err != nil { + return err + } + // All data files truncated, set internal counters and return + t.items, t.bytes = items, expected + return nil +} + +// Close unmaps all active memory mapped regions. +func (t *freezerTable) Close() error { + t.lock.Lock() + defer t.lock.Unlock() + + var errs []error + if err := t.offsets.Close(); err != nil { + errs = append(errs, err) + } + t.offsets = nil + + if err := t.content.Close(); err != nil { + errs = append(errs, err) + } + t.content = nil + + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// Append injects a binary blob at the end of the freezer table. The item index +// is a precautionary parameter to ensure data correctness, but the table will +// reject already existing data. +// +// Note, this method will *not* flush any data to disk so be sure to explicitly +// fsync before irreversibly deleting data from the database. 
+func (t *freezerTable) Append(item uint64, blob []byte) error { + // Ensure the table is still accessible + if t.offsets == nil || t.content == nil { + return errClosed + } + // Ensure only the next item can be written, nothing else + if t.items != item { + panic(fmt.Sprintf("appending unexpected item: want %d, have %d", t.items, item)) + } + // Encode the blob and write it into the data file + blob = snappy.Encode(nil, blob) + if _, err := t.content.Write(blob); err != nil { + return err + } + t.bytes += uint64(len(blob)) + + offset := make([]byte, 8) + binary.LittleEndian.PutUint64(offset, t.bytes) + if _, err := t.offsets.Write(offset); err != nil { + return err + } + t.items++ + + t.writeMeter.Mark(int64(len(blob) + 8)) // 8 = 1 x 8 byte offset + return nil +} + +// Retrieve looks up the data offset of an item with the given index and retrieves +// the raw binary blob from the data file. +func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + // Ensure the table and the item is accessible + if t.offsets == nil || t.content == nil { + return nil, errClosed + } + if t.items <= item { + return nil, errOutOfBounds + } + // Item reachable, retrieve the data content boundaries + offset := make([]byte, 8) + if _, err := t.offsets.ReadAt(offset, int64(item*8)); err != nil { + return nil, err + } + start := binary.LittleEndian.Uint64(offset) + + if _, err := t.offsets.ReadAt(offset, int64((item+1)*8)); err != nil { + return nil, err + } + end := binary.LittleEndian.Uint64(offset) + + // Retrieve the data itself, decompress and return + blob := make([]byte, end-start) + if _, err := t.content.ReadAt(blob, int64(start)); err != nil { + return nil, err + } + t.readMeter.Mark(int64(len(blob) + 16)) // 16 = 2 x 8 byte offset + return snappy.Decode(nil, blob) +} + +// Sync pushes any pending data from memory out to disk. This is an expensive +// operation, so use it with care. 
+func (t *freezerTable) Sync() error { + if err := t.offsets.Sync(); err != nil { + return err + } + return t.content.Sync() +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 0e50db7c9..0b5e08b20 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -50,6 +50,12 @@ func (t *table) Get(key []byte) ([]byte, error) { return t.db.Get(append([]byte(t.prefix), key...)) } +// Ancient is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Ancient(kind string, number uint64) ([]byte, error) { + return t.db.Ancient(kind, number) +} + // Put inserts the given value into the database at a prefixed version of the // provided key. func (t *table) Put(key []byte, value []byte) error { -- cgit v1.2.3 From b69bdc2a4f8efab7cb99934652e2d9b2508c4544 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 26 Mar 2019 12:28:23 +0100 Subject: freezer: implement split files for data * freezer: implement split files for data * freezer: add tests * freezer: close old head-file when opening next * freezer: fix truncation * freezer: more testing around close/open * rawdb/freezer: address review concerns * freezer: fix minor review concerns * freezer: fix remaining concerns + testcases around truncation * freezer: docs * freezer: implement multithreading * core/rawdb: fix freezer nitpicks + change offsets to uint32 * freezer: preopen files, simplify lock constructs * freezer: delete files during truncation --- core/rawdb/freezer_table.go | 399 ++++++++++++++++++++++-------- core/rawdb/freezer_table_test.go | 511 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 815 insertions(+), 95 deletions(-) create mode 100644 core/rawdb/freezer_table_test.go (limited to 'core') diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 546db0c65..313ac8b78 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -1,4 +1,4 @@ -// Copyright 2018 The go-ethereum Authors +// Copyright 2019 
The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -40,16 +41,47 @@ var ( errOutOfBounds = errors.New("out of bounds") ) +// indexEntry contains the number/id of the file that the data resides in, aswell as the +// offset within the file to the end of the data +// In serialized form, the filenum is stored as uint16. +type indexEntry struct { + filenum uint32 // stored as uint16 ( 2 bytes) + offset uint32 // stored as uint32 ( 4 bytes) +} + +const indexEntrySize = 6 + +// unmarshallBinary deserializes binary b into the rawIndex entry. +func (i *indexEntry) unmarshalBinary(b []byte) error { + i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) + i.offset = binary.BigEndian.Uint32(b[2:6]) + return nil +} + +// marshallBinary serializes the rawIndex entry into binary. +func (i *indexEntry) marshallBinary() []byte { + b := make([]byte, indexEntrySize) + binary.BigEndian.PutUint16(b[:2], uint16(i.filenum)) + binary.BigEndian.PutUint32(b[2:6], i.offset) + return b +} + // freezerTable represents a single chained data table within the freezer (e.g. blocks). -// It consists of a data file (snappy encoded arbitrary data blobs) and an index +// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry // file (uncompressed 64 bit indices into the data file). type freezerTable struct { - content *os.File // File descriptor for the data content of the table - offsets *os.File // File descriptor for the index file of the table + noCompression bool // if true, disables snappy compression. 
Note: does not work retroactively + maxFileSize uint32 // Max file size for data-files + name string + path string - items uint64 // Number of items stored in the table - bytes uint64 // Number of content bytes stored in the table + head *os.File // File descriptor for the data head of the table + files map[uint32]*os.File // open files + headId uint32 // number of the currently active head file + index *os.File // File descriptor for the indexEntry file of the table + items uint64 // Number of items stored in the table + headBytes uint32 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -57,149 +89,231 @@ type freezerTable struct { lock sync.RWMutex // Mutex protecting the data file descriptors } -// newTable opens a freezer table, creating the data and index files if they are +// newTable opens a freezer table with default settings - 2G files and snappy compression +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, false) +} + +// newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. 
-func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter) (*freezerTable, error) { - // Ensure the containing directory exists and open the two data files +func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { + // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err } - content, err := os.OpenFile(filepath.Join(path, name+".dat"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return nil, err + var idxName string + if noCompression { + // raw idx + idxName = fmt.Sprintf("%s.ridx", name) + } else { + // compressed idx + idxName = fmt.Sprintf("%s.cidx", name) } - offsets, err := os.OpenFile(filepath.Join(path, name+".idx"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) if err != nil { - content.Close() return nil, err } // Create the table and repair any past inconsistency tab := &freezerTable{ - content: content, - offsets: offsets, - readMeter: readMeter, - writeMeter: writeMeter, - logger: log.New("database", path, "table", name), + index: offsets, + files: make(map[uint32]*os.File), + readMeter: readMeter, + writeMeter: writeMeter, + name: name, + path: path, + logger: log.New("database", path, "table", name), + noCompression: noCompression, + maxFileSize: maxFilesize, } if err := tab.repair(); err != nil { - offsets.Close() - content.Close() + tab.Close() return nil, err } return tab, nil } -// repair cross checks the content and the offsets file and truncates them to +// repair cross checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss. 
func (t *freezerTable) repair() error { - // Create a temporary offset buffer to init files with and read offsts into - offset := make([]byte, 8) + // Create a temporary offset buffer to init files with and read indexEntry into + buffer := make([]byte, indexEntrySize) - // If we've just created the files, initialize the offsets with the 0 index - stat, err := t.offsets.Stat() + // If we've just created the files, initialize the index with the 0 indexEntry + stat, err := t.index.Stat() if err != nil { return err } if stat.Size() == 0 { - if _, err := t.offsets.Write(offset); err != nil { + if _, err := t.index.Write(buffer); err != nil { return err } } - // Ensure the offsets are a multiple of 8 bytes - if overflow := stat.Size() % 8; overflow != 0 { - t.offsets.Truncate(stat.Size() - overflow) // New file can't trigger this path + // Ensure the index is a multiple of indexEntrySize bytes + if overflow := stat.Size() % indexEntrySize; overflow != 0 { + t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path } // Retrieve the file sizes and prepare for truncation - if stat, err = t.offsets.Stat(); err != nil { + if stat, err = t.index.Stat(); err != nil { return err } offsetsSize := stat.Size() - if stat, err = t.content.Stat(); err != nil { + // Open the head file + var ( + lastIndex indexEntry + contentSize int64 + contentExp int64 + ) + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + lastIndex.unmarshalBinary(buffer) + t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + if err != nil { + return err + } + if stat, err = t.head.Stat(); err != nil { return err } - contentSize := stat.Size() + contentSize = stat.Size() // Keep truncating both files until they come in sync - t.offsets.ReadAt(offset, offsetsSize-8) - contentExp := int64(binary.LittleEndian.Uint64(offset)) + contentExp = int64(lastIndex.offset) for contentExp != contentSize { - // Truncate the content file to the last offset pointer + // Truncate the 
head file to the last offset pointer if contentExp < contentSize { - t.logger.Warn("Truncating dangling content", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := t.content.Truncate(contentExp); err != nil { + t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) + if err := t.head.Truncate(contentExp); err != nil { return err } contentSize = contentExp } - // Truncate the offsets to point within the content file + // Truncate the index to point within the head file if contentExp > contentSize { - t.logger.Warn("Truncating dangling offsets", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := t.offsets.Truncate(offsetsSize - 8); err != nil { + t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) + if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil { return err } - offsetsSize -= 8 - - t.offsets.ReadAt(offset, offsetsSize-8) - contentExp = int64(binary.LittleEndian.Uint64(offset)) + offsetsSize -= indexEntrySize + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + var newLastIndex indexEntry + newLastIndex.unmarshalBinary(buffer) + // We might have slipped back into an earlier head-file here + if newLastIndex.filenum != lastIndex.filenum { + // release earlier opened file + t.releaseFile(lastIndex.filenum) + t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + if stat, err = t.head.Stat(); err != nil { + // TODO, anything more we can do here? + // A data file has gone missing... 
+ return err + } + contentSize = stat.Size() + } + lastIndex = newLastIndex + contentExp = int64(lastIndex.offset) } } // Ensure all reparation changes have been written to disk - if err := t.offsets.Sync(); err != nil { + if err := t.index.Sync(); err != nil { return err } - if err := t.content.Sync(); err != nil { + if err := t.head.Sync(); err != nil { return err } // Update the item and byte counters and return - t.items = uint64(offsetsSize/8 - 1) // last index points to the end of the data file - t.bytes = uint64(contentSize) + t.items = uint64(offsetsSize/indexEntrySize - 1) // last indexEntry points to the end of the data file + t.headBytes = uint32(contentSize) + t.headId = lastIndex.filenum - t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.bytes)) + // Close opened files and preopen all files + if err := t.preopen(); err != nil { + return err + } + t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.headBytes)) return nil } +// preopen opens all files that the freezer will need. This method should be called from an init-context, +// since it assumes that it doesn't have to bother with locking +// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever +// obtain a write-lock within Retrieve. +func (t *freezerTable) preopen() (err error) { + // The repair might have already opened (some) files + t.releaseFilesAfter(0, false) + // Open all except head in RDONLY + for i := uint32(0); i < t.headId; i++ { + if _, err = t.openFile(i, os.O_RDONLY); err != nil { + return err + } + } + // Open head in read/write + t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND) + return err +} + // truncate discards any recent data above the provided threashold number. 
func (t *freezerTable) truncate(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() // If out item count is corrent, don't do anything - if t.items <= items { + if atomic.LoadUint64(&t.items) <= items { return nil } // Something's out of sync, truncate the table's offset index t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) - if err := t.offsets.Truncate(int64(items+1) * 8); err != nil { + if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil { return err } // Calculate the new expected size of the data file and truncate it - offset := make([]byte, 8) - t.offsets.ReadAt(offset, int64(items)*8) - expected := binary.LittleEndian.Uint64(offset) + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { + return err + } + var expected indexEntry + expected.unmarshalBinary(buffer) + // We might need to truncate back to older files + if expected.filenum != t.headId { + // If already open for reading, force-reopen for writing + t.releaseFile(expected.filenum) + newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + if err != nil { + return err + } + // release any files _after the current head -- both the previous head + // and any files which may have been opened for reading + t.releaseFilesAfter(expected.filenum, true) + // set back the historic head + t.head = newHead + atomic.StoreUint32(&t.headId, expected.filenum) + } - if err := t.content.Truncate(int64(expected)); err != nil { + if err := t.head.Truncate(int64(expected.offset)); err != nil { return err } // All data files truncated, set internal counters and return - t.items, t.bytes = items, expected + atomic.StoreUint64(&t.items, items) + atomic.StoreUint32(&t.headBytes, expected.offset) return nil } -// Close unmaps all active memory mapped regions. +// Close closes all opened files. 
func (t *freezerTable) Close() error { t.lock.Lock() defer t.lock.Unlock() var errs []error - if err := t.offsets.Close(); err != nil { + if err := t.index.Close(); err != nil { errs = append(errs, err) } - t.offsets = nil + t.index = nil - if err := t.content.Close(); err != nil { - errs = append(errs, err) + for _, f := range t.files { + if err := f.Close(); err != nil { + errs = append(errs, err) + } } - t.content = nil + t.head = nil if errs != nil { return fmt.Errorf("%v", errs) @@ -207,78 +321,173 @@ func (t *freezerTable) Close() error { return nil } -// Append injects a binary blob at the end of the freezer table. The item index +// openFile assumes that the write-lock is held by the caller +func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { + var exist bool + if f, exist = t.files[num]; !exist { + var name string + if t.noCompression { + name = fmt.Sprintf("%s.%d.rdat", t.name, num) + } else { + name = fmt.Sprintf("%s.%d.cdat", t.name, num) + } + f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644) + if err != nil { + return nil, err + } + t.files[num] = f + } + return f, err +} + +// releaseFile closes a file, and removes it from the open file cache. +// Assumes that the caller holds the write lock +func (t *freezerTable) releaseFile(num uint32) { + if f, exist := t.files[num]; exist { + delete(t.files, num) + f.Close() + } +} + +// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files +func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum > num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + +// Append injects a binary blob at the end of the freezer table. The item number // is a precautionary parameter to ensure data correctness, but the table will // reject already existing data. 
// // Note, this method will *not* flush any data to disk so be sure to explicitly // fsync before irreversibly deleting data from the database. func (t *freezerTable) Append(item uint64, blob []byte) error { + // Read lock prevents competition with truncate + t.lock.RLock() // Ensure the table is still accessible - if t.offsets == nil || t.content == nil { + if t.index == nil || t.head == nil { return errClosed } // Ensure only the next item can be written, nothing else - if t.items != item { + if atomic.LoadUint64(&t.items) != item { panic(fmt.Sprintf("appending unexpected item: want %d, have %d", t.items, item)) } // Encode the blob and write it into the data file - blob = snappy.Encode(nil, blob) - if _, err := t.content.Write(blob); err != nil { - return err + if !t.noCompression { + blob = snappy.Encode(nil, blob) } - t.bytes += uint64(len(blob)) + bLen := uint32(len(blob)) + if t.headBytes+bLen < bLen || + t.headBytes+bLen > t.maxFileSize { + // we need a new file, writing would overflow + t.lock.RUnlock() + t.lock.Lock() + nextId := atomic.LoadUint32(&t.headId) + 1 + // We open the next file in truncated mode -- if this file already + // exists, we need to start over from scratch on it + newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC) + if err != nil { + t.lock.Unlock() + return err + } + // Close old file, and reopen in RDONLY mode + t.releaseFile(t.headId) + t.openFile(t.headId, os.O_RDONLY) - offset := make([]byte, 8) - binary.LittleEndian.PutUint64(offset, t.bytes) - if _, err := t.offsets.Write(offset); err != nil { - return err + // Swap out the current head + t.head = newHead + atomic.StoreUint32(&t.headBytes, 0) + atomic.StoreUint32(&t.headId, nextId) + t.lock.Unlock() + t.lock.RLock() } - t.items++ - t.writeMeter.Mark(int64(len(blob) + 8)) // 8 = 1 x 8 byte offset + defer t.lock.RUnlock() + + if _, err := t.head.Write(blob); err != nil { + return err + } + newOffset := atomic.AddUint32(&t.headBytes, bLen) + idx := indexEntry{ + 
filenum: atomic.LoadUint32(&t.headId), + offset: newOffset, + } + // Write indexEntry + t.index.Write(idx.marshallBinary()) + t.writeMeter.Mark(int64(bLen + indexEntrySize)) + atomic.AddUint64(&t.items, 1) return nil } -// Retrieve looks up the data offset of an item with the given index and retrieves +// getBounds returns the indexes for the item +// returns start, end, filenumber and error +func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { + var startIdx, endIdx indexEntry + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { + return 0, 0, 0, err + } + startIdx.unmarshalBinary(buffer) + if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { + return 0, 0, 0, err + } + endIdx.unmarshalBinary(buffer) + if startIdx.filenum != endIdx.filenum { + // If a piece of data 'crosses' a data-file, + // it's actually in one piece on the second data-file. + // We return a zero-indexEntry for the second file as start + return 0, endIdx.offset, endIdx.filenum, nil + } + return startIdx.offset, endIdx.offset, endIdx.filenum, nil +} + +// Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. 
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - t.lock.RLock() - defer t.lock.RUnlock() // Ensure the table and the item is accessible - if t.offsets == nil || t.content == nil { + if t.index == nil || t.head == nil { return nil, errClosed } - if t.items <= item { + if atomic.LoadUint64(&t.items) <= item { return nil, errOutOfBounds } - // Item reachable, retrieve the data content boundaries - offset := make([]byte, 8) - if _, err := t.offsets.ReadAt(offset, int64(item*8)); err != nil { + t.lock.RLock() + startOffset, endOffset, filenum, err := t.getBounds(item) + if err != nil { return nil, err } - start := binary.LittleEndian.Uint64(offset) - - if _, err := t.offsets.ReadAt(offset, int64((item+1)*8)); err != nil { - return nil, err + dataFile, exist := t.files[filenum] + if !exist { + return nil, fmt.Errorf("missing data file %d", filenum) } - end := binary.LittleEndian.Uint64(offset) - // Retrieve the data itself, decompress and return - blob := make([]byte, end-start) - if _, err := t.content.ReadAt(blob, int64(start)); err != nil { + blob := make([]byte, endOffset-startOffset) + if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { + t.lock.RUnlock() return nil, err } - t.readMeter.Mark(int64(len(blob) + 16)) // 16 = 2 x 8 byte offset + t.lock.RUnlock() + t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) + + if t.noCompression { + return blob, nil + } return snappy.Decode(nil, blob) } // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. 
func (t *freezerTable) Sync() error { - if err := t.offsets.Sync(); err != nil { + if err := t.index.Sync(); err != nil { return err } - return t.content.Sync() + return t.head.Sync() } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go new file mode 100644 index 000000000..d6ce6e93c --- /dev/null +++ b/core/rawdb/freezer_table_test.go @@ -0,0 +1,511 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "fmt" + "github.com/ethereum/go-ethereum/metrics" + "math/rand" + "os" + "path/filepath" + "testing" + "time" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +// Gets a chunk of data, filled with 'b' +func getChunk(size int, b byte) []byte { + data := make([]byte, size) + for i, _ := range data { + data[i] = b + } + return data +} + +func print(t *testing.T, f *freezerTable, item uint64) { + a, err := f.Retrieve(item) + if err != nil { + t.Fatal(err) + } + fmt.Printf("db[%d] = %x\n", item, a) +} + +// TestFreezerBasics test initializing a freezertable from scratch, writing to the table, +// and reading it back. 
+func TestFreezerBasics(t *testing.T) { + t.Parallel() + // set cutoff at 50 bytes + f, err := newCustomTable(os.TempDir(), + fmt.Sprintf("unittest-%d", rand.Uint64()), + metrics.NewMeter(), metrics.NewMeter(), 50, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + // Write 15 bytes 255 times, results in 85 files + for x := byte(0); x < 255; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + + //print(t, f, 0) + //print(t, f, 1) + //print(t, f, 2) + // + //db[0] = 000000000000000000000000000000 + //db[1] = 010101010101010101010101010101 + //db[2] = 020202020202020202020202020202 + + for y := byte(0); y < 255; y++ { + exp := getChunk(15, y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + } +} + +// TestFreezerBasicsClosing tests same as TestFreezerBasics, but also closes and reopens the freezer between +// every operation +func TestFreezerBasicsClosing(t *testing.T) { + t.Parallel() + // set cutoff at 50 bytes + var ( + fname = fmt.Sprintf("basics-close-%d", rand.Uint64()) + m1, m2 = metrics.NewMeter(), metrics.NewMeter() + f *freezerTable + err error + ) + f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times, results in 85 files + for x := byte(0); x < 255; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + f.Close() + f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + if err != nil { + t.Fatal(err) + } + } + defer f.Close() + + for y := byte(0); y < 255; y++ { + exp := getChunk(15, y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + f.Close() + f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + if err != nil { + t.Fatal(err) + } + } +} + +// TestFreezerRepairDanglingHead tests that we can recover if 
index entries are removed +func TestFreezerRepairDanglingHead(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) + + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + for x := byte(0); x < 0xff; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + // The last item should be there + if _, err = f.Retrieve(0xfe); err != nil { + t.Fatal(err) + } + f.Close() + } + // open the index + idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Failed to open index file: %v", err) + } + // Remove 4 bytes + stat, err := idxFile.Stat() + if err != nil { + t.Fatalf("Failed to stat index file: %v", err) + } + idxFile.Truncate(stat.Size() - 4) + idxFile.Close() + // Now open it again + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + // The last item should be missing + if _, err = f.Retrieve(0xff); err == nil { + t.Errorf("Expected error for missing index entry") + } + // The one before should still be there + if _, err = f.Retrieve(0xfd); err != nil { + t.Fatalf("Expected no error, got %v", err) + } + } +} + +// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed +func TestFreezerRepairDanglingHeadLarge(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) + + { // Fill a table and close it + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + for x := byte(0); x < 0xff; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + // The last item should be there + if _, err = f.Retrieve(f.items - 1); err == nil { + if err != nil { + t.Fatal(err) + } + } + 
f.Close() + } + // open the index + idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Failed to open index file: %v", err) + } + // Remove everything but the first item, and leave data unaligned + // 0-indexEntry, 1-indexEntry, corrupt-indexEntry + idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2) + idxFile.Close() + // Now open it again + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + // The first item should be there + if _, err = f.Retrieve(0); err != nil { + t.Fatal(err) + } + // The second item should be missing + if _, err = f.Retrieve(1); err == nil { + t.Errorf("Expected error for missing index entry") + } + // We should now be able to store items again, from item = 1 + for x := byte(1); x < 0xff; x++ { + data := getChunk(15, ^x) + f.Append(uint64(x), data) + } + f.Close() + } + // And if we open it, we should now be able to read all of them (new values) + { + f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + for y := byte(1); y < 255; y++ { + exp := getChunk(15, ^y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + } + } +} + +// TestSnappyDetection tests that we fail to open a snappy database and vice versa +func TestSnappyDetection(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) + // Open with snappy + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + for x := byte(0); x < 0xff; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + f.Close() + } + // Open without snappy + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false) + if _, err = f.Retrieve(0); err == nil { + f.Close() + t.Fatalf("expected empty 
table") + } + } + + // Open with snappy + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + // There should be 255 items + if _, err = f.Retrieve(0xfe); err != nil { + f.Close() + t.Fatalf("expected no error, got %v", err) + } + } + +} +func assertFileSize(f string, size int64) error { + stat, err := os.Stat(f) + if err != nil { + return err + } + if stat.Size() != size { + return fmt.Errorf("error, expected size %d, got %d", size, stat.Size()) + } + return nil + +} + +// TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data, +// the index is repaired +func TestFreezerRepairDanglingIndex(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) + + { // Fill a table and close it + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 9 times : 150 bytes + for x := byte(0); x < 9; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + // The last item should be there + if _, err = f.Retrieve(f.items - 1); err != nil { + f.Close() + t.Fatal(err) + } + f.Close() + // File sizes should be 45, 45, 45 : items[3, 3, 3) + } + // Crop third file + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.2.rdat", fname)) + // Truncate third file: 45 ,45, 20 + { + if err := assertFileSize(fileToCrop, 45); err != nil { + t.Fatal(err) + } + file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + file.Truncate(20) + file.Close() + } + // Open db it again + // It should restore the file(s) to + // 45, 45, 15 + // with 3+3+1 items + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + if f.items != 7 { + f.Close() + t.Fatalf("expected %d items, got %d", 7, f.items) + } + if err := assertFileSize(fileToCrop, 15); err != nil { + t.Fatal(err) + } + } +} + +func 
TestFreezerTruncate(t *testing.T) { + + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("truncation-%d", rand.Uint64()) + + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + for x := byte(0); x < 30; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + // The last item should be there + if _, err = f.Retrieve(f.items - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + // Reopen, truncate + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + f.truncate(10) // 150 bytes + if f.items != 10 { + t.Fatalf("expected %d items, got %d", 10, f.items) + } + // 45, 45, 45, 15 -- bytes should be 15 + if f.headBytes != 15 { + t.Fatalf("expected %d bytes, got %d", 15, f.headBytes) + } + + } + +} + +// TestFreezerRepairFirstFile tests a head file with the very first item only half-written. 
+// That will rewind the index, and _should_ truncate the head file +func TestFreezerRepairFirstFile(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 80 bytes, splitting out into two files + f.Append(0, getChunk(40, 0xFF)) + f.Append(1, getChunk(40, 0xEE)) + // The last item should be there + if _, err = f.Retrieve(f.items - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + // Truncate the file in half + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.1.rdat", fname)) + { + if err := assertFileSize(fileToCrop, 40); err != nil { + t.Fatal(err) + } + file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + file.Truncate(20) + file.Close() + } + // Reopen + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + if f.items != 1 { + f.Close() + t.Fatalf("expected %d items, got %d", 0, f.items) + } + // Write 40 bytes + f.Append(1, getChunk(40, 0xDD)) + f.Close() + // Should have been truncated down to zero and then 40 written + if err := assertFileSize(fileToCrop, 40); err != nil { + t.Fatal(err) + } + } +} + +// TestFreezerReadAndTruncate tests: +// - we have a table open +// - do some reads, so files are open in readonly +// - truncate so those files are 'removed' +// - check that we did not keep the rdonly file descriptors +func TestFreezerReadAndTruncate(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + for x := byte(0); x < 30; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + // The last 
item should be there + if _, err = f.Retrieve(f.items - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + // Reopen and read all files + { + f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + if err != nil { + t.Fatal(err) + } + if f.items != 30 { + f.Close() + t.Fatalf("expected %d items, got %d", 0, f.items) + } + for y := byte(0); y < 30; y++ { + f.Retrieve(uint64(y)) + } + // Now, truncate back to zero + f.truncate(0) + // Write the data again + for x := byte(0); x < 30; x++ { + data := getChunk(15, ^x) + if err := f.Append(uint64(x), data); err != nil { + t.Fatalf("error %v", err) + } + } + f.Close() + } +} + +// TODO (?) +// - test that if we remove several head-files, aswell as data last data-file, +// the index is truncated accordingly +// Right now, the freezer would fail on these conditions: +// 1. have data files d0, d1, d2, d3 +// 2. remove d2,d3 +// +// However, all 'normal' failure modes arising due to failing to sync() or save a file should be +// handled already, and the case described above can only (?) happen if an external process/user +// deletes files from the filesystem. -- cgit v1.2.3 From b6cac42e9ffc0b19a1e70416db85593f1cb0d30c Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 14 Mar 2019 14:59:47 +0800 Subject: core/rawdb: add file lock for freezer --- core/rawdb/database.go | 22 ++++++++++++++++++++-- core/rawdb/freezer.go | 19 ++++++++++++++++--- 2 files changed, 36 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 0f994c3fd..cd1048cbc 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -17,6 +17,8 @@ package rawdb import ( + "fmt" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/leveldb" "github.com/ethereum/go-ethereum/ethdb/memorydb" @@ -25,7 +27,23 @@ import ( // freezerdb is a databse wrapper that enabled freezer data retrievals. 
type freezerdb struct { ethdb.KeyValueStore - ethdb.Ancienter + ethdb.AncientStore +} + +// Close implements io.Closer, closing both the fast key-value store as well as +// the slow ancient tables. +func (frdb *freezerdb) Close() error { + var errs []error + if err := frdb.KeyValueStore.Close(); err != nil { + errs = append(errs, err) + } + if err := frdb.AncientStore.Close(); err != nil { + errs = append(errs, err) + } + if len(errs) != 0 { + return fmt.Errorf("%v", errs) + } + return nil } // nofreezedb is a database wrapper that disables freezer data retrievals. @@ -58,7 +76,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st return &freezerdb{ KeyValueStore: db, - Ancienter: frdb, + AncientStore: frdb, }, nil } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 4f227e3b7..07df4c759 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math" + "path/filepath" "sync/atomic" "time" @@ -27,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/prometheus/tsdb/fileutil" ) // errUnknownTable is returned if the user attempts to read from a table that is @@ -57,8 +59,9 @@ const ( // reserving it for go-ethereum. This would also reduce the memory requirements // of Geth, and thus also GC overhead. 
type freezer struct { - tables map[string]*freezerTable // Data tables for storing everything - frozen uint64 // Number of blocks already frozen + tables map[string]*freezerTable // Data tables for storing everything + frozen uint64 // Number of blocks already frozen + instanceLock fileutil.Releaser // File-system lock to prevent double opens } // newFreezer creates a chain freezer that moves ancient chain data into @@ -69,9 +72,14 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) ) + lock, _, err := fileutil.Flock(filepath.Join(datadir, "LOCK")) + if err != nil { + return nil, err + } // Open all the supported data tables freezer := &freezer{ - tables: make(map[string]*freezerTable), + tables: make(map[string]*freezerTable), + instanceLock: lock, } for _, name := range []string{"hashes", "headers", "bodies", "receipts", "diffs"} { table, err := newTable(datadir, name, readMeter, writeMeter) @@ -79,6 +87,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { for _, table := range freezer.tables { table.Close() } + lock.Release() return nil, err } freezer.tables[name] = table @@ -95,6 +104,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { for _, table := range freezer.tables { table.Close() } + lock.Release() return nil, err } } @@ -109,6 +119,9 @@ func (f *freezer) Close() error { errs = append(errs, err) } } + if err := f.instanceLock.Release(); err != nil { + errs = append(errs, err) + } if errs != nil { return fmt.Errorf("%v", errs) } -- cgit v1.2.3 From 80469bea0cc6dbfae749d944094a7c2357dc050d Mon Sep 17 00:00:00 2001 From: gary rong Date: Thu, 25 Apr 2019 22:59:48 +0800 Subject: all: integrate the freezer with fast sync * all: freezer style syncing core, eth, les, light: clean up freezer relative APIs core, eth, les, trie, ethdb, light: clean a bit core, 
eth, les, light: add unit tests core, light: rewrite setHead function core, eth: fix downloader unit tests core: add receipt chain insertion test core: use constant instead of hardcoding table name core: fix rollback core: fix setHead core/rawdb: remove canonical block first and then iterate side chain core/rawdb, ethdb: add hasAncient interface eth/downloader: calculate ancient limit via cht first core, eth, ethdb: lots of fixes * eth/downloader: print ancient disable log only for fast sync --- core/blockchain.go | 383 ++++++++++++++++++++++++++++++--------- core/blockchain_test.go | 211 ++++++++++++++++++--- core/headerchain.go | 64 ++++--- core/rawdb/accessors_chain.go | 178 ++++++++++++------ core/rawdb/accessors_indexes.go | 12 +- core/rawdb/accessors_metadata.go | 12 +- core/rawdb/database.go | 31 +++- core/rawdb/freezer.go | 208 ++++++++++++++------- core/rawdb/freezer_table.go | 10 +- core/rawdb/schema.go | 17 ++ core/rawdb/table.go | 32 +++- core/state/database.go | 2 +- core/state/sync.go | 2 +- 13 files changed, 895 insertions(+), 267 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 61b809319..4ac2c3a44 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -63,6 +63,8 @@ var ( blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + + errInsertionInterrupted = errors.New("insertion is interrupted") ) const ( @@ -138,7 +140,6 @@ type BlockChain struct { chainmu sync.RWMutex // blockchain insertion lock - checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) 
@@ -161,8 +162,9 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config - badBlocks *lru.Cache // Bad block cache - shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + badBlocks *lru.Cache // Bad block cache + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. } // NewBlockChain returns a fully initialised block chain using information @@ -216,6 +218,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if err := bc.loadLastState(); err != nil { return nil, err } + if frozen, err := bc.db.Ancients(); err == nil && frozen >= 1 { + var ( + needRewind bool + low uint64 + ) + // The head full block may be rolled back to a very low height due to + // blockchain repair. If the head full block is even lower than the ancient + // chain, truncate the ancient store. + fullBlock := bc.CurrentBlock() + if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 { + needRewind = true + low = fullBlock.NumberU64() + } + // In fast sync, it may happen that ancient data has been written to the + // ancient store, but the LastFastBlock has not been updated, truncate the + // extra data here. 
+ fastBlock := bc.CurrentFastBlock() + if fastBlock != nil && fastBlock.NumberU64() < frozen-1 { + needRewind = true + if fastBlock.NumberU64() < low || low == 0 { + low = fastBlock.NumberU64() + } + } + if needRewind { + var hashes []common.Hash + previous := bc.CurrentHeader().Number.Uint64() + for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ { + hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i)) + } + bc.Rollback(hashes) + log.Warn("Truncate ancient chain", "from", previous, "to", low) + } + } // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { @@ -267,6 +302,7 @@ func (bc *BlockChain) loadLastState() error { if err := bc.repair(¤tBlock); err != nil { return err } + rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) @@ -312,12 +348,55 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.chainmu.Lock() defer bc.chainmu.Unlock() - // Rewind the header chain, deleting all block bodies until then - delFn := func(db ethdb.Writer, hash common.Hash, num uint64) { - rawdb.DeleteBody(db, hash, num) + updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { + // Rewind the block chain, ensuring we don't end up with a stateless head block + if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { + newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + if newHeadBlock == nil { + newHeadBlock = bc.genesisBlock + } else { + if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil { + // Rewound state missing, rolled back to before pivot, reset to genesis + newHeadBlock = bc.genesisBlock + } + } + rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + bc.currentBlock.Store(newHeadBlock) + } + + // Rewind the fast block in a 
simpleton way to the target head + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { + newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + // If either blocks reached nil, reset to the genesis state + if newHeadFastBlock == nil { + newHeadFastBlock = bc.genesisBlock + } + rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + bc.currentFastBlock.Store(newHeadFastBlock) + } } - bc.hc.SetHead(head, delFn) - currentHeader := bc.hc.CurrentHeader() + + // Rewind the header chain, deleting all block bodies until then + delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { + // Ignore the error here since light client won't hit this path + frozen, _ := bc.db.Ancients() + if num+1 <= frozen { + // Truncate all relative data(header, total difficulty, body, receipt + // and canonical hash) from ancient store. + bc.db.TruncateAncients(num + 1) + + // Remove the hash <-> number mapping from the active store. + rawdb.DeleteHeaderNumber(db, hash) + } else { + // Remove relative body and receipts from the active store. + // The header, total difficulty and canonical hash will be + // removed in the hc.SetHead function. 
+ rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + } + // Todo(rjl493456442) txlookup, bloombits, etc + } + bc.hc.SetHead(head, updateFn, delFn) // Clear out any stale content from the caches bc.bodyCache.Purge() @@ -326,33 +405,6 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.blockCache.Purge() bc.futureBlocks.Purge() - // Rewind the block chain, ensuring we don't end up with a stateless head block - if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() { - bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - } - if currentBlock := bc.CurrentBlock(); currentBlock != nil { - if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { - // Rewound state missing, rolled back to before pivot, reset to genesis - bc.currentBlock.Store(bc.genesisBlock) - } - } - // Rewind the fast block in a simpleton way to the target head - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() { - bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - } - // If either blocks reached nil, reset to the genesis state - if currentBlock := bc.CurrentBlock(); currentBlock == nil { - bc.currentBlock.Store(bc.genesisBlock) - } - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil { - bc.currentFastBlock.Store(bc.genesisBlock) - } - currentBlock := bc.CurrentBlock() - currentFastBlock := bc.CurrentFastBlock() - - rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) - rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()) - return bc.loadLastState() } @@ -780,96 +832,259 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { } if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) - 
bc.currentFastBlock.Store(newFastBlock) rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) + bc.currentFastBlock.Store(newFastBlock) } if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) - bc.currentBlock.Store(newBlock) rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) + bc.currentBlock.Store(newBlock) } } + // Truncate ancient data which exceeds the current header. + // + // Notably, it can happen that system crashes without truncating the ancient data + // but the head indicator has been updated in the active store. Regarding this issue, + // system will self recovery by truncating the extra data during the setup phase. + if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil { + log.Crit("Truncate ancient store failed", "err", err) + } +} + +// truncateAncient rewinds the blockchain to the specified header and deletes all +// data in the ancient store that exceeds the specified header. +func (bc *BlockChain) truncateAncient(head uint64) error { + frozen, err := bc.db.Ancients() + if err != nil { + return err + } + // Short circuit if there is no data to truncate in ancient store. + if frozen <= head+1 { + return nil + } + // Truncate all the data in the freezer beyond the specified head + if err := bc.db.TruncateAncients(head + 1); err != nil { + return err + } + // Clear out any stale content from the caches + bc.hc.headerCache.Purge() + bc.hc.tdCache.Purge() + bc.hc.numberCache.Purge() + + // Clear out any stale content from the caches + bc.bodyCache.Purge() + bc.bodyRLPCache.Purge() + bc.receiptsCache.Purge() + bc.blockCache.Purge() + bc.futureBlocks.Purge() + + log.Info("Rewind ancient data", "number", head) + return nil } // InsertReceiptChain attempts to complete an already existing header chain with // transaction and receipt data. 
-func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { +func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { bc.wg.Add(1) defer bc.wg.Done() + var ( + ancientBlocks, liveBlocks types.Blocks + ancientReceipts, liveReceipts []types.Receipts + ) // Do a sanity check that the provided chain is actually ordered and linked - for i := 1; i < len(blockChain); i++ { - if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() { - log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(), - "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash()) - return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(), - blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4]) + for i := 0; i < len(blockChain); i++ { + if i != 0 { + if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() { + log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(), + "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash()) + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(), + blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4]) + } + } + if blockChain[i].NumberU64() <= ancientLimit { + ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i]) + } else { + 
liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i]) } } var ( stats = struct{ processed, ignored int32 }{} start = time.Now() - bytes = 0 - batch = bc.db.NewBatch() + size = 0 ) - for i, block := range blockChain { - receipts := receiptChain[i] - // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil + // updateHead updates the head fast sync block if the inserted blocks are better + // and returns a indicator whether the inserted blocks are canonical. + updateHead := func(head *types.Block) bool { + var isCanonical bool + bc.chainmu.Lock() + if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case + currentFastBlock := bc.CurrentFastBlock() + if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 { + rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) + bc.currentFastBlock.Store(head) + isCanonical = true + } + } + bc.chainmu.Unlock() + return isCanonical + } + // writeAncient writes blockchain and corresponding receipt chain into ancient store. + // + // this function only accepts canonical chain data. All side chain will be reverted + // eventually. + writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + var ( + previous = bc.CurrentFastBlock() + batch = bc.db.NewBatch() + ) + // If any error occurs before updating the head or we are inserting a side chain, + // all the data written this time wll be rolled back. 
+ defer func() { + if previous != nil { + if err := bc.truncateAncient(previous.NumberU64()); err != nil { + log.Crit("Truncate ancient store failed", "err", err) + } + } + }() + for i, block := range blockChain { + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, errInsertionInterrupted + } + // Short circuit insertion if it is required(used in testing only) + if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) { + return i, errors.New("insertion is terminated for testing purpose") + } + // Short circuit if the owner header is unknown + if !bc.HasHeader(block.Hash(), block.NumberU64()) { + return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + } + // Compute all the non-consensus fields of the receipts + if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { + return i, fmt.Errorf("failed to derive receipts data: %v", err) + } + // Initialize freezer with genesis block first + if frozen, err := bc.db.Ancients(); err == nil && frozen == 0 && block.NumberU64() == 1 { + genesisBlock := rawdb.ReadBlock(bc.db, rawdb.ReadCanonicalHash(bc.db, 0), 0) + size += rawdb.WriteAncientBlock(bc.db, genesisBlock, nil, genesisBlock.Difficulty()) + } + // Flush data into ancient store. + size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) + rawdb.WriteTxLookupEntries(batch, block) + + stats.processed++ } - // Short circuit if the owner header is unknown - if !bc.HasHeader(block.Hash(), block.NumberU64()) { - return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + // Flush all tx-lookup index data. 
+ size += batch.ValueSize() + if err := batch.Write(); err != nil { + return 0, err } - // Skip if the entire data is already known - if bc.HasBlock(block.Hash(), block.NumberU64()) { - stats.ignored++ - continue + batch.Reset() + + // Sync the ancient store explicitly to ensure all data has been flushed to disk. + if err := bc.db.Sync(); err != nil { + return 0, err } - // Compute all the non-consensus fields of the receipts - if err := receipts.DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { - return i, fmt.Errorf("failed to derive receipts data: %v", err) + if !updateHead(blockChain[len(blockChain)-1]) { + return 0, errors.New("side blocks can't be accepted as the ancient chain data") } - // Write all the data out into the database - rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) - rawdb.WriteTxLookupEntries(batch, block) + previous = nil // disable rollback explicitly - stats.processed++ + // Remove the ancient data from the active store + cleanGenesis := len(blockChain) > 0 && blockChain[0].NumberU64() == 1 + if cleanGenesis { + // Migrate genesis block to ancient store too. + rawdb.DeleteBlockWithoutNumber(batch, rawdb.ReadCanonicalHash(bc.db, 0), 0) + rawdb.DeleteCanonicalHash(batch, 0) + } + // Wipe out canonical block data. + for _, block := range blockChain { + rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) + rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + } + if err := batch.Write(); err != nil { + return 0, err + } + batch.Reset() + // Wipe out side chain too. 
+ for _, block := range blockChain { + for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { + rawdb.DeleteBlock(batch, hash, block.NumberU64()) + } + } + if err := batch.Write(); err != nil { + return 0, err + } + return 0, nil + } + // writeLive writes blockchain and corresponding receipt chain into active store. + writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + batch := bc.db.NewBatch() + for i, block := range blockChain { + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, errInsertionInterrupted + } + // Short circuit if the owner header is unknown + if !bc.HasHeader(block.Hash(), block.NumberU64()) { + return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + } + if bc.HasBlock(block.Hash(), block.NumberU64()) { + stats.ignored++ + continue + } + // Compute all the non-consensus fields of the receipts + if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { + return i, fmt.Errorf("failed to derive receipts data: %v", err) + } + // Write all the data out into the database + rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) + rawdb.WriteTxLookupEntries(batch, block) - if batch.ValueSize() >= ethdb.IdealBatchSize { + stats.processed++ + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return 0, err + } + size += batch.ValueSize() + batch.Reset() + } + } + if batch.ValueSize() > 0 { + size += batch.ValueSize() if err := batch.Write(); err != nil { return 0, err } - bytes += batch.ValueSize() - batch.Reset() } + updateHead(blockChain[len(blockChain)-1]) + return 0, nil } - if batch.ValueSize() > 0 { - bytes += batch.ValueSize() - if err := batch.Write(); err != nil { - return 0, err + 
// Write downloaded chain data and corresponding receipt chain data. + if len(ancientBlocks) > 0 { + if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { + if err == errInsertionInterrupted { + return 0, nil + } + return n, err } } - - // Update the head fast sync block if better - bc.chainmu.Lock() - head := blockChain[len(blockChain)-1] - if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case - currentFastBlock := bc.CurrentFastBlock() - if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 { - rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) - bc.currentFastBlock.Store(head) + if len(liveBlocks) > 0 { + if n, err := writeLive(liveBlocks, liveReceipts); err != nil { + if err == errInsertionInterrupted { + return 0, nil + } + return n, err } } - bc.chainmu.Unlock() + head := blockChain[len(blockChain)-1] context := []interface{}{ "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)), - "size", common.StorageSize(bytes), + "size", common.StorageSize(size), } if stats.ignored > 0 { context = append(context, []interface{}{"ignored", stats.ignored}...) 
diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 70e3207f5..7b1a9a54f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -18,8 +18,10 @@ package core import ( "fmt" + "io/ioutil" "math/big" "math/rand" + "os" "sync" "testing" "time" @@ -33,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/params" ) @@ -639,7 +640,27 @@ func TestFastVsFullChains(t *testing.T) { if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } - if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { + if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + // Freezer style fast import the chain. + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } // Iterate over all chain data components, and cross reference @@ -647,26 +668,35 @@ func TestFastVsFullChains(t *testing.T) { num, hash := blocks[i].NumberU64(), blocks[i].Hash() if ftd, atd := fast.GetTdByHash(hash), archive.GetTdByHash(hash); ftd.Cmp(atd) != 0 { - t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", 
num, hash, ftd, atd) + t.Errorf("block #%d [%x]: td mismatch: fastdb %v, archivedb %v", num, hash, ftd, atd) + } + if antd, artd := ancient.GetTdByHash(hash), archive.GetTdByHash(hash); antd.Cmp(artd) != 0 { + t.Errorf("block #%d [%x]: td mismatch: ancientdb %v, archivedb %v", num, hash, antd, artd) } if fheader, aheader := fast.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); fheader.Hash() != aheader.Hash() { - t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader) + t.Errorf("block #%d [%x]: header mismatch: fastdb %v, archivedb %v", num, hash, fheader, aheader) + } + if anheader, arheader := ancient.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); anheader.Hash() != arheader.Hash() { + t.Errorf("block #%d [%x]: header mismatch: ancientdb %v, archivedb %v", num, hash, anheader, arheader) } - if fblock, ablock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash); fblock.Hash() != ablock.Hash() { - t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock) - } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) { - t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions()) - } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) { - t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles()) + if fblock, arblock, anblock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash), ancient.GetBlockByHash(hash); fblock.Hash() != arblock.Hash() || anblock.Hash() != arblock.Hash() { + t.Errorf("block #%d [%x]: block mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock, anblock, arblock) + } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(arblock.Transactions()) || types.DeriveSha(anblock.Transactions()) != types.DeriveSha(arblock.Transactions()) { + t.Errorf("block #%d [%x]: 
transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions()) + } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) { + t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles()) } - if freceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), archive.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) { - t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts) + if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) { + t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts) } } // Check that the canonical chains are the same between the databases for i := 0; i < len(blocks)+1; i++ { if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash { - t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash) + t.Errorf("block #%d: canonical hash mismatch: fastdb %v, archivedb %v", i, fhash, ahash) + } + if anhash, arhash := rawdb.ReadCanonicalHash(ancientDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); anhash != arhash { + t.Errorf("block #%d: canonical hash mismatch: ancientdb %v, archivedb %v", i, anhash, arhash) } } } @@ 
-730,13 +760,40 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } - if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { + if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } assert(t, "fast", fast, height, height, 0) fast.Rollback(remove) assert(t, "fast", fast, height/2, height/2, 0) + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + assert(t, "ancient", ancient, height, height, 0) + ancient.Rollback(remove) + assert(t, "ancient", ancient, height/2, height/2, 0) + if frozen, err := ancientDb.Ancients(); err != nil || frozen != height/2+1 { + t.Fatalf("failed to truncate ancient store, want %v, have %v", height/2+1, frozen) + } + // Import the chain as a light node and ensure all pointers are updated lightDb := rawdb.NewMemoryDatabase() gspec.MustCommit(lightDb) @@ -918,7 +975,7 @@ func TestLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = memorydb.New() + db = 
rawdb.NewMemoryDatabase() // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") @@ -1040,7 +1097,7 @@ func TestSideLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = memorydb.New() + db = rawdb.NewMemoryDatabase() // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") @@ -1564,6 +1621,122 @@ func TestLargeReorgTrieGC(t *testing.T) { } } +func TestBlockchainRecovery(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + ) + height := uint64(1024) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) + + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + 
if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + ancient.Stop() + + // Destroy head fast block manually + midBlock := blocks[len(blocks)/2] + rawdb.WriteHeadFastBlockHash(ancientDb, midBlock.Hash()) + + // Reopen broken blockchain again + ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + if num := ancient.CurrentBlock().NumberU64(); num != 0 { + t.Errorf("head block mismatch: have #%v, want #%v", num, 0) + } + if num := ancient.CurrentFastBlock().NumberU64(); num != midBlock.NumberU64() { + t.Errorf("head fast-block mismatch: have #%v, want #%v", num, midBlock.NumberU64()) + } + if num := ancient.CurrentHeader().Number.Uint64(); num != midBlock.NumberU64() { + t.Errorf("head header mismatch: have #%v, want #%v", num, midBlock.NumberU64()) + } +} + +func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + ) + height := uint64(1024) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) + + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + 
t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + // Abort ancient receipt chain insertion deliberately + ancient.terminateInsert = func(hash common.Hash, number uint64) bool { + if number == blocks[len(blocks)/2].NumberU64() { + return true + } + return false + } + previousFastBlock := ancient.CurrentFastBlock() + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() { + t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64()) + } + if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 { + t.Fatalf("failed to truncate ancient data") + } + ancient.terminateInsert = nil + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() { + t.Fatalf("failed to insert ancient recept chain after rollback") + } +} + // Tests that importing a very large side fork, which is larger than the canon chain, // but where the difficulty per block is kept low: this means that it will not // overtake the 'canon' chain until after it's passed canon by about 200 blocks. 
@@ -1764,7 +1937,7 @@ func testInsertKnownChainData(t *testing.T, typ string) { if err != nil { return err } - _, err = chain.InsertReceiptChain(blocks, receipts) + _, err = chain.InsertReceiptChain(blocks, receipts, 0) return err } asserter = func(t *testing.T, block *types.Block) { @@ -2019,14 +2192,12 @@ func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) { numTxs = 1000 numBlocks = 1 ) - recipientFn := func(nonce uint64) common.Address { return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce)) } dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } @@ -2044,7 +2215,6 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) { dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } @@ -2062,6 +2232,5 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) { dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } diff --git a/core/headerchain.go b/core/headerchain.go index d0c1987fb..659141fd1 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -453,33 +453,56 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { hc.currentHeaderHash = head.Hash() } -// DeleteCallback is a callback function that is called by SetHead before -// each header is deleted. -type DeleteCallback func(ethdb.Writer, common.Hash, uint64) +type ( + // UpdateHeadBlocksCallback is a callback function that is called by SetHead + // before head header is updated. + UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header) + + // DeleteBlockContentCallback is a callback function that is called by SetHead + // before each header is deleted. + DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64) +) // SetHead rewinds the local chain to a new head. 
Everything above the new head // will be deleted and the new one set. -func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { - height := uint64(0) - - if hdr := hc.CurrentHeader(); hdr != nil { - height = hdr.Number.Uint64() - } - batch := hc.chainDb.NewBatch() +func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) { + var ( + parentHash common.Hash + batch = hc.chainDb.NewBatch() + ) for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { - hash := hdr.Hash() - num := hdr.Number.Uint64() + hash, num := hdr.Hash(), hdr.Number.Uint64() + + // Rewind block chain to new head. + parent := hc.GetHeader(hdr.ParentHash, num-1) + if parent == nil { + parent = hc.genesisHeader + } + parentHash = hdr.ParentHash + // Notably, since geth has the possibility for setting the head to a low + // height which is even lower than ancient head. + // In order to ensure that the head is always no higher than the data in + // the database(ancient store or active store), we need to update head + // first then remove the relative data from the database. + // + // Update head first(head fast block, head full block) before deleting the data. + if updateFn != nil { + updateFn(hc.chainDb, parent) + } + // Update head header then. + rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash) + + // Remove the relative data from the database. if delFn != nil { delFn(batch, hash, num) } + // Rewind header chain to new head. 
rawdb.DeleteHeader(batch, hash, num) rawdb.DeleteTd(batch, hash, num) + rawdb.DeleteCanonicalHash(batch, num) - hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1)) - } - // Roll back the canonical chain numbering - for i := height; i > head; i-- { - rawdb.DeleteCanonicalHash(batch, i) + hc.currentHeader.Store(parent) + hc.currentHeaderHash = parentHash } batch.Write() @@ -487,13 +510,6 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { hc.headerCache.Purge() hc.tdCache.Purge() hc.numberCache.Purge() - - if hc.CurrentHeader() == nil { - hc.currentHeader.Store(hc.genesisHeader) - } - hc.currentHeaderHash = hc.CurrentHeader().Hash() - - rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash) } // SetGenesis sets a new genesis block header for the chain diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 103f18f78..681e6e917 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -30,10 +30,17 @@ import ( ) // ReadCanonicalHash retrieves the hash assigned to a canonical block number. -func ReadCanonicalHash(db ethdb.AncientReader, number uint64) common.Hash { - data, _ := db.Ancient("hashes", number) +func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { + data, _ := db.Ancient(freezerHashTable, number) if len(data) == 0 { data, _ = db.Get(headerHashKey(number)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerHashTable, number) + } } if len(data) == 0 { return common.Hash{} @@ -42,29 +49,28 @@ func ReadCanonicalHash(db ethdb.AncientReader, number uint64) common.Hash { } // WriteCanonicalHash stores the hash assigned to a canonical block number. 
-func WriteCanonicalHash(db ethdb.Writer, hash common.Hash, number uint64) { +func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil { log.Crit("Failed to store number to hash mapping", "err", err) } } // DeleteCanonicalHash removes the number to hash canonical mapping. -func DeleteCanonicalHash(db ethdb.Writer, number uint64) { +func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { if err := db.Delete(headerHashKey(number)); err != nil { log.Crit("Failed to delete number to hash mapping", "err", err) } } -// readAllHashes retrieves all the hashes assigned to blocks at a certain heights, +// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, // both canonical and reorged forks included. -// -// This method is a helper for the chain reader. It should never be exposed to the -// outside world. -func readAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { +func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { prefix := headerKeyPrefix(number) hashes := make([]common.Hash, 0, 1) it := db.NewIteratorWithPrefix(prefix) + defer it.Release() + for it.Next() { if key := it.Key(); len(key) == len(prefix)+32 { hashes = append(hashes, common.BytesToHash(key[len(key)-32:])) @@ -74,7 +80,7 @@ func readAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { } // ReadHeaderNumber returns the header number assigned to a hash. -func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 { +func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) if len(data) != 8 { return nil @@ -83,8 +89,15 @@ func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 { return &number } +// DeleteHeaderNumber removes hash to number mapping. 
+func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(headerNumberKey(hash)); err != nil { + log.Crit("Failed to delete hash to number mapping", "err", err) + } +} + // ReadHeadHeaderHash retrieves the hash of the current canonical head header. -func ReadHeadHeaderHash(db ethdb.Reader) common.Hash { +func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headHeaderKey) if len(data) == 0 { return common.Hash{} @@ -93,14 +106,14 @@ func ReadHeadHeaderHash(db ethdb.Reader) common.Hash { } // WriteHeadHeaderHash stores the hash of the current canonical head header. -func WriteHeadHeaderHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headHeaderKey, hash.Bytes()); err != nil { log.Crit("Failed to store last header's hash", "err", err) } } // ReadHeadBlockHash retrieves the hash of the current canonical head block. -func ReadHeadBlockHash(db ethdb.Reader) common.Hash { +func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headBlockKey) if len(data) == 0 { return common.Hash{} @@ -109,14 +122,14 @@ func ReadHeadBlockHash(db ethdb.Reader) common.Hash { } // WriteHeadBlockHash stores the head block's hash. -func WriteHeadBlockHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headBlockKey, hash.Bytes()); err != nil { log.Crit("Failed to store last block's hash", "err", err) } } // ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block. -func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash { +func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headFastBlockKey) if len(data) == 0 { return common.Hash{} @@ -125,7 +138,7 @@ func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash { } // WriteHeadFastBlockHash stores the hash of the current fast-sync head block. 
-func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil { log.Crit("Failed to store last fast block's hash", "err", err) } @@ -133,7 +146,7 @@ func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) { // ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow // reporting correct numbers across restarts. -func ReadFastTrieProgress(db ethdb.Reader) uint64 { +func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 { data, _ := db.Get(fastTrieProgressKey) if len(data) == 0 { return 0 @@ -143,24 +156,31 @@ func ReadFastTrieProgress(db ethdb.Reader) uint64 { // WriteFastTrieProgress stores the fast sync trie process counter to support // retrieving it across restarts. -func WriteFastTrieProgress(db ethdb.Writer, count uint64) { +func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) { if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil { log.Crit("Failed to store fast sync trie progress", "err", err) } } // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. -func ReadHeaderRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("headers", number) +func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerHeaderTable, number) if len(data) == 0 { data, _ = db.Get(headerKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerHeaderTable, number) + } } return data } // HasHeader verifies the existence of a block header corresponding to the hash. 
-func HasHeader(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(headerKey(number, hash)); !has || err != nil { @@ -170,7 +190,7 @@ func HasHeader(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadHeader retrieves the block header corresponding to the hash. -func ReadHeader(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Header { +func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header { data := ReadHeaderRLP(db, hash, number) if len(data) == 0 { return nil @@ -185,7 +205,7 @@ func ReadHeader(db ethdb.AncientReader, hash common.Hash, number uint64) *types. // WriteHeader stores a block header into the database and also stores the hash- // to-number mapping. -func WriteHeader(db ethdb.Writer, header *types.Header) { +func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) { // Write the hash -> number mapping var ( hash = header.Hash() @@ -208,7 +228,7 @@ func WriteHeader(db ethdb.Writer, header *types.Header) { } // DeleteHeader removes all block header data associated with a hash. -func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { deleteHeaderWithoutNumber(db, hash, number) if err := db.Delete(headerNumberKey(hash)); err != nil { log.Crit("Failed to delete hash to number mapping", "err", err) @@ -217,31 +237,38 @@ func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) { // deleteHeaderWithoutNumber removes only the block header but does not remove // the hash to number mapping. 
-func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) { +func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(headerKey(number, hash)); err != nil { log.Crit("Failed to delete header", "err", err) } } // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. -func ReadBodyRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("bodies", number) +func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerBodiesTable, number) if len(data) == 0 { data, _ = db.Get(blockBodyKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerBodiesTable, number) + } } return data } // WriteBodyRLP stores an RLP encoded block body into the database. -func WriteBodyRLP(db ethdb.Writer, hash common.Hash, number uint64, rlp rlp.RawValue) { +func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp rlp.RawValue) { if err := db.Put(blockBodyKey(number, hash), rlp); err != nil { log.Crit("Failed to store block body", "err", err) } } // HasBody verifies the existence of a block body corresponding to the hash. 
-func HasBody(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil { @@ -251,7 +278,7 @@ func HasBody(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadBody retrieves the block body corresponding to the hash. -func ReadBody(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Body { +func ReadBody(db ethdb.Reader, hash common.Hash, number uint64) *types.Body { data := ReadBodyRLP(db, hash, number) if len(data) == 0 { return nil @@ -265,7 +292,7 @@ func ReadBody(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Bo } // WriteBody stores a block body into the database. -func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Body) { +func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *types.Body) { data, err := rlp.EncodeToBytes(body) if err != nil { log.Crit("Failed to RLP encode body", "err", err) @@ -274,23 +301,30 @@ func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Bod } // DeleteBody removes all block body data associated with a hash. -func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(blockBodyKey(number, hash)); err != nil { log.Crit("Failed to delete block body", "err", err) } } // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. 
-func ReadTdRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("diffs", number) +func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerDifficultyTable, number) if len(data) == 0 { data, _ = db.Get(headerTDKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerDifficultyTable, number) + } } return data } // ReadTd retrieves a block's total difficulty corresponding to the hash. -func ReadTd(db ethdb.AncientReader, hash common.Hash, number uint64) *big.Int { +func ReadTd(db ethdb.Reader, hash common.Hash, number uint64) *big.Int { data := ReadTdRLP(db, hash, number) if len(data) == 0 { return nil @@ -304,7 +338,7 @@ func ReadTd(db ethdb.AncientReader, hash common.Hash, number uint64) *big.Int { } // WriteTd stores the total difficulty of a block into the database. -func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) { +func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) { data, err := rlp.EncodeToBytes(td) if err != nil { log.Crit("Failed to RLP encode block total difficulty", "err", err) @@ -315,7 +349,7 @@ func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) { } // DeleteTd removes all block total difficulty data associated with a hash. 
-func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(headerTDKey(number, hash)); err != nil { log.Crit("Failed to delete block total difficulty", "err", err) } @@ -323,8 +357,8 @@ func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) { // HasReceipts verifies the existence of all the transaction receipts belonging // to a block. -func HasReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { @@ -334,10 +368,17 @@ func HasReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. -func ReadReceiptsRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("receipts", number) +func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerReceiptTable, number) if len(data) == 0 { data, _ = db.Get(blockReceiptsKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerReceiptTable, number) + } } return data } @@ -345,7 +386,7 @@ func ReadReceiptsRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rl // ReadRawReceipts retrieves all the transaction receipts belonging to a block. 
// The receipt metadata fields are not guaranteed to be populated, so they // should not be used. Use ReadReceipts instead if the metadata is needed. -func ReadRawReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) types.Receipts { +func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts { // Retrieve the flattened receipt slice data := ReadReceiptsRLP(db, hash, number) if len(data) == 0 { @@ -371,7 +412,7 @@ func ReadRawReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) ty // The current implementation populates these metadata fields by reading the receipts' // corresponding block body, so if the block body is not found it will return nil even // if the receipt itself is stored. -func ReadReceipts(db ethdb.AncientReader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { +func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { // We're deriving many fields from the block body, retrieve beside the receipt receipts := ReadRawReceipts(db, hash, number) if receipts == nil { @@ -390,7 +431,7 @@ func ReadReceipts(db ethdb.AncientReader, hash common.Hash, number uint64, confi } // WriteReceipts stores all the transaction receipts belonging to a block. -func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts types.Receipts) { +func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts types.Receipts) { // Convert the receipts into their storage form and serialize them storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) for i, receipt := range receipts { @@ -407,7 +448,7 @@ func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts ty } // DeleteReceipts removes all receipt data associated with a block hash. 
-func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(blockReceiptsKey(number, hash)); err != nil { log.Crit("Failed to delete block receipts", "err", err) } @@ -419,7 +460,7 @@ func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) { // // Note, due to concurrent download of header and block body the header and thus // canonical hash can be stored in the database but the body data not (yet). -func ReadBlock(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Block { +func ReadBlock(db ethdb.Reader, hash common.Hash, number uint64) *types.Block { header := ReadHeader(db, hash, number) if header == nil { return nil @@ -432,22 +473,53 @@ func ReadBlock(db ethdb.AncientReader, hash common.Hash, number uint64) *types.B } // WriteBlock serializes a block into the database, header and body separately. -func WriteBlock(db ethdb.Writer, block *types.Block) { +func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) WriteHeader(db, block.Header()) } +// WriteAncientBlock writes entire block data into ancient store and returns the total written size. +func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int { + // Encode all block components to RLP format. 
+ headerBlob, err := rlp.EncodeToBytes(block.Header()) + if err != nil { + log.Crit("Failed to RLP encode block header", "err", err) + } + bodyBlob, err := rlp.EncodeToBytes(block.Body()) + if err != nil { + log.Crit("Failed to RLP encode body", "err", err) + } + storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) + for i, receipt := range receipts { + storageReceipts[i] = (*types.ReceiptForStorage)(receipt) + } + receiptBlob, err := rlp.EncodeToBytes(storageReceipts) + if err != nil { + log.Crit("Failed to RLP encode block receipts", "err", err) + } + tdBlob, err := rlp.EncodeToBytes(td) + if err != nil { + log.Crit("Failed to RLP encode block total difficulty", "err", err) + } + // Write all blob to flatten files. + err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob) + if err != nil { + log.Crit("Failed to write block data to ancient store", "err", err) + } + return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength +} + // DeleteBlock removes all block data associated with a hash. -func DeleteBlock(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { DeleteReceipts(db, hash, number) DeleteHeader(db, hash, number) DeleteBody(db, hash, number) DeleteTd(db, hash, number) } -// deleteBlockWithoutNumber removes all block data associated with a hash, except +// DeleteBlockWithoutNumber removes all block data associated with a hash, except // the hash to number mapping. 
-func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { DeleteReceipts(db, hash, number) deleteHeaderWithoutNumber(db, hash, number) DeleteBody(db, hash, number) @@ -455,7 +527,7 @@ func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) } // FindCommonAncestor returns the last common ancestor of two block headers -func FindCommonAncestor(db ethdb.AncientReader, a, b *types.Header) *types.Header { +func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header { for bn := b.Number.Uint64(); a.Number.Uint64() > bn; { a = ReadHeader(db, a.ParentHash, a.Number.Uint64()-1) if a == nil { diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 666e3edff..ed1f1bca6 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -54,7 +54,7 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 { // WriteTxLookupEntries stores a positional metadata for every transaction from // a block, enabling hash based transaction and receipt lookups. -func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) { +func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { for _, tx := range block.Transactions() { if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil { log.Crit("Failed to store transaction lookup entry", "err", err) @@ -63,13 +63,13 @@ func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) { } // DeleteTxLookupEntry removes all transaction data associated with a hash. -func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) { +func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) { db.Delete(txLookupKey(hash)) } // ReadTransaction retrieves a specific transaction from the database, along with // its added positional metadata. 
-func ReadTransaction(db ethdb.AncientReader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { +func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { return nil, common.Hash{}, 0, 0 @@ -94,7 +94,7 @@ func ReadTransaction(db ethdb.AncientReader, hash common.Hash) (*types.Transacti // ReadReceipt retrieves a specific transaction receipt from the database, along with // its added positional metadata. -func ReadReceipt(db ethdb.AncientReader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { +func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { // Retrieve the context of the receipt based on the transaction hash blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { @@ -117,13 +117,13 @@ func ReadReceipt(db ethdb.AncientReader, hash common.Hash, config *params.ChainC // ReadBloomBits retrieves the compressed bloom bit vector belonging to the given // section and bit index from the. -func ReadBloomBits(db ethdb.Reader, bit uint, section uint64, head common.Hash) ([]byte, error) { +func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) { return db.Get(bloomBitsKey(bit, section, head)) } // WriteBloomBits stores the compressed bloom bits vector belonging to the given // section and bit index. 
-func WriteBloomBits(db ethdb.Writer, bit uint, section uint64, head common.Hash, bits []byte) { +func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) { if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil { log.Crit("Failed to store bloom bits", "err", err) } diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index 1361b0d73..f8d09fbdd 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -27,7 +27,7 @@ import ( ) // ReadDatabaseVersion retrieves the version number of the database. -func ReadDatabaseVersion(db ethdb.Reader) *uint64 { +func ReadDatabaseVersion(db ethdb.KeyValueReader) *uint64 { var version uint64 enc, _ := db.Get(databaseVerisionKey) @@ -42,7 +42,7 @@ func ReadDatabaseVersion(db ethdb.Reader) *uint64 { } // WriteDatabaseVersion stores the version number of the database -func WriteDatabaseVersion(db ethdb.Writer, version uint64) { +func WriteDatabaseVersion(db ethdb.KeyValueWriter, version uint64) { enc, err := rlp.EncodeToBytes(version) if err != nil { log.Crit("Failed to encode database version", "err", err) @@ -53,7 +53,7 @@ func WriteDatabaseVersion(db ethdb.Writer, version uint64) { } // ReadChainConfig retrieves the consensus settings based on the given genesis hash. -func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig { +func ReadChainConfig(db ethdb.KeyValueReader, hash common.Hash) *params.ChainConfig { data, _ := db.Get(configKey(hash)) if len(data) == 0 { return nil @@ -67,7 +67,7 @@ func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig { } // WriteChainConfig writes the chain config settings to the database. 
-func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig) { +func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.ChainConfig) { if cfg == nil { return } @@ -81,13 +81,13 @@ func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig } // ReadPreimage retrieves a single preimage of the provided hash. -func ReadPreimage(db ethdb.Reader, hash common.Hash) []byte { +func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { data, _ := db.Get(preimageKey(hash)) return data } // WritePreimages writes the provided set of preimages to the database. -func WritePreimages(db ethdb.Writer, preimages map[common.Hash][]byte) { +func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { for hash, preimage := range preimages { if err := db.Put(preimageKey(hash), preimage); err != nil { log.Crit("Failed to store trie preimage", "err", err) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index cd1048cbc..5a3c7f94b 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -24,7 +24,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb/memorydb" ) -// freezerdb is a databse wrapper that enabled freezer data retrievals. +// freezerdb is a database wrapper that enabled freezer data retrievals. type freezerdb struct { ethdb.KeyValueStore ethdb.AncientStore @@ -51,9 +51,34 @@ type nofreezedb struct { ethdb.KeyValueStore } -// Frozen returns nil as we don't have a backing chain freezer. +// HasAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) HasAncient(kind string, number uint64) (bool, error) { + return false, errNotSupported +} + +// Ancient returns an error as we don't have a backing chain freezer. func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { - return nil, errOutOfBounds + return nil, errNotSupported +} + +// Ancients returns an error as we don't have a backing chain freezer. 
+func (db *nofreezedb) Ancients() (uint64, error) { + return 0, errNotSupported +} + +// AppendAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + return errNotSupported +} + +// TruncateAncients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateAncients(items uint64) error { + return errNotSupported +} + +// Sync returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Sync() error { + return errNotSupported } // NewDatabase creates a high level database on top of a given key-value data diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 07df4c759..84426d8ae 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -31,9 +31,15 @@ import ( "github.com/prometheus/tsdb/fileutil" ) -// errUnknownTable is returned if the user attempts to read from a table that is -// not tracked by the freezer. -var errUnknownTable = errors.New("unknown table") +var ( + // errUnknownTable is returned if the user attempts to read from a table that is + // not tracked by the freezer. + errUnknownTable = errors.New("unknown table") + + // errOutOrderInsertion is returned if the user attempts to inject out-of-order + // binary blobs into the freezer. + errOutOrderInsertion = errors.New("the append operation is out-order") +) const ( // freezerRecheckInterval is the frequency to check the key-value database for @@ -44,7 +50,7 @@ const ( // freezerBlockGraduation is the number of confirmations a block must achieve // before it becomes elligible for chain freezing. This must exceed any chain // reorg depth, since the freezer also deletes all block siblings. - freezerBlockGraduation = 60000 + freezerBlockGraduation = 90000 // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. 
@@ -72,7 +78,9 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) ) - lock, _, err := fileutil.Flock(filepath.Join(datadir, "LOCK")) + // Leveldb uses LOCK as the filelock filename. To prevent the + // name collision, we use FLOCK as the lock name. + lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK")) if err != nil { return nil, err } @@ -81,7 +89,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { tables: make(map[string]*freezerTable), instanceLock: lock, } - for _, name := range []string{"hashes", "headers", "bodies", "receipts", "diffs"} { + for _, name := range []string{freezerHashTable, freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerDifficultyTable} { table, err := newTable(datadir, name, readMeter, writeMeter) if err != nil { for _, table := range freezer.tables { @@ -92,21 +100,12 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { } freezer.tables[name] = table } - // Truncate all data tables to the same length - freezer.frozen = math.MaxUint64 - for _, table := range freezer.tables { - if freezer.frozen > table.items { - freezer.frozen = table.items - } - } - for _, table := range freezer.tables { - if err := table.truncate(freezer.frozen); err != nil { - for _, table := range freezer.tables { - table.Close() - } - lock.Release() - return nil, err + if err := freezer.repair(); err != nil { + for _, table := range freezer.tables { + table.Close() } + lock.Release() + return nil, err } return freezer, nil } @@ -128,8 +127,91 @@ func (f *freezer) Close() error { return nil } +// HasAncient returns an indicator whether the specified ancient data exists +// in the freezer. 
+func (f *freezer) HasAncient(kind string, number uint64) (bool, error) { + if table := f.tables[kind]; table != nil { + return table.has(number), nil + } + return false, nil +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { + if table := f.tables[kind]; table != nil { + return table.Retrieve(number) + } + return nil, errUnknownTable +} + +// Ancients returns the length of the frozen items. +func (f *freezer) Ancients() (uint64, error) { + return atomic.LoadUint64(&f.frozen), nil +} + +// AppendAncient injects all binary blobs belong to block at the end of the +// append-only immutable table files. +// +// Notably, this function is lock free but kind of thread-safe. All out-of-order +// injection will be rejected. But if two injections with same number happen at +// the same time, we can get into the trouble. +func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) { + // Ensure the binary blobs we are appending is continuous with freezer. + if atomic.LoadUint64(&f.frozen) != number { + return errOutOrderInsertion + } + // Rollback all inserted data if any insertion below failed to ensure + // the tables won't out of sync. 
+ defer func() { + if err != nil { + rerr := f.repair() + if rerr != nil { + log.Crit("Failed to repair freezer", "err", rerr) + } + log.Info("Append ancient failed", "number", number, "err", err) + } + }() + // Inject all the components into the relevant data tables + if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil { + log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil { + log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil { + log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil { + log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil { + log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err) + return err + } + atomic.AddUint64(&f.frozen, 1) // Only modify atomically + return nil +} + +// Truncate discards any recent data above the provided threshold number. +func (f *freezer) TruncateAncients(items uint64) error { + if atomic.LoadUint64(&f.frozen) <= items { + return nil + } + for _, table := range f.tables { + if err := table.truncate(items); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, items) + return nil +} + // sync flushes all data tables to disk. -func (f *freezer) sync() error { +func (f *freezer) Sync() error { var errs []error for _, table := range f.tables { if err := table.Sync(); err != nil { @@ -142,14 +224,6 @@ func (f *freezer) sync() error { return nil } -// Ancient retrieves an ancient binary blob from the append-only immutable files. 
-func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { - if table := f.tables[kind]; table != nil { - return table.Retrieve(number) - } - return nil, errUnknownTable -} - // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -159,25 +233,22 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { nfdb := &nofreezedb{KeyValueStore: db} for { - // Retrieve the freezing threshold. In theory we're interested only in full - // blocks post-sync, but that would keep the live database enormous during - // dast sync. By picking the fast block, we still get to deep freeze all the - // final immutable data without having to wait for sync to finish. - hash := ReadHeadFastBlockHash(nfdb) + // Retrieve the freezing threshold. + hash := ReadHeadBlockHash(nfdb) if hash == (common.Hash{}) { - log.Debug("Current fast block hash unavailable") // new chain, empty database + log.Debug("Current full block hash unavailable") // new chain, empty database time.Sleep(freezerRecheckInterval) continue } number := ReadHeaderNumber(nfdb, hash) switch { case number == nil: - log.Error("Current fast block number unavailable", "hash", hash) + log.Error("Current full block number unavailable", "hash", hash) time.Sleep(freezerRecheckInterval) continue case *number < freezerBlockGraduation: - log.Debug("Current fast block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) time.Sleep(freezerRecheckInterval) continue @@ -188,7 +259,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } head := ReadHeader(nfdb, hash, *number) if head == nil { - log.Error("Current fast block unavailable", "number", *number, "hash", hash) + log.Error("Current full block unavailable", "number", *number, "hash", hash) 
time.Sleep(freezerRecheckInterval) continue } @@ -229,48 +300,35 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash) break } + log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) // Inject all the components into the relevant data tables - if err := f.tables["hashes"].Append(f.frozen, hash[:]); err != nil { - log.Error("Failed to deep freeze hash", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["headers"].Append(f.frozen, header); err != nil { - log.Error("Failed to deep freeze header", "number", f.frozen, "hash", hash, "err", err) + if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil { break } - if err := f.tables["bodies"].Append(f.frozen, body); err != nil { - log.Error("Failed to deep freeze body", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["receipts"].Append(f.frozen, receipts); err != nil { - log.Error("Failed to deep freeze receipts", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["diffs"].Append(f.frozen, td); err != nil { - log.Error("Failed to deep freeze difficulty", "number", f.frozen, "hash", hash, "err", err) - break - } - log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) - atomic.AddUint64(&f.frozen, 1) // Only modify atomically ancients = append(ancients, hash) } // Batch of blocks have been frozen, flush them before wiping from leveldb - if err := f.sync(); err != nil { + if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } // Wipe out all data from the active database batch := db.NewBatch() + for i := 0; i < len(ancients); i++ { + DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i)) + DeleteCanonicalHash(batch, first+uint64(i)) + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete frozen canonical blocks", "err", err) + } + 
batch.Reset() + // Wipe out side chain also. for number := first; number < f.frozen; number++ { - for _, hash := range readAllHashes(db, number) { - if hash == ancients[number-first] { - deleteBlockWithoutNumber(batch, hash, number) - } else { - DeleteBlock(batch, hash, number) - } + for _, hash := range ReadAllHashes(db, number) { + DeleteBlock(batch, hash, number) } } if err := batch.Write(); err != nil { - log.Crit("Failed to delete frozen items", "err", err) + log.Crit("Failed to delete frozen side blocks", "err", err) } // Log something friendly for the user context := []interface{}{ @@ -287,3 +345,21 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } } } + +// repair truncates all data tables to the same length. +func (f *freezer) repair() error { + min := uint64(math.MaxUint64) + for _, table := range f.tables { + items := atomic.LoadUint64(&table.items) + if min > items { + min = items + } + } + for _, table := range f.tables { + if err := table.truncate(min); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, min) + return nil +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 313ac8b78..8e117301b 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -39,6 +39,9 @@ var ( // errOutOfBounds is returned if the item requested is not contained within the // freezer table. errOutOfBounds = errors.New("out of bounds") + + // errNotSupported is returned if the database doesn't support the required operation. + errNotSupported = errors.New("this operation is not supported") ) // indexEntry contains the number/id of the file that the data resides in, aswell as the @@ -451,7 +454,6 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. 
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - // Ensure the table and the item is accessible if t.index == nil || t.head == nil { return nil, errClosed @@ -483,6 +485,12 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { return snappy.Decode(nil, blob) } +// has returns an indicator whether the specified number data +// exists in the freezer table. +func (t *freezerTable) has(number uint64) bool { + return atomic.LoadUint64(&t.items) > number +} + // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 62b60e2f3..ebca17252 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,6 +63,23 @@ var ( preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) ) +const ( + // freezerHeaderTable indicates the name of the freezer header table. + freezerHeaderTable = "headers" + + // freezerHashTable indicates the name of the freezer canonical hash table. + freezerHashTable = "hashes" + + // freezerBodiesTable indicates the name of the freezer block body table. + freezerBodiesTable = "bodies" + + // freezerReceiptTable indicates the name of the freezer receipts table. + freezerReceiptTable = "receipts" + + // freezerDifficultyTable indicates the name of the freezer total difficulty table. + freezerDifficultyTable = "diffs" +) + // LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary // fields. type LegacyTxLookupEntry struct { diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 0b5e08b20..124678959 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -50,12 +50,42 @@ func (t *table) Get(key []byte) ([]byte, error) { return t.db.Get(append([]byte(t.prefix), key...)) } +// HasAncient is a noop passthrough that just forwards the request to the underlying +// database. 
+func (t *table) HasAncient(kind string, number uint64) (bool, error) { + return t.db.HasAncient(kind, number) +} + // Ancient is a noop passthrough that just forwards the request to the underlying // database. func (t *table) Ancient(kind string, number uint64) ([]byte, error) { return t.db.Ancient(kind, number) } +// Ancients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Ancients() (uint64, error) { + return t.db.Ancients() +} + +// AppendAncient is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + return t.db.AppendAncient(number, hash, header, body, receipts, td) +} + +// TruncateAncients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) TruncateAncients(items uint64) error { + return t.db.TruncateAncients(items) +} + +// Sync is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Sync() error { + return t.db.Sync() +} + // Put inserts the given value into the database at a prefixed version of the // provided key. func (t *table) Put(key []byte, value []byte) error { @@ -163,6 +193,6 @@ func (b *tableBatch) Reset() { } // Replay replays the batch contents. -func (b *tableBatch) Replay(w ethdb.Writer) error { +func (b *tableBatch) Replay(w ethdb.KeyValueWriter) error { return b.batch.Replay(w) } diff --git a/core/state/database.go b/core/state/database.go index 8798b7380..ecc2c134d 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -93,7 +93,7 @@ type Trie interface { // If the trie does not contain a value for key, the returned proof contains all // nodes of the longest existing prefix of the key (at least the root), ending // with the node that proves the absence of the key. 
- Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error + Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error } // NewDatabase creates a backing store for state. The returned database is safe for diff --git a/core/state/sync.go b/core/state/sync.go index e4a08d293..ef7930527 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -26,7 +26,7 @@ import ( ) // NewStateSync create a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.Reader, bloom *trie.SyncBloom) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync { var syncer *trie.Sync callback := func(leaf []byte, parent common.Hash) error { var obj Account -- cgit v1.2.3 From 331de17e4d773803c0d507bd574361f777acdf57 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sun, 14 Apr 2019 21:25:32 +0200 Subject: core/rawdb: support starting offset for future deletion --- core/rawdb/freezer_table.go | 65 +++++++++++++++--- core/rawdb/freezer_table_test.go | 140 +++++++++++++++++++++++++++++++++------ 2 files changed, 175 insertions(+), 30 deletions(-) (limited to 'core') diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 8e117301b..93636a5ba 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -81,9 +81,14 @@ type freezerTable struct { head *os.File // File descriptor for the data head of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file + tailId uint32 // number of the earliest file index *os.File // File descriptor for the indexEntry file of the table - items uint64 // Number of items stored in the table + // In the case that old items are deleted (from the tail), we use itemOffset + // to count how many historic items have gone missing. 
+ items uint64 // Number of items stored in the table (including items removed from tail) + itemOffset uint32 // Offset (number of discarded items) + headBytes uint32 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -164,10 +169,19 @@ func (t *freezerTable) repair() error { // Open the head file var ( + firstIndex indexEntry lastIndex indexEntry contentSize int64 contentExp int64 ) + // Read index zero, determine what file is the earliest + // and what item offset to use + t.index.ReadAt(buffer, 0) + firstIndex.unmarshalBinary(buffer) + + t.tailId = firstIndex.offset + t.itemOffset = firstIndex.filenum + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) @@ -225,7 +239,7 @@ func (t *freezerTable) repair() error { return err } // Update the item and byte counters and return - t.items = uint64(offsetsSize/indexEntrySize - 1) // last indexEntry points to the end of the data file + t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file t.headBytes = uint32(contentSize) t.headId = lastIndex.filenum @@ -245,7 +259,7 @@ func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) // Open all except head in RDONLY - for i := uint32(0); i < t.headId; i++ { + for i := uint32(t.tailId); i < t.headId; i++ { if _, err = t.openFile(i, os.O_RDONLY); err != nil { return err } @@ -259,7 +273,8 @@ func (t *freezerTable) preopen() (err error) { func (t *freezerTable) truncate(items uint64) error { t.lock.Lock() defer t.lock.Unlock() - // If out item count is corrent, don't do anything + + // If our item count is correct, don't do anything if atomic.LoadUint64(&t.items) <= 
items { return nil } @@ -275,6 +290,7 @@ func (t *freezerTable) truncate(items uint64) error { } var expected indexEntry expected.unmarshalBinary(buffer) + // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -290,7 +306,6 @@ func (t *freezerTable) truncate(items uint64) error { t.head = newHead atomic.StoreUint32(&t.headId, expected.filenum) } - if err := t.head.Truncate(int64(expected.offset)); err != nil { return err } @@ -330,9 +345,9 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { if f, exist = t.files[num]; !exist { var name string if t.noCompression { - name = fmt.Sprintf("%s.%d.rdat", t.name, num) + name = fmt.Sprintf("%s.%04d.rdat", t.name, num) } else { - name = fmt.Sprintf("%s.%d.cdat", t.name, num) + name = fmt.Sprintf("%s.%04d.cdat", t.name, num) } f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644) if err != nil { @@ -376,11 +391,13 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.lock.RLock() // Ensure the table is still accessible if t.index == nil || t.head == nil { + t.lock.RUnlock() return errClosed } // Ensure only the next item can be written, nothing else if atomic.LoadUint64(&t.items) != item { - panic(fmt.Sprintf("appending unexpected item: want %d, have %d", t.items, item)) + t.lock.RUnlock() + return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) } // Encode the blob and write it into the data file if !t.noCompression { @@ -461,13 +478,20 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { if atomic.LoadUint64(&t.items) <= item { return nil, errOutOfBounds } + // Ensure the item was not deleted from the tail either + offset := atomic.LoadUint32(&t.itemOffset) + if uint64(offset) > item { + return nil, errOutOfBounds + } t.lock.RLock() - startOffset, endOffset, filenum, err := t.getBounds(item) + startOffset, endOffset, filenum, err := 
t.getBounds(item - uint64(offset)) if err != nil { + t.lock.RUnlock() return nil, err } dataFile, exist := t.files[filenum] if !exist { + t.lock.RUnlock() return nil, fmt.Errorf("missing data file %d", filenum) } // Retrieve the data itself, decompress and return @@ -499,3 +523,26 @@ func (t *freezerTable) Sync() error { } return t.head.Sync() } + +// printIndex is a debug print utility function for testing +func (t *freezerTable) printIndex() { + buf := make([]byte, indexEntrySize) + + fmt.Printf("|-----------------|\n") + fmt.Printf("| fileno | offset |\n") + fmt.Printf("|--------+--------|\n") + + for i := uint64(0); ; i++ { + if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { + break + } + var entry indexEntry + entry.unmarshalBinary(buf) + fmt.Printf("| %03d | %03d | \n", entry.filenum, entry.offset) + if i > 100 { + fmt.Printf(" ... \n") + break + } + } + fmt.Printf("|-----------------|\n") +} diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index d6ce6e93c..9a7eec505 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -19,12 +19,13 @@ package rawdb import ( "bytes" "fmt" - "github.com/ethereum/go-ethereum/metrics" "math/rand" "os" "path/filepath" "testing" "time" + + "github.com/ethereum/go-ethereum/metrics" ) func init() { @@ -32,10 +33,10 @@ func init() { } // Gets a chunk of data, filled with 'b' -func getChunk(size int, b byte) []byte { +func getChunk(size int, b int) []byte { data := make([]byte, size) for i, _ := range data { - data[i] = b + data[i] = byte(b) } return data } @@ -61,7 +62,7 @@ func TestFreezerBasics(t *testing.T) { } defer f.Close() // Write 15 bytes 255 times, results in 85 files - for x := byte(0); x < 255; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -74,7 +75,7 @@ func TestFreezerBasics(t *testing.T) { //db[1] = 010101010101010101010101010101 //db[2] = 020202020202020202020202020202 - for y := 
byte(0); y < 255; y++ { + for y := 0; y < 255; y++ { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -84,6 +85,11 @@ func TestFreezerBasics(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } } + // Check that we cannot read too far + _, err = f.Retrieve(uint64(255)) + if err != errOutOfBounds { + t.Fatal(err) + } } // TestFreezerBasicsClosing tests same as TestFreezerBasics, but also closes and reopens the freezer between @@ -102,18 +108,15 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times, results in 85 files - for x := byte(0); x < 255; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) f.Close() f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) - if err != nil { - t.Fatal(err) - } } defer f.Close() - for y := byte(0); y < 255; y++ { + for y := 0; y < 255; y++ { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -142,7 +145,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -190,7 +193,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 0xff; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -223,7 +226,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 - for x := byte(1); x < 0xff; x++ { + for x := 1; x < 0xff; x++ { data := getChunk(15, ^x) f.Append(uint64(x), data) } @@ -232,7 +235,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // And if we open it, we should now be able to read all of them (new values) { f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) - for y := byte(1); y 
< 255; y++ { + for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -257,7 +260,7 @@ func TestSnappyDetection(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 0xff; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -308,7 +311,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { t.Fatal(err) } // Write 15 bytes 9 times : 150 bytes - for x := byte(0); x < 9; x++ { + for x := 0; x < 9; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -321,7 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { // File sizes should be 45, 45, 45 : items[3, 3, 3) } // Crop third file - fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.2.rdat", fname)) + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname)) // Truncate third file: 45 ,45, 20 { if err := assertFileSize(fileToCrop, 45); err != nil { @@ -365,7 +368,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } // Write 15 bytes 30 times - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -416,7 +419,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { f.Close() } // Truncate the file in half - fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.1.rdat", fname)) + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname)) { if err := assertFileSize(fileToCrop, 40); err != nil { t.Fatal(err) @@ -463,7 +466,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { t.Fatal(err) } // Write 15 bytes 30 times - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -489,7 +492,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { // Now, truncate back to zero f.truncate(0) // Write the data again - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, ^x) if err := f.Append(uint64(x), 
data); err != nil { t.Fatalf("error %v", err) @@ -499,6 +502,101 @@ func TestFreezerReadAndTruncate(t *testing.T) { } } +func TestOffset(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("offset-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + if err != nil { + t.Fatal(err) + } + // Write 6 x 20 bytes, splitting out into three files + f.Append(0, getChunk(20, 0xFF)) + f.Append(1, getChunk(20, 0xEE)) + + f.Append(2, getChunk(20, 0xdd)) + f.Append(3, getChunk(20, 0xcc)) + + f.Append(4, getChunk(20, 0xbb)) + f.Append(5, getChunk(20, 0xaa)) + f.printIndex() + f.Close() + } + // Now crop it. + { + // delete files 0 and 1 + for i := 0; i < 2; i++ { + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i)) + if err := os.Remove(p); err != nil { + t.Fatal(err) + } + } + // Read the index file + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) + indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + indexBuf := make([]byte, 7*indexEntrySize) + indexFile.Read(indexBuf) + + // Update the index file, so that we store + // [ file = 2, offset = 4 ] at index zero + + tailId := uint32(2) // First file is 2 + itemOffset := uint32(4) // We have removed four items + zeroIndex := indexEntry{ + offset: tailId, + filenum: itemOffset, + } + buf := zeroIndex.marshallBinary() + // Overwrite index zero + copy(indexBuf, buf) + // Remove the four next indices by overwriting + copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) + indexFile.WriteAt(indexBuf, 0) + // Need to truncate the moved index items + indexFile.Truncate(indexEntrySize * (1 + 2)) + indexFile.Close() + + } + // Now open again + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + if err != nil { + t.Fatal(err) + } + f.printIndex() + // It should allow writing item 6 + f.Append(6, getChunk(20, 0x99)) + + // It should be fine 
to fetch 4,5,6 + if got, err := f.Retrieve(4); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + if got, err := f.Retrieve(5); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + if got, err := f.Retrieve(6); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + + // It should error at 0, 1,2,3 + for i := 0; i < 4; i++ { + if _, err := f.Retrieve(uint64(i)); err == nil { + t.Fatal("expected err") + } + } + } +} + // TODO (?) // - test that if we remove several head-files, aswell as data last data-file, // the index is truncated accordingly -- cgit v1.2.3 From 42c746d6f405deb0c49d868dcc6e0afe279e19ab Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 3 May 2019 12:55:36 +0200 Subject: freezer: disable compression on hashes and difficulties (#14) * freezer: disable compression on hashes and difficulties * core/rawdb: address review concerns * core/rawdb: address review concerns --- core/rawdb/freezer.go | 4 ++-- core/rawdb/freezer_table.go | 6 +++--- core/rawdb/schema.go | 10 ++++++++++ 3 files changed, 15 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 84426d8ae..21a6055cd 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -89,8 +89,8 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { tables: make(map[string]*freezerTable), instanceLock: lock, } - for _, name := range []string{freezerHashTable, freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerDifficultyTable} { - table, err := newTable(datadir, name, readMeter, writeMeter) + for name, disableSnappy := range freezerNoSnappy { + table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy) if err != nil { for 
_, table := range freezer.tables { table.Close() diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 93636a5ba..d46597f73 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -97,9 +97,9 @@ type freezerTable struct { lock sync.RWMutex // Mutex protecting the data file descriptors } -// newTable opens a freezer table with default settings - 2G files and snappy compression -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, false) +// newTable opens a freezer table with default settings - 2G files +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy) } // newCustomTable opens a freezer table, creating the data and index files if they are diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index ebca17252..a44a2c99f 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -80,6 +80,16 @@ const ( freezerDifficultyTable = "diffs" ) +// freezerNoSnappy configures whether compression is disabled for the ancient-tables. +// Hashes and difficulties don't compress well. +var freezerNoSnappy = map[string]bool{ + freezerHeaderTable: false, + freezerHashTable: true, + freezerBodiesTable: false, + freezerReceiptTable: false, + freezerDifficultyTable: true, +} + // LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary // fields. 
type LegacyTxLookupEntry struct { -- cgit v1.2.3 From 37d280da411eb649ce22ab69827ac5aacd46534b Mon Sep 17 00:00:00 2001 From: gary rong Date: Tue, 14 May 2019 22:07:44 +0800 Subject: core, cmd, vendor: fixes and database inspection tool (#15) * core, eth: some fixes for freezer * vendor, core/rawdb, cmd/geth: add db inspector * core, cmd/utils: check ancient store path forceily * cmd/geth, common, core/rawdb: a few fixes * cmd/geth: support windows file rename and fix rename error * core: support ancient plugin * core, cmd: streaming file copy * cmd, consensus, core, tests: keep genesis in leveldb * core: write txlookup during ancient init * core: bump database version --- core/blockchain.go | 105 +++++++++++++++++++++++------- core/blockchain_test.go | 70 +++++++++++--------- core/genesis.go | 17 +++++ core/headerchain.go | 11 +++- core/rawdb/accessors_chain.go | 26 +++++--- core/rawdb/accessors_metadata.go | 14 ++++ core/rawdb/database.go | 134 +++++++++++++++++++++++++++++++++++++++ core/rawdb/freezer.go | 21 ++++++ core/rawdb/freezer_table.go | 13 ++++ core/rawdb/schema.go | 3 + core/rawdb/table.go | 6 ++ 11 files changed, 353 insertions(+), 67 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 4ac2c3a44..651c67c5d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -93,7 +93,10 @@ const ( // - Version 6 // The following incompatible database changes were added: // * Transaction lookup information stores the corresponding block number instead of block hash - BlockChainVersion uint64 = 6 + // - Version 7 + // The following incompatible database changes were added: + // * Use freezer as the ancient database to maintain all ancient data + BlockChainVersion uint64 = 7 ) // CacheConfig contains the configuration values for the trie caching/pruning @@ -215,10 +218,35 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if bc.genesisBlock == nil { return nil, ErrNoGenesis } + // Initialize 
the chain with ancient data if it isn't empty. + if bc.empty() { + if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + for i := uint64(0); i < frozen; i++ { + // Inject hash<->number mapping. + hash := rawdb.ReadCanonicalHash(bc.db, i) + if hash == (common.Hash{}) { + return nil, errors.New("broken ancient database") + } + rawdb.WriteHeaderNumber(bc.db, hash, i) + + // Inject txlookup indexes. + block := rawdb.ReadBlock(bc.db, hash, i) + if block == nil { + return nil, errors.New("broken ancient database") + } + rawdb.WriteTxLookupEntries(bc.db, block) + } + hash := rawdb.ReadCanonicalHash(bc.db, frozen-1) + rawdb.WriteHeadHeaderHash(bc.db, hash) + rawdb.WriteHeadFastBlockHash(bc.db, hash) + + log.Info("Initialized chain with ancients", "number", frozen-1, "hash", hash) + } + } if err := bc.loadLastState(); err != nil { return nil, err } - if frozen, err := bc.db.Ancients(); err == nil && frozen >= 1 { + if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( needRewind bool low uint64 @@ -278,6 +306,20 @@ func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig } +// empty returns an indicator whether the blockchain is empty. +// Note, it's a special case that we connect a non-empty ancient +// database with an empty node, so that we can plugin the ancient +// into node seamlessly. +func (bc *BlockChain) empty() bool { + genesis := bc.genesisBlock.Hash() + for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} { + if hash != genesis { + return false + } + } + return true +} + // loadLastState loads the last known chain state from the database. This method // assumes that the chain manager mutex is held. 
func (bc *BlockChain) loadLastState() error { @@ -383,7 +425,9 @@ func (bc *BlockChain) SetHead(head uint64) error { if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - bc.db.TruncateAncients(num + 1) + if err := bc.db.TruncateAncients(num + 1); err != nil { + log.Crit("Failed to truncate ancient data", "number", num, "err", err) + } // Remove the hash <-> number mapping from the active store. rawdb.DeleteHeaderNumber(db, hash) @@ -948,6 +992,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } } }() + var deleted types.Blocks for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed if atomic.LoadInt32(&bc.procInterrupt) == 1 { @@ -961,16 +1006,38 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !bc.HasHeader(block.Hash(), block.NumberU64()) { return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) } - // Compute all the non-consensus fields of the receipts - if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { - return i, fmt.Errorf("failed to derive receipts data: %v", err) + var ( + start = time.Now() + logged = time.Now() + count int + ) + // Migrate all ancient blocks. This can happen if someone upgrades from Geth + // 1.8.x to 1.9.x mid-fast-sync. Perhaps we can get rid of this path in the + // long term. + for { + // We can ignore the error here since light client won't hit this code path. + frozen, _ := bc.db.Ancients() + if frozen >= block.NumberU64() { + break + } + h := rawdb.ReadCanonicalHash(bc.db, frozen) + b := rawdb.ReadBlock(bc.db, h, frozen) + size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, frozen, bc.chainConfig), rawdb.ReadTd(bc.db, h, frozen)) + count += 1 + + // Always keep genesis block in active database. 
+ if b.NumberU64() != 0 { + deleted = append(deleted, b) + } + if time.Since(logged) > 8*time.Second { + log.Info("Migrating ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } } - // Initialize freezer with genesis block first - if frozen, err := bc.db.Ancients(); err == nil && frozen == 0 && block.NumberU64() == 1 { - genesisBlock := rawdb.ReadBlock(bc.db, rawdb.ReadCanonicalHash(bc.db, 0), 0) - size += rawdb.WriteAncientBlock(bc.db, genesisBlock, nil, genesisBlock.Difficulty()) + if count > 0 { + log.Info("Migrated ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) } - // Flush data into ancient store. + // Flush data into ancient database. size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) rawdb.WriteTxLookupEntries(batch, block) @@ -992,15 +1059,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } previous = nil // disable rollback explicitly - // Remove the ancient data from the active store - cleanGenesis := len(blockChain) > 0 && blockChain[0].NumberU64() == 1 - if cleanGenesis { - // Migrate genesis block to ancient store too. - rawdb.DeleteBlockWithoutNumber(batch, rawdb.ReadCanonicalHash(bc.db, 0), 0) - rawdb.DeleteCanonicalHash(batch, 0) - } // Wipe out canonical block data. - for _, block := range blockChain { + for _, block := range append(deleted, blockChain...) { rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) rawdb.DeleteCanonicalHash(batch, block.NumberU64()) } @@ -1008,8 +1068,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, err } batch.Reset() + // Wipe out side chain too. - for _, block := range blockChain { + for _, block := range append(deleted, blockChain...) 
{ for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { rawdb.DeleteBlock(batch, hash, block.NumberU64()) } @@ -1035,10 +1096,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ stats.ignored++ continue } - // Compute all the non-consensus fields of the receipts - if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { - return i, fmt.Errorf("failed to derive receipts data: %v", err) - } // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 7b1a9a54f..09caf7e60 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -716,6 +716,20 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { height := uint64(1024) blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) + // makeDb creates a db instance for testing. 
+ makeDb := func() (ethdb.Database, func()) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(dir) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(db) + return db, func() { os.RemoveAll(dir) } + } // Configure a subchain to roll back remove := []common.Hash{} for _, block := range blocks[height/2:] { @@ -734,9 +748,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { } } // Import the chain as an archive node and ensure all pointers are updated - archiveDb := rawdb.NewMemoryDatabase() - gspec.MustCommit(archiveDb) - + archiveDb, delfn := makeDb() + defer delfn() archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) @@ -748,8 +761,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { assert(t, "archive", archive, height/2, height/2, height/2) // Import the chain as a non-archive node and ensure all pointers are updated - fastDb := rawdb.NewMemoryDatabase() - gspec.MustCommit(fastDb) + fastDb, delfn := makeDb() + defer delfn() fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) defer fast.Stop() @@ -768,16 +781,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { assert(t, "fast", fast, height/2, height/2, 0) // Import the chain as a ancient-first node and ensure all pointers are updated - frdir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatalf("failed to create temp freezer dir: %v", err) - } - defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } - gspec.MustCommit(ancientDb) + ancientDb, delfn := makeDb() + defer 
delfn() ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) defer ancient.Stop() @@ -795,9 +800,8 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { } // Import the chain as a light node and ensure all pointers are updated - lightDb := rawdb.NewMemoryDatabase() - gspec.MustCommit(lightDb) - + lightDb, delfn := makeDb() + defer delfn() light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) @@ -1892,10 +1896,18 @@ func testInsertKnownChainData(t *testing.T, typ string) { b.SetCoinbase(common.Address{1}) b.OffsetTime(-9) // A higher difficulty }) - // Import the shared chain and the original canonical one - chaindb := rawdb.NewMemoryDatabase() + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(dir) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } new(Genesis).MustCommit(chaindb) + defer os.RemoveAll(dir) chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil) if err != nil { @@ -1992,18 +2004,16 @@ func testInsertKnownChainData(t *testing.T, typ string) { // The head shouldn't change. 
asserter(t, blocks3[len(blocks3)-1]) - if typ != "headers" { - // Rollback the heavier chain and re-insert the longer chain again - for i := 0; i < len(blocks3); i++ { - rollback = append(rollback, blocks3[i].Hash()) - } - chain.Rollback(rollback) + // Rollback the heavier chain and re-insert the longer chain again + for i := 0; i < len(blocks3); i++ { + rollback = append(rollback, blocks3[i].Hash()) + } + chain.Rollback(rollback) - if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil { - t.Fatalf("failed to insert chain data: %v", err) - } - asserter(t, blocks2[len(blocks2)-1]) + if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil { + t.Fatalf("failed to insert chain data: %v", err) } + asserter(t, blocks2[len(blocks2)-1]) } // getLongAndShortChains returns two chains, diff --git a/core/genesis.go b/core/genesis.go index 1f34a3a9e..830fb033b 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -170,6 +170,22 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, genesis *Genesis, constant return genesis.Config, block.Hash(), err } + // We have the genesis block in database(perhaps in ancient database) + // but the corresponding state is missing. + header := rawdb.ReadHeader(db, stored, 0) + if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0)); err != nil { + if genesis == nil { + genesis = DefaultGenesisBlock() + } + // Ensure the stored genesis matches with the given one. + hash := genesis.ToBlock(nil).Hash() + if hash != stored { + return genesis.Config, hash, &GenesisMismatchError{stored, hash} + } + block, err := genesis.Commit(db) + return genesis.Config, block.Hash(), err + } + // Check whether the genesis block is already written. 
if genesis != nil { hash := genesis.ToBlock(nil).Hash() @@ -277,6 +293,7 @@ func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) { rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), nil) rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) rawdb.WriteHeadBlockHash(db, block.Hash()) + rawdb.WriteHeadFastBlockHash(db, block.Hash()) rawdb.WriteHeadHeaderHash(db, block.Hash()) config := g.Config diff --git a/core/headerchain.go b/core/headerchain.go index 659141fd1..cdd64bb50 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -274,9 +274,14 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCa return i, errors.New("aborted") } // If the header's already known, skip it, otherwise store - if hc.HasHeader(header.Hash(), header.Number.Uint64()) { - stats.ignored++ - continue + hash := header.Hash() + if hc.HasHeader(hash, header.Number.Uint64()) { + externTd := hc.GetTd(hash, header.Number.Uint64()) + localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64()) + if externTd == nil || externTd.Cmp(localTd) <= 0 { + stats.ignored++ + continue + } } if err := writeHeader(header); err != nil { return i, err diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 681e6e917..fab7ca56c 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -89,7 +89,16 @@ func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { return &number } -// DeleteHeaderNumber removes hash to number mapping. +// WriteHeaderNumber stores the hash->number mapping. +func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + key := headerNumberKey(hash) + enc := encodeBlockNumber(number) + if err := db.Put(key, enc); err != nil { + log.Crit("Failed to store hash to number mapping", "err", err) + } +} + +// DeleteHeaderNumber removes hash->number mapping. 
func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Delete(headerNumberKey(hash)); err != nil { log.Crit("Failed to delete hash to number mapping", "err", err) @@ -206,22 +215,19 @@ func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header // WriteHeader stores a block header into the database and also stores the hash- // to-number mapping. func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) { - // Write the hash -> number mapping var ( - hash = header.Hash() - number = header.Number.Uint64() - encoded = encodeBlockNumber(number) + hash = header.Hash() + number = header.Number.Uint64() ) - key := headerNumberKey(hash) - if err := db.Put(key, encoded); err != nil { - log.Crit("Failed to store hash to number mapping", "err", err) - } + // Write the hash -> number mapping + WriteHeaderNumber(db, hash, number) + // Write the encoded header data, err := rlp.EncodeToBytes(header) if err != nil { log.Crit("Failed to RLP encode header", "err", err) } - key = headerKey(number, hash) + key := headerKey(number, hash) if err := db.Put(key, data); err != nil { log.Crit("Failed to store header", "err", err) } diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index f8d09fbdd..e6235f010 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -80,6 +80,20 @@ func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.Cha } } +// ReadAncientPath retrieves ancient database path which is recorded during the +// first node setup or forcibly changed by user. +func ReadAncientPath(db ethdb.KeyValueReader) string { + data, _ := db.Get(ancientKey) + return string(data) +} + +// WriteAncientPath writes ancient database path into the key-value database. 
+func WriteAncientPath(db ethdb.KeyValueWriter, path string) { + if err := db.Put(ancientKey, []byte(path)); err != nil { + log.Crit("Failed to store ancient path", "err", err) + } +} + // ReadPreimage retrieves a single preimage of the provided hash. func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { data, _ := db.Get(preimageKey(hash)) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 5a3c7f94b..016c6c909 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -17,11 +17,17 @@ package rawdb import ( + "bytes" "fmt" + "os" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/leveldb" "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" + "github.com/olekukonko/tablewriter" ) // freezerdb is a database wrapper that enabled freezer data retrievals. @@ -66,6 +72,11 @@ func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported } +// AncientSize returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) AncientSize(kind string) (uint64, error) { + return 0, errNotSupported +} + // AppendAncient returns an error as we don't have a backing chain freezer. func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { return errNotSupported @@ -140,5 +151,128 @@ func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer kvdb.Close() return nil, err } + // Make sure we always use the same ancient store. 
+ // + // | stored == nil | stored != nil + // ----------------+------------------+---------------------- + // freezer == nil | non-freezer mode | ancient store missing + // freezer != nil | initialize | ensure consistency + stored := ReadAncientPath(kvdb) + if stored == "" && freezer != "" { + WriteAncientPath(kvdb, freezer) + } else if stored != freezer { + log.Warn("Ancient path mismatch", "stored", stored, "given", freezer) + log.Crit("Please use a consistent ancient path or migrate it via the command line tool `geth migrate-ancient`") + } return frdb, nil } + +// InspectDatabase traverses the entire database and checks the size +// of all different categories of data. +func InspectDatabase(db ethdb.Database) error { + it := db.NewIterator() + defer it.Release() + + var ( + count int64 + start = time.Now() + logged = time.Now() + + // Key-value store statistics + total common.StorageSize + headerSize common.StorageSize + bodySize common.StorageSize + receiptSize common.StorageSize + tdSize common.StorageSize + numHashPairing common.StorageSize + hashNumPairing common.StorageSize + trieSize common.StorageSize + txlookupSize common.StorageSize + preimageSize common.StorageSize + bloomBitsSize common.StorageSize + + // Ancient store statistics + ancientHeaders common.StorageSize + ancientBodies common.StorageSize + ancientReceipts common.StorageSize + ancientHashes common.StorageSize + ancientTds common.StorageSize + + // Les statistic + ChtTrieNodes common.StorageSize + BloomTrieNodes common.StorageSize + ) + // Inspect key-value database first. 
+ for it.Next() { + var ( + key = it.Key() + size = common.StorageSize(len(key) + len(it.Value())) + ) + total += size + switch { + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix): + tdSize += size + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix): + numHashPairing += size + case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength): + headerSize += size + case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): + hashNumPairing += size + case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength): + bodySize += size + case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength): + receiptSize += size + case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): + txlookupSize += size + case bytes.HasPrefix(key, preimagePrefix) && len(key) == (len(preimagePrefix)+common.HashLength): + preimageSize += size + case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): + bloomBitsSize += size + case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength: + ChtTrieNodes += size + case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength: + BloomTrieNodes += size + case len(key) == common.HashLength: + trieSize += size + } + count += 1 + if count%1000 == 0 && time.Since(logged) > 8*time.Second { + log.Info("Inspecting database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + // Inspect append-only file store then. 
+ ancients := []*common.StorageSize{&ancientHeaders, &ancientBodies, &ancientReceipts, &ancientHashes, &ancientTds} + for i, category := range []string{freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerHashTable, freezerDifficultyTable} { + if size, err := db.AncientSize(category); err == nil { + *ancients[i] += common.StorageSize(size) + total += common.StorageSize(size) + } + } + // Display the database statistic. + stats := [][]string{ + {"Key-Value store", "Headers", headerSize.String()}, + {"Key-Value store", "Bodies", bodySize.String()}, + {"Key-Value store", "Receipts", receiptSize.String()}, + {"Key-Value store", "Difficulties", tdSize.String()}, + {"Key-Value store", "Block number->hash", numHashPairing.String()}, + {"Key-Value store", "Block hash->number", hashNumPairing.String()}, + {"Key-Value store", "Transaction index", txlookupSize.String()}, + {"Key-Value store", "Bloombit index", bloomBitsSize.String()}, + {"Key-Value store", "Trie nodes", trieSize.String()}, + {"Key-Value store", "Trie preimages", preimageSize.String()}, + {"Ancient store", "Headers", ancientHeaders.String()}, + {"Ancient store", "Bodies", ancientBodies.String()}, + {"Ancient store", "Receipts", ancientReceipts.String()}, + {"Ancient store", "Difficulties", ancientTds.String()}, + {"Ancient store", "Block number->hash", ancientHashes.String()}, + {"Light client", "CHT trie nodes", ChtTrieNodes.String()}, + {"Light client", "Bloom trie nodes", BloomTrieNodes.String()}, + } + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"Database", "Category", "Size"}) + table.SetFooter([]string{"", "Total", total.String()}) + table.AppendBulk(stats) + table.Render() + return nil +} diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 21a6055cd..f3a6bbb8f 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math" + "os" "path/filepath" "sync/atomic" "time" @@ -39,6 +40,10 @@ var ( // 
errOutOrderInsertion is returned if the user attempts to inject out-of-order // binary blobs into the freezer. errOutOrderInsertion = errors.New("the append operation is out-order") + + // errSymlinkDatadir is returned if the ancient directory specified by user + // is a symbolic link. + errSymlinkDatadir = errors.New("symbolic link datadir is not supported") ) const ( @@ -78,6 +83,13 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) ) + // Ensure the datadir is not a symbolic link if it exists. + if info, err := os.Lstat(datadir); !os.IsNotExist(err) { + if info.Mode()&os.ModeSymlink != 0 { + log.Warn("Symbolic link ancient database is not supported", "path", datadir) + return nil, errSymlinkDatadir + } + } // Leveldb uses LOCK as the filelock filename. To prevent the // name collision, we use FLOCK as the lock name. lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK")) @@ -107,6 +119,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { lock.Release() return nil, err } + log.Info("Opened ancient database", "database", datadir) return freezer, nil } @@ -149,6 +162,14 @@ func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil } +// AncientSize returns the ancient size of the specified category. +func (f *freezer) AncientSize(kind string) (uint64, error) { + if table := f.tables[kind]; table != nil { + return table.size() + } + return 0, errUnknownTable +} + // AppendAncient injects all binary blobs belong to block at the end of the // append-only immutable table files. 
// diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d46597f73..ebccf7816 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -515,6 +515,19 @@ func (t *freezerTable) has(number uint64) bool { return atomic.LoadUint64(&t.items) > number } +// size returns the total data size in the freezer table. +func (t *freezerTable) size() (uint64, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + stat, err := t.index.Stat() + if err != nil { + return 0, err + } + total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size()) + return total, nil +} + // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index a44a2c99f..0d54a3c8b 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -41,6 +41,9 @@ var ( // fastTrieProgressKey tracks the number of trie entries imported during fast sync. fastTrieProgressKey = []byte("TrieSync") + // ancientKey tracks the absolute path of ancient database. + ancientKey = []byte("AncientPath") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 124678959..6610b7f5a 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -68,6 +68,12 @@ func (t *table) Ancients() (uint64, error) { return t.db.Ancients() } +// AncientSize is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) AncientSize(kind string) (uint64, error) { + return t.db.AncientSize(kind) +} + // AppendAncient is a noop passthrough that just forwards the request to the underlying // database. 
func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { -- cgit v1.2.3 From 536b3b416c6ff53ea11a0d29dcc351a6d7919901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 15 May 2019 14:33:33 +0300 Subject: cosensus, core, eth, params, trie: fixes + clique history cap --- core/blockchain.go | 17 +++++++++++- core/rawdb/database.go | 59 ++++++++++++++++++++++++++++------------ core/rawdb/freezer.go | 14 ++++------ core/rawdb/freezer_table_test.go | 2 +- 4 files changed, 64 insertions(+), 28 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 651c67c5d..7ab6806c2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -221,6 +221,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par // Initialize the chain with ancient data if it isn't empty. if bc.empty() { if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + var ( + start = time.Now() + logged time.Time + ) for i := uint64(0); i < frozen; i++ { // Inject hash<->number mapping. hash := rawdb.ReadCanonicalHash(bc.db, i) @@ -235,12 +239,23 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return nil, errors.New("broken ancient database") } rawdb.WriteTxLookupEntries(bc.db, block) + + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } } hash := rawdb.ReadCanonicalHash(bc.db, frozen-1) rawdb.WriteHeadHeaderHash(bc.db, hash) rawdb.WriteHeadFastBlockHash(bc.db, hash) - log.Info("Initialized chain with ancients", "number", frozen-1, "hash", hash) + // The first thing the node will do is reconstruct the verification data for + // the head block (ethash cache or clique voting snapshot). 
Might as well do + // it in advance. + bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true) + + log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) } } if err := bc.loadLastState(); err != nil { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 016c6c909..37379147c 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -179,17 +179,18 @@ func InspectDatabase(db ethdb.Database) error { logged = time.Now() // Key-value store statistics - total common.StorageSize - headerSize common.StorageSize - bodySize common.StorageSize - receiptSize common.StorageSize - tdSize common.StorageSize - numHashPairing common.StorageSize - hashNumPairing common.StorageSize - trieSize common.StorageSize - txlookupSize common.StorageSize - preimageSize common.StorageSize - bloomBitsSize common.StorageSize + total common.StorageSize + headerSize common.StorageSize + bodySize common.StorageSize + receiptSize common.StorageSize + tdSize common.StorageSize + numHashPairing common.StorageSize + hashNumPairing common.StorageSize + trieSize common.StorageSize + txlookupSize common.StorageSize + preimageSize common.StorageSize + bloomBitsSize common.StorageSize + cliqueSnapsSize common.StorageSize // Ancient store statistics ancientHeaders common.StorageSize @@ -199,8 +200,12 @@ func InspectDatabase(db ethdb.Database) error { ancientTds common.StorageSize // Les statistic - ChtTrieNodes common.StorageSize - BloomTrieNodes common.StorageSize + chtTrieNodes common.StorageSize + bloomTrieNodes common.StorageSize + + // Meta- and unaccounted data + metadata common.StorageSize + unaccounted common.StorageSize ) // Inspect key-value database first. 
for it.Next() { @@ -228,12 +233,26 @@ func InspectDatabase(db ethdb.Database) error { preimageSize += size case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): bloomBitsSize += size + case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength: + cliqueSnapsSize += size case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength: - ChtTrieNodes += size + chtTrieNodes += size case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength: - BloomTrieNodes += size + bloomTrieNodes += size case len(key) == common.HashLength: trieSize += size + default: + var accounted bool + for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey, ancientKey} { + if bytes.Equal(key, meta) { + metadata += size + accounted = true + break + } + } + if !accounted { + unaccounted += size + } } count += 1 if count%1000 == 0 && time.Since(logged) > 8*time.Second { @@ -261,18 +280,24 @@ func InspectDatabase(db ethdb.Database) error { {"Key-Value store", "Bloombit index", bloomBitsSize.String()}, {"Key-Value store", "Trie nodes", trieSize.String()}, {"Key-Value store", "Trie preimages", preimageSize.String()}, + {"Key-Value store", "Clique snapshots", cliqueSnapsSize.String()}, + {"Key-Value store", "Singleton metadata", metadata.String()}, {"Ancient store", "Headers", ancientHeaders.String()}, {"Ancient store", "Bodies", ancientBodies.String()}, {"Ancient store", "Receipts", ancientReceipts.String()}, {"Ancient store", "Difficulties", ancientTds.String()}, {"Ancient store", "Block number->hash", ancientHashes.String()}, - {"Light client", "CHT trie nodes", ChtTrieNodes.String()}, - {"Light client", "Bloom trie nodes", BloomTrieNodes.String()}, + {"Light client", "CHT trie nodes", chtTrieNodes.String()}, + {"Light client", "Bloom trie nodes", bloomTrieNodes.String()}, } table := tablewriter.NewWriter(os.Stdout) 
table.SetHeader([]string{"Database", "Category", "Size"}) table.SetFooter([]string{"", "Total", total.String()}) table.AppendBulk(stats) table.Render() + + if unaccounted > 0 { + log.Error("Database contains unaccounted data", "size", unaccounted) + } return nil } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index f3a6bbb8f..67ed87d66 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" "github.com/prometheus/tsdb/fileutil" ) @@ -52,11 +53,6 @@ const ( // storage. freezerRecheckInterval = time.Minute - // freezerBlockGraduation is the number of confirmations a block must achieve - // before it becomes elligible for chain freezing. This must exceed any chain - // reorg depth, since the freezer also deletes all block siblings. - freezerBlockGraduation = 90000 - // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. 
freezerBatchLimit = 30000 @@ -268,12 +264,12 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { time.Sleep(freezerRecheckInterval) continue - case *number < freezerBlockGraduation: - log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) + case *number < params.ImmutabilityThreshold: + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold) time.Sleep(freezerRecheckInterval) continue - case *number-freezerBlockGraduation <= f.frozen: + case *number-params.ImmutabilityThreshold <= f.frozen: log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) time.Sleep(freezerRecheckInterval) continue @@ -285,7 +281,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { continue } // Seems we have data ready to be frozen, process in usable batches - limit := *number - freezerBlockGraduation + limit := *number - params.ImmutabilityThreshold if limit-f.frozen > freezerBatchLimit { limit = f.frozen + freezerBatchLimit } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 9a7eec505..e63fb63a3 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -35,7 +35,7 @@ func init() { // Gets a chunk of data, filled with 'b' func getChunk(size int, b int) []byte { data := make([]byte, size) - for i, _ := range data { + for i := range data { data[i] = byte(b) } return data -- cgit v1.2.3 From 9eba3a9fff2f47f5e094c36a7c905380b0ac8b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 16 May 2019 14:30:11 +0300 Subject: cmd/geth, core/rawdb: seamless freezer consistency, friendly removedb --- core/blockchain_test.go | 5 +-- core/rawdb/accessors_metadata.go | 14 ------- core/rawdb/database.go | 80 +++++++++++++++++++++++++++++++++------- core/rawdb/freezer_table.go | 2 +- core/rawdb/schema.go | 3 -- 5 files changed, 68 insertions(+), 36 
deletions(-) (limited to 'core') diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 09caf7e60..8dfcda6d4 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1717,10 +1717,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { } // Abort ancient receipt chain insertion deliberately ancient.terminateInsert = func(hash common.Hash, number uint64) bool { - if number == blocks[len(blocks)/2].NumberU64() { - return true - } - return false + return number == blocks[len(blocks)/2].NumberU64() } previousFastBlock := ancient.CurrentFastBlock() if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index e6235f010..f8d09fbdd 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -80,20 +80,6 @@ func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.Cha } } -// ReadAncientPath retrieves ancient database path which is recorded during the -// first node setup or forcibly changed by user. -func ReadAncientPath(db ethdb.KeyValueReader) string { - data, _ := db.Get(ancientKey) - return string(data) -} - -// WriteAncientPath writes ancient database path into the key-value database. -func WriteAncientPath(db ethdb.KeyValueWriter, path string) { - if err := db.Put(ancientKey, []byte(path)); err != nil { - log.Crit("Failed to store ancient path", "err", err) - } -} - // ReadPreimage retrieves a single preimage of the provided hash. 
func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { data, _ := db.Get(preimageKey(hash)) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 37379147c..353b7dce6 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -18,6 +18,7 @@ package rawdb import ( "bytes" + "errors" "fmt" "os" "time" @@ -104,10 +105,74 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { // value data store with a freezer moving immutable chain segments into cold // storage. func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string) (ethdb.Database, error) { + // Create the idle freezer instance frdb, err := newFreezer(freezer, namespace) if err != nil { return nil, err } + // Since the freezer can be stored separately from the user's key-value database, + // there's a fairly high probability that the user requests invalid combinations + // of the freezer and database. Ensure that we don't shoot ourselves in the foot + // by serving up conflicting data, leading to both datastores getting corrupted. + // + // - If both the freezer and key-value store are empty (no genesis), we just + // initialized a new empty freezer, so everything's fine. + // - If the key-value store is empty, but the freezer is not, we need to make + // sure the user's genesis matches the freezer. That will be checked in the + // blockchain, since we don't have the genesis block here (nor should we at + // this point care, the key-value/freezer combo is valid). + // - If neither the key-value store nor the freezer is empty, cross validate + // the genesis hashes to make sure they are compatible. If they are, also + // ensure that there's no gap between the freezer and subsequently leveldb. + // - If the key-value store is not empty, but the freezer is, we might just be + // upgrading to the freezer release, or we might have had a small chain and + // not frozen anything yet. 
Ensure that no blocks are missing yet from the + // key-value store, since that would mean we already had an old freezer. + + // If the genesis hash is empty, we have a new key-value store, so nothing to + // validate in this method. If, however, the genesis hash is not nil, compare + // it to the freezer content. + if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 { + if frozen, _ := frdb.Ancients(); frozen > 0 { + // If the freezer already contains something, ensure that the genesis blocks + // match, otherwise we might mix up freezers across chains and destroy both + // the freezer and the key-value store. + if frgenesis, _ := frdb.Ancient(freezerHashTable, 0); !bytes.Equal(kvgenesis, frgenesis) { + return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) + } + // Key-value store and freezer belong to the same network. Ensure that they + // are contiguous, otherwise we might end up with a non-functional freezer. + if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 { + // Subsequent header after the freezer limit is missing from the database. + // Reject startup if the database has a more recent head. + if *ReadHeaderNumber(db, ReadHeadHeaderHash(db)) > frozen-1 { + return nil, fmt.Errorf("gap (#%d) in the chain between ancients and leveldb", frozen) + } + // Database contains only older data than the freezer, this happens if the + // state was wiped and reinited from an existing freezer. + } else { + // Key-value store continues where the freezer left off, all is fine. We might + // have duplicate blocks (crash after freezer write but before key-value store + // deletion, but that's fine). + } + } else { + // If the freezer is empty, ensure nothing was moved yet from the key-value + // store, otherwise we'll end up missing data. We check block #1 to decide + // if we froze anything previously or not, but do take care of databases with + // only the genesis block. 
+ if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) { + // Key-value store contains more data than the genesis block, make sure we + // didn't freeze anything yet. + if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 { + return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") + } + // Block #1 is still in the database, we're allowed to init a new freezer + } else { + // The head header is still the genesis, we're allowed to init a new freezer + } + } + } + // Freezer is consistent with the key-value database, permit combining the two go frdb.freeze(db) return &freezerdb{ @@ -151,19 +216,6 @@ func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer kvdb.Close() return nil, err } - // Make sure we always use the same ancient store. - // - // | stored == nil | stored != nil - // ----------------+------------------+---------------------- - // freezer == nil | non-freezer mode | ancient store missing - // freezer != nil | initialize | ensure consistency - stored := ReadAncientPath(kvdb) - if stored == "" && freezer != "" { - WriteAncientPath(kvdb, freezer) - } else if stored != freezer { - log.Warn("Ancient path mismatch", "stored", stored, "given", freezer) - log.Crit("Please use a consistent ancient path or migrate it via the command line tool `geth migrate-ancient`") - } return frdb, nil } @@ -243,7 +295,7 @@ func InspectDatabase(db ethdb.Database) error { trieSize += size default: var accounted bool - for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey, ancientKey} { + for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey} { if bytes.Equal(key, meta) { metadata += size accounted = true diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index ebccf7816..673a181e4 100644 --- a/core/rawdb/freezer_table.go +++ 
b/core/rawdb/freezer_table.go @@ -259,7 +259,7 @@ func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) // Open all except head in RDONLY - for i := uint32(t.tailId); i < t.headId; i++ { + for i := t.tailId; i < t.headId; i++ { if _, err = t.openFile(i, os.O_RDONLY); err != nil { return err } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 0d54a3c8b..a44a2c99f 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -41,9 +41,6 @@ var ( // fastTrieProgressKey tracks the number of trie entries imported during fast sync. fastTrieProgressKey = []byte("TrieSync") - // ancientKey tracks the absolute path of ancient database. - ancientKey = []byte("AncientPath") - // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td -- cgit v1.2.3