From 0ff35e170d1b913082313089d13e3e6d5490839b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 14 Jul 2017 19:39:53 +0300 Subject: core: remove redundant storage of transactions and receipts (#14801) * core: remove redundant storage of transactions and receipts * core, eth, internal: new transaction schema usage polishes * eth: implement upgrade mechanism for db deduplication * core, eth: drop old sequential key db upgrader * eth: close last iterator on successful db upgrage * core: prefix the lookup entries to make their purpose clearer --- core/blockchain.go | 33 +++----- core/blockchain_test.go | 8 +- core/database_util.go | 194 ++++++++++++++++++--------------------------- core/database_util_test.go | 74 ++--------------- 4 files changed, 98 insertions(+), 211 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 6772ea284..bb1c14f43 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -759,16 +759,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ log.Crit("Failed to write log blooms", "err", err) return } - if err := WriteTransactions(bc.chainDb, block); err != nil { - errs[index] = fmt.Errorf("failed to write individual transactions: %v", err) + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { + errs[index] = fmt.Errorf("failed to write lookup metadata: %v", err) atomic.AddInt32(&failed, 1) - log.Crit("Failed to write individual transactions", "err", err) - return - } - if err := WriteReceipts(bc.chainDb, receipts); err != nil { - errs[index] = fmt.Errorf("failed to write individual receipts: %v", err) - atomic.AddInt32(&failed, 1) - log.Crit("Failed to write individual receipts", "err", err) + log.Crit("Failed to write lookup metadata", "err", err) return } atomic.AddInt32(&stats.processed, 1) @@ -1002,12 +996,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) - // This puts transactions in a extra db for rpc - if err := WriteTransactions(bc.chainDb, block); err != nil { - return i, err - } - // store the receipts - if err := WriteReceipts(bc.chainDb, receipts); err != nil { + // Write the positional metadata for transaction and receipt lookups + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { return i, err } // Write map map bloom filters @@ -1167,16 +1157,12 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for _, block := range newChain { // insert the block in the canonical way, re-writing history bc.insert(block) - // write canonical receipts and transactions - if err := WriteTransactions(bc.chainDb, block); err != nil { - return err - } - receipts := GetBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64()) - // write receipts - if err := WriteReceipts(bc.chainDb, receipts); err != nil { + // write lookup entries for hash based transaction/receipt searches + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { return err } // Write map map bloom filters + receipts := GetBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64()) if err := WriteMipmapBloom(bc.chainDb, block.NumberU64(), receipts); err != nil { return err } @@ -1188,8 +1174,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // When transactions get deleted from the database that means the // receipts that were created in the fork must also be deleted for _, tx := range diff { - DeleteReceipt(bc.chainDb, tx.Hash()) - DeleteTransaction(bc.chainDb, tx.Hash()) + DeleteTxLookupEntry(bc.chainDb, tx.Hash()) } // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 371522ab7..5fa671e2b 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -806,8 +806,8 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil { t.Errorf("drop %d: tx %v found while shouldn't have been", i, txn) } - if GetReceipt(db, tx.Hash()) != nil { - t.Errorf("drop %d: receipt found while shouldn't have been", i) + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt != nil { + t.Errorf("drop %d: receipt %v found while shouldn't have been", i, rcpt) } } // added tx @@ -815,7 +815,7 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil { t.Errorf("add %d: expected tx to be found", i) } - if GetReceipt(db, tx.Hash()) == nil { + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt == nil { t.Errorf("add %d: expected receipt to be found", i) } } @@ -824,7 +824,7 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil { t.Errorf("share %d: expected tx to be found", i) } - if GetReceipt(db, tx.Hash()) == nil { + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt == nil { t.Errorf("share %d: expected receipt to be found", i) } } diff --git a/core/database_util.go b/core/database_util.go index b4a230c9c..697111394 100644 --- a/core/database_util.go +++ b/core/database_util.go @@ -45,24 +45,17 @@ var ( blockHashPrefix = []byte("H") // blockHashPrefix + hash -> num (uint64 big endian) bodyPrefix = []byte("b") // bodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts + lookupPrefix = []byte("l") // lookupPrefix + hash -> transaction/receipt lookup metadata preimagePrefix = "secure-key-" // preimagePrefix + hash -> preimage - txMetaSuffix = []byte{0x01} - receiptsPrefix = []byte("receipts-") - mipmapPre = []byte("mipmap-log-bloom-") MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000} configPrefix = []byte("ethereum-config-") // config prefix for the db - // used by old (non-sequential keys) db, now only used for conversion - oldBlockPrefix = []byte("block-") - oldHeaderSuffix = []byte("-header") - oldTdSuffix = []byte("-td") // headerPrefix + num (uint64 big endian) + hash + tdSuffix -> td - oldBodySuffix = []byte("-body") - oldBlockNumPrefix = []byte("block-num-") - oldBlockReceiptsPrefix = []byte("receipts-block-") - oldBlockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually] + // used by old db, now only used for conversion + oldReceiptsPrefix = []byte("receipts-") + oldTxMetaSuffix = []byte{0x01} ErrChainConfigNotFound = errors.New("ChainConfig not found") // general config not found error @@ -72,6 +65,14 @@ var ( preimageHitCounter = metrics.NewCounter("db/preimage/hits") ) +// txLookupEntry is a positional metadata to help looking up the data content of +// a transaction or receipt given only its hash. +type txLookupEntry struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 +} + // encodeBlockNumber encodes a block number as big endian uint64 func encodeBlockNumber(number uint64) []byte { enc := make([]byte, 8) @@ -83,10 +84,7 @@ func encodeBlockNumber(number uint64) []byte { func GetCanonicalHash(db ethdb.Database, number uint64) common.Hash { data, _ := db.Get(append(append(headerPrefix, encodeBlockNumber(number)...), numSuffix...)) if len(data) == 0 { - data, _ = db.Get(append(oldBlockNumPrefix, big.NewInt(int64(number)).Bytes()...)) - if len(data) == 0 { - return common.Hash{} - } + return common.Hash{} } return common.BytesToHash(data) } @@ -100,15 +98,7 @@ const missingNumber = uint64(0xffffffffffffffff) func GetBlockNumber(db ethdb.Database, hash common.Hash) uint64 { data, _ := db.Get(append(blockHashPrefix, hash.Bytes()...)) if len(data) != 8 { - data, _ := db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldHeaderSuffix...)) - if len(data) == 0 { - return missingNumber - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(data), header); err != nil { - log.Crit("Failed to decode block header", "err", err) - } - return header.Number.Uint64() + return missingNumber } return binary.BigEndian.Uint64(data) } @@ -151,9 +141,6 @@ func GetHeadFastBlockHash(db ethdb.Database) common.Hash { // if the header's not found. func GetHeaderRLP(db ethdb.Database, hash common.Hash, number uint64) rlp.RawValue { data, _ := db.Get(append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) - if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldHeaderSuffix...)) - } return data } @@ -175,9 +162,6 @@ func GetHeader(db ethdb.Database, hash common.Hash, number uint64) *types.Header // GetBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func GetBodyRLP(db ethdb.Database, hash common.Hash, number uint64) rlp.RawValue { data, _ := db.Get(append(append(bodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) - if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldBodySuffix...)) - } return data } @@ -201,10 +185,7 @@ func GetBody(db ethdb.Database, hash common.Hash, number uint64) *types.Body { func GetTd(db ethdb.Database, hash common.Hash, number uint64) *big.Int { data, _ := db.Get(append(append(append(headerPrefix, encodeBlockNumber(number)...), hash[:]...), tdSuffix...)) if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldTdSuffix...)) - if len(data) == 0 { - return nil - } + return nil } td := new(big.Int) if err := rlp.Decode(bytes.NewReader(data), td); err != nil { @@ -239,10 +220,7 @@ func GetBlock(db ethdb.Database, hash common.Hash, number uint64) *types.Block { func GetBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) types.Receipts { data, _ := db.Get(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash[:]...)) if len(data) == 0 { - data, _ = db.Get(append(oldBlockReceiptsPrefix, hash.Bytes()...)) - if len(data) == 0 { - return nil - } + return nil } storageReceipts := []*types.ReceiptForStorage{} if err := rlp.DecodeBytes(data, &storageReceipts); err != nil { @@ -256,10 +234,38 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) types. return receipts } +// GetTxLookupEntry retrieves the positional metadata associated with a transaction +// hash to allow retrieving the transaction or receipt by hash. +func GetTxLookupEntry(db ethdb.Database, hash common.Hash) (common.Hash, uint64, uint64) { + // Load the positional metadata from disk and bail if it fails + data, _ := db.Get(append(lookupPrefix, hash.Bytes()...)) + if len(data) == 0 { + return common.Hash{}, 0, 0 + } + // Parse and return the contents of the lookup entry + var entry txLookupEntry + if err := rlp.DecodeBytes(data, &entry); err != nil { + log.Error("Invalid lookup entry RLP", "hash", hash, "err", err) + return common.Hash{}, 0, 0 + } + return entry.BlockHash, entry.BlockIndex, entry.Index +} + // GetTransaction retrieves a specific transaction from the database, along with // its added positional metadata. func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { - // Retrieve the transaction itself from the database + // Retrieve the lookup metadata and resolve the transaction from the body + blockHash, blockNumber, txIndex := GetTxLookupEntry(db, hash) + + if blockHash != (common.Hash{}) { + body := GetBody(db, blockHash, blockNumber) + if body == nil || len(body.Transactions) <= int(txIndex) { + log.Error("Transaction referenced missing", "number", blockNumber, "hash", blockHash, "index", txIndex) + return nil, common.Hash{}, 0, 0 + } + return body.Transactions[txIndex], blockHash, blockNumber, txIndex + } + // Old transaction representation, load the transaction and it's metadata separately data, _ := db.Get(hash.Bytes()) if len(data) == 0 { return nil, common.Hash{}, 0, 0 @@ -269,33 +275,42 @@ func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, co return nil, common.Hash{}, 0, 0 } // Retrieve the blockchain positional metadata - data, _ = db.Get(append(hash.Bytes(), txMetaSuffix...)) + data, _ = db.Get(append(hash.Bytes(), oldTxMetaSuffix...)) if len(data) == 0 { return nil, common.Hash{}, 0, 0 } - var meta struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - } - if err := rlp.DecodeBytes(data, &meta); err != nil { + var entry txLookupEntry + if err := rlp.DecodeBytes(data, &entry); err != nil { return nil, common.Hash{}, 0, 0 } - return &tx, meta.BlockHash, meta.BlockIndex, meta.Index + return &tx, entry.BlockHash, entry.BlockIndex, entry.Index } -// GetReceipt returns a receipt by hash -func GetReceipt(db ethdb.Database, hash common.Hash) *types.Receipt { - data, _ := db.Get(append(receiptsPrefix, hash[:]...)) +// GetReceipt retrieves a specific transaction receipt from the database, along with +// its added positional metadata. +func GetReceipt(db ethdb.Database, hash common.Hash) (*types.Receipt, common.Hash, uint64, uint64) { + // Retrieve the lookup metadata and resolve the receipt from the receipts + blockHash, blockNumber, receiptIndex := GetTxLookupEntry(db, hash) + + if blockHash != (common.Hash{}) { + receipts := GetBlockReceipts(db, blockHash, blockNumber) + if len(receipts) <= int(receiptIndex) { + log.Error("Receipt refereced missing", "number", blockNumber, "hash", blockHash, "index", receiptIndex) + return nil, common.Hash{}, 0, 0 + } + return receipts[receiptIndex], blockHash, blockNumber, receiptIndex + } + // Old receipt representation, load the receipt and set an unknown metadata + data, _ := db.Get(append(oldReceiptsPrefix, hash[:]...)) if len(data) == 0 { - return nil + return nil, common.Hash{}, 0, 0 } var receipt types.ReceiptForStorage err := rlp.DecodeBytes(data, &receipt) if err != nil { log.Error("Invalid receipt RLP", "hash", hash, "err", err) } - return (*types.Receipt)(&receipt) + return (*types.Receipt)(&receipt), common.Hash{}, 0, 0 } // WriteCanonicalHash stores the canonical hash for the given block number. @@ -416,76 +431,29 @@ func WriteBlockReceipts(db ethdb.Database, hash common.Hash, number uint64, rece return nil } -// WriteTransactions stores the transactions associated with a specific block -// into the given database. Beside writing the transaction, the function also -// stores a metadata entry along with the transaction, detailing the position -// of this within the blockchain. -func WriteTransactions(db ethdb.Database, block *types.Block) error { +// WriteTxLookupEntries stores a positional metadata for every transaction from +// a block, enabling hash based transaction and receipt lookups. +func WriteTxLookupEntries(db ethdb.Database, block *types.Block) error { batch := db.NewBatch() - // Iterate over each transaction and encode it with its metadata + // Iterate over each transaction and encode its metadata for i, tx := range block.Transactions() { - // Encode and queue up the transaction for storage - data, err := rlp.EncodeToBytes(tx) - if err != nil { - return err - } - if err = batch.Put(tx.Hash().Bytes(), data); err != nil { - return err - } - // Encode and queue up the transaction metadata for storage - meta := struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - }{ + entry := txLookupEntry{ BlockHash: block.Hash(), BlockIndex: block.NumberU64(), Index: uint64(i), } - data, err = rlp.EncodeToBytes(meta) - if err != nil { - return err - } - if err := batch.Put(append(tx.Hash().Bytes(), txMetaSuffix...), data); err != nil { - return err - } - } - // Write the scheduled data into the database - if err := batch.Write(); err != nil { - log.Crit("Failed to store transactions", "err", err) - } - return nil -} - -// WriteReceipt stores a single transaction receipt into the database. -func WriteReceipt(db ethdb.Database, receipt *types.Receipt) error { - storageReceipt := (*types.ReceiptForStorage)(receipt) - data, err := rlp.EncodeToBytes(storageReceipt) - if err != nil { - return err - } - return db.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data) -} - -// WriteReceipts stores a batch of transaction receipts into the database. -func WriteReceipts(db ethdb.Database, receipts types.Receipts) error { - batch := db.NewBatch() - - // Iterate over all the receipts and queue them for database injection - for _, receipt := range receipts { - storageReceipt := (*types.ReceiptForStorage)(receipt) - data, err := rlp.EncodeToBytes(storageReceipt) + data, err := rlp.EncodeToBytes(entry) if err != nil { return err } - if err := batch.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data); err != nil { + if err := batch.Put(append(lookupPrefix, tx.Hash().Bytes()...), data); err != nil { return err } } // Write the scheduled data into the database if err := batch.Write(); err != nil { - log.Crit("Failed to store receipts", "err", err) + log.Crit("Failed to store lookup entries", "err", err) } return nil } @@ -524,15 +492,9 @@ func DeleteBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) { db.Delete(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) } -// DeleteTransaction removes all transaction data associated with a hash. -func DeleteTransaction(db ethdb.Database, hash common.Hash) { - db.Delete(hash.Bytes()) - db.Delete(append(hash.Bytes(), txMetaSuffix...)) -} - -// DeleteReceipt removes all receipt data associated with a transaction hash. -func DeleteReceipt(db ethdb.Database, hash common.Hash) { - db.Delete(append(receiptsPrefix, hash.Bytes()...)) +// DeleteTxLookupEntry removes all transaction data associated with a hash. +func DeleteTxLookupEntry(db ethdb.Database, hash common.Hash) { + db.Delete(append(lookupPrefix, hash.Bytes()...)) } // returns a formatted MIP mapped key by adding prefix, canonical number and level diff --git a/core/database_util_test.go b/core/database_util_test.go index 9f16b660a..e9a6df97b 100644 --- a/core/database_util_test.go +++ b/core/database_util_test.go @@ -290,8 +290,8 @@ func TestHeadStorage(t *testing.T) { } } -// Tests that transactions and associated metadata can be stored and retrieved. -func TestTransactionStorage(t *testing.T) { +// Tests that positional lookup metadata can be stored and retrieved. +func TestLookupStorage(t *testing.T) { db, _ := ethdb.NewMemDatabase() tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), big.NewInt(111), big.NewInt(1111), big.NewInt(11111), []byte{0x11, 0x11, 0x11}) @@ -308,7 +308,10 @@ func TestTransactionStorage(t *testing.T) { } } // Insert all the transactions into the database, and verify contents - if err := WriteTransactions(db, block); err != nil { + if err := WriteBlock(db, block); err != nil { + t.Fatalf("failed to write block contents: %v", err) + } + if err := WriteTxLookupEntries(db, block); err != nil { t.Fatalf("failed to write transactions: %v", err) } for i, tx := range txs { @@ -325,72 +328,13 @@ func TestTransactionStorage(t *testing.T) { } // Delete the transactions and check purge for i, tx := range txs { - DeleteTransaction(db, tx.Hash()) + DeleteTxLookupEntry(db, tx.Hash()) if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil { t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn) } } } -// Tests that receipts can be stored and retrieved. -func TestReceiptStorage(t *testing.T) { - db, _ := ethdb.NewMemDatabase() - - receipt1 := &types.Receipt{ - PostState: []byte{0x01}, - CumulativeGasUsed: big.NewInt(1), - Logs: []*types.Log{ - {Address: common.BytesToAddress([]byte{0x11})}, - {Address: common.BytesToAddress([]byte{0x01, 0x11})}, - }, - TxHash: common.BytesToHash([]byte{0x11, 0x11}), - ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}), - GasUsed: big.NewInt(111111), - } - receipt2 := &types.Receipt{ - PostState: []byte{0x02}, - CumulativeGasUsed: big.NewInt(2), - Logs: []*types.Log{ - {Address: common.BytesToAddress([]byte{0x22})}, - {Address: common.BytesToAddress([]byte{0x02, 0x22})}, - }, - TxHash: common.BytesToHash([]byte{0x22, 0x22}), - ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}), - GasUsed: big.NewInt(222222), - } - receipts := []*types.Receipt{receipt1, receipt2} - - // Check that no receipt entries are in a pristine database - for i, receipt := range receipts { - if r := GetReceipt(db, receipt.TxHash); r != nil { - t.Fatalf("receipt #%d [%x]: non existent receipt returned: %v", i, receipt.TxHash, r) - } - } - // Insert all the receipts into the database, and verify contents - if err := WriteReceipts(db, receipts); err != nil { - t.Fatalf("failed to write receipts: %v", err) - } - for i, receipt := range receipts { - if r := GetReceipt(db, receipt.TxHash); r == nil { - t.Fatalf("receipt #%d [%x]: receipt not found", i, receipt.TxHash) - } else { - rlpHave, _ := rlp.EncodeToBytes(r) - rlpWant, _ := rlp.EncodeToBytes(receipt) - - if !bytes.Equal(rlpHave, rlpWant) { - t.Fatalf("receipt #%d [%x]: receipt mismatch: have %v, want %v", i, receipt.TxHash, r, receipt) - } - } - } - // Delete the receipts and check purge - for i, receipt := range receipts { - DeleteReceipt(db, receipt.TxHash) - if r := GetReceipt(db, receipt.TxHash); r != nil { - t.Fatalf("receipt #%d [%x]: deleted receipt returned: %v", i, receipt.TxHash, r) - } - } -} - // Tests that receipts associated with a single block can be stored and retrieved. func TestBlockReceiptStorage(t *testing.T) { db, _ := ethdb.NewMemDatabase() @@ -530,10 +474,6 @@ func TestMipmapChain(t *testing.T) { } // store the receipts - err := WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) - } WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { -- cgit v1.2.3