aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-07-15 00:39:53 +0800
committerGitHub <noreply@github.com>2017-07-15 00:39:53 +0800
commit0ff35e170d1b913082313089d13e3e6d5490839b (patch)
tree42b8eafa61c6e5894768c41a97d51e4e5427b50f /eth
parent8d6a5a3581ce6221786eb464bfef7e8c31e7ad95 (diff)
downloaddexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.gz
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.bz2
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.lz
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.xz
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.zst
dexon-0ff35e170d1b913082313089d13e3e6d5490839b.zip
core: remove redundant storage of transactions and receipts (#14801)
* core: remove redundant storage of transactions and receipts * core, eth, internal: new transaction schema usage polishes * eth: implement upgrade mechanism for db deduplication * core, eth: drop old sequential key db upgrader * eth: close last iterator on successful db upgrade * core: prefix the lookup entries to make their purpose clearer
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go6
-rw-r--r--eth/backend_test.go9
-rw-r--r--eth/db_upgrade.go279
-rw-r--r--eth/filters/filter_test.go12
4 files changed, 84 insertions, 222 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 78478e86e..c7df517c0 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -59,8 +59,8 @@ type LesServer interface {
type Ethereum struct {
chainConfig *params.ChainConfig
// Channel for shutting down the service
- shutdownChan chan bool // Channel for shutting down the ethereum
- stopDbUpgrade func() // stop chain db sequential key upgrade
+ shutdownChan chan bool // Channel for shutting down the ethereum
+ stopDbUpgrade func() error // stop chain db sequential key upgrade
// Handlers
txPool *core.TxPool
blockchain *core.BlockChain
@@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
- stopDbUpgrade := upgradeSequentialKeys(chainDb)
+ stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
diff --git a/eth/backend_test.go b/eth/backend_test.go
index f60e3214c..4351b24cf 100644
--- a/eth/backend_test.go
+++ b/eth/backend_test.go
@@ -33,24 +33,15 @@ func TestMipmapUpgrade(t *testing.T) {
genesis := new(core.Genesis).MustCommit(db)
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr}}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 2:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr}}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
- }
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- t.Fatal(err)
}
})
for i, block := range chain {
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index 82cdd7e55..90111b2b3 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -19,237 +19,120 @@ package eth
import (
"bytes"
- "encoding/binary"
"fmt"
- "math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
-var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys")
+var deduplicateData = []byte("dbUpgrade_20170714deduplicateData")
-// upgradeSequentialKeys checks the chain database version and
+// upgradeDeduplicateData checks the chain database version and
// starts a background process to make upgrades if necessary.
// Returns a stop function that blocks until the process has
// been safely stopped.
-func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) {
- data, _ := db.Get(useSequentialKeys)
+func upgradeDeduplicateData(db ethdb.Database) func() error {
+ // If the database is already converted or empty, bail out
+ data, _ := db.Get(deduplicateData)
if len(data) > 0 && data[0] == 42 {
- return nil // already converted
+ return nil
}
-
if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 {
- db.Put(useSequentialKeys, []byte{42})
- return nil // empty database, nothing to do
+ db.Put(deduplicateData, []byte{42})
+ return nil
}
-
- log.Warn("Upgrading chain database to use sequential keys")
-
- stopChn := make(chan struct{})
- stoppedChn := make(chan struct{})
+ // Start the deduplication upgrade on a new goroutine
+ log.Warn("Upgrading database to use lookup entries")
+ stop := make(chan chan error)
go func() {
- stopFn := func() bool {
- select {
- case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved
- case <-stopChn:
- return true
- }
- return false
- }
-
- err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn)
- if err == nil && !stopped {
- err, stopped = upgradeSequentialBlocks(db, stopFn)
- }
- if err == nil && !stopped {
- err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn)
- }
- if err == nil && !stopped {
- log.Info("Database conversion successful")
- db.Put(useSequentialKeys, []byte{42})
- }
- if err != nil {
- log.Error("Database conversion failed", "err", err)
- }
- close(stoppedChn)
- }()
-
- return func() {
- close(stopChn)
- <-stoppedChn
- }
-}
-
-// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from
-// the database, writes them in new format and deletes the old ones if successful.
-func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("block-num-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer func() {
- it.Release()
- }()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- keyPtr := it.Key()
- if len(keyPtr) < 20 {
- cnt++
- if cnt%100000 == 0 {
+ // Create an iterator to read the entire database and convert old lookup entries
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ defer func() {
+ if it != nil {
it.Release()
- it = db.(*ethdb.LDBDatabase).NewIterator()
- it.Seek(keyPtr)
- log.Info("Converting canonical numbers", "count", cnt)
}
- number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64()
- newKey := []byte("h12345678n")
- binary.BigEndian.PutUint64(newKey[1:9], number)
- if err := db.Put(newKey, it.Value()); err != nil {
- return err, false
+ }()
+
+ var (
+ converted uint64
+ failed error
+ )
+ for failed == nil && it.Next() {
+ // Skip any entries that don't look like old transaction meta entries (<hash>0x01)
+ key := it.Key()
+ if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 {
+ continue
}
- if err := db.Delete(keyPtr); err != nil {
- return err, false
+ // Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>)
+ var meta struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
}
- }
-
- if stopFn() {
- return nil, true
- }
- it.Next()
- }
- if cnt > 0 {
- log.Info("converted canonical numbers", "count", cnt)
- }
- return nil, false
-}
-
-// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block
-// receipts from the database, writes them in new format and deletes the old ones
-// if successful.
-func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("block-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer func() {
- it.Release()
- }()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- keyPtr := it.Key()
- if len(keyPtr) >= 38 {
- cnt++
- if cnt%10000 == 0 {
- it.Release()
- it = db.(*ethdb.LDBDatabase).NewIterator()
- it.Seek(keyPtr)
- log.Info("Converting blocks", "count", cnt)
+ if err := rlp.DecodeBytes(it.Value(), &meta); err != nil {
+ continue
}
- // convert header, body, td and block receipts
- var keyPrefix [38]byte
- copy(keyPrefix[:], keyPtr[0:38])
- hash := keyPrefix[6:38]
- if err := upgradeSequentialBlockData(db, hash); err != nil {
- return err, false
- }
- // delete old db entries belonging to this hash
- for bytes.HasPrefix(it.Key(), keyPrefix[:]) {
- if err := db.Delete(it.Key()); err != nil {
- return err, false
+ // Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix))
+ hash := key[:common.HashLength]
+
+ if hash[0] == byte('l') {
+ // Potential clash, the "old" `hash` must point to a live transaction.
+ if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) {
+ continue
}
- it.Next()
}
- if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil {
- return err, false
+ // Convert the old metadata to a new lookup entry, delete duplicate data
+ if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new lookup entry
+ if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data
+ if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data
+ if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata
+ break
+ }
+ }
+ }
}
- } else {
- it.Next()
- }
-
- if stopFn() {
- return nil, true
- }
- }
- if cnt > 0 {
- log.Info("Converted blocks", "count", cnt)
- }
- return nil, false
-}
+ // Bump the conversion counter, and recreate the iterator occasionally to
+ // avoid too high memory consumption.
+ converted++
+ if converted%100000 == 0 {
+ it.Release()
+ it = db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(key)
-// upgradeSequentialOrphanedReceipts removes any old format block receipts from the
-// database that did not have a corresponding block
-func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("receipts-block-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer it.Release()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- // phase 2 already converted receipts belonging to existing
- // blocks, just remove if there's anything left
- cnt++
- if err := db.Delete(it.Key()); err != nil {
- return err, false
+ log.Info("Deduplicating database entries", "deduped", converted)
+ }
+ // Check for termination, or continue after a bit of a timeout
+ select {
+ case errc := <-stop:
+ errc <- nil
+ return
+ case <-time.After(time.Microsecond * 100):
+ }
}
-
- if stopFn() {
- return nil, true
+ // Upgrade finished, mark as such and terminate
+ if failed == nil {
+ log.Info("Database deduplication successful", "deduped", converted)
+ db.Put(deduplicateData, []byte{42})
+ } else {
+ log.Error("Database deduplication failed", "deduped", converted, "err", failed)
}
- it.Next()
- }
- if cnt > 0 {
- log.Info("Removed orphaned block receipts", "count", cnt)
- }
- return nil, false
-}
+ it.Release()
+ it = nil
-// upgradeSequentialBlockData upgrades the header, body, td and block receipts
-// database entries belonging to a single hash (doesn't delete old data).
-func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error {
- // get old chain data and block number
- headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...))
- if len(headerRLP) == 0 {
- return nil
- }
- header := new(types.Header)
- if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil {
- return err
- }
- number := header.Number.Uint64()
- bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...))
- tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...))
- receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...))
- // store new hash -> number association
- encNum := make([]byte, 8)
- binary.BigEndian.PutUint64(encNum, number)
- if err := db.Put(append([]byte("H"), hash...), encNum); err != nil {
- return err
- }
- // store new chain data
- if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil {
- return err
- }
- if len(tdRLP) != 0 {
- if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil {
- return err
- }
- }
- if len(bodyRLP) != 0 {
- if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil {
- return err
- }
- }
- if len(receiptsRLP) != 0 {
- if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil {
- return err
- }
+ errc := <-stop
+ errc <- failed
+ }()
+ // Assemble the cancellation callback
+ return func() error {
+ errc := make(chan error)
+ stop <- errc
+ return <-errc
}
- return nil
}
func addMipmapBloomBins(db ethdb.Database) (err error) {
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index cd5e7cafd..b6cfd4bbc 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -82,12 +82,6 @@ func BenchmarkMipmaps(b *testing.B) {
gen.AddUncheckedReceipt(receipt)
}
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- b.Fatal(err)
- }
core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
@@ -183,12 +177,6 @@ func TestFilters(t *testing.T) {
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
}
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- t.Fatal(err)
- }
// i is used as block number for the writes but since the i
// starts at 0 and block 0 (genesis) is already present increment
// by one