aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-09-01 01:21:02 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-09-11 22:42:25 +0800
commit2b339cbbd8bb475d2195d54a71dcced700003430 (patch)
tree7520c67d5bb5c66fe2430394a72f83d052bbe2b7 /eth
parent4e075e401354b4ee068cf78b1f283763fe927245 (diff)
downloaddexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar.gz
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar.bz2
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar.lz
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar.xz
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.tar.zst
dexon-2b339cbbd8bb475d2195d54a71dcced700003430.zip
core, eth: split the db blocks into headers and bodies
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go108
-rw-r--r--eth/handler.go43
-rw-r--r--eth/peer.go6
-rw-r--r--eth/protocol.go16
4 files changed, 89 insertions, 84 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 639aaaaec..59f2ab01a 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -18,6 +18,7 @@
package eth
import (
+ "bytes"
"crypto/ecdsa"
"encoding/json"
"fmt"
@@ -269,11 +270,7 @@ func New(config *Config) (*Ethereum, error) {
newdb = func(path string) (common.Database, error) { return ethdb.NewLDBDatabase(path, config.DatabaseCache) }
}
- // attempt to merge database together, upgrading from an old version
- if err := mergeDatabases(config.DataDir, newdb); err != nil {
- return nil, err
- }
-
+ // Open the chain database and perform any upgrades needed
chainDb, err := newdb(filepath.Join(config.DataDir, "chaindata"))
if err != nil {
return nil, fmt.Errorf("blockchain db err: %v", err)
@@ -281,6 +278,10 @@ func New(config *Config) (*Ethereum, error) {
if db, ok := chainDb.(*ethdb.LDBDatabase); ok {
db.Meter("eth/db/chaindata/")
}
+ if err := upgradeChainDatabase(chainDb); err != nil {
+ return nil, err
+ }
+
dappDb, err := newdb(filepath.Join(config.DataDir, "dapp"))
if err != nil {
return nil, fmt.Errorf("dapp db err: %v", err)
@@ -721,74 +722,55 @@ func saveBlockchainVersion(db common.Database, bcVersion int) {
}
}
-// mergeDatabases when required merge old database layout to one single database
-func mergeDatabases(datadir string, newdb func(path string) (common.Database, error)) error {
- // Check if already upgraded
- data := filepath.Join(datadir, "chaindata")
- if _, err := os.Stat(data); !os.IsNotExist(err) {
- return nil
- }
- // make sure it's not just a clean path
- chainPath := filepath.Join(datadir, "blockchain")
- if _, err := os.Stat(chainPath); os.IsNotExist(err) {
+// upgradeChainDatabase ensures that the chain database stores block split into
+// separate header and body entries.
+func upgradeChainDatabase(db common.Database) error {
+ // Short circuit if the head block is stored already as separate header and body
+ data, err := db.Get([]byte("LastBlock"))
+ if err != nil {
return nil
}
- glog.Infoln("Database upgrade required. Upgrading...")
+ head := common.BytesToHash(data)
- database, err := newdb(data)
- if err != nil {
- return fmt.Errorf("creating data db err: %v", err)
+ if block := core.GetBlockByHashOld(db, head); block == nil {
+ return nil
}
- defer database.Close()
+ // At least some of the database is still the old format, upgrade (skip the head block!)
+ glog.V(logger.Info).Info("Old database detected, upgrading...")
- // Migrate blocks
- chainDb, err := newdb(chainPath)
- if err != nil {
- return fmt.Errorf("state db err: %v", err)
- }
- defer chainDb.Close()
+ if db, ok := db.(*ethdb.LDBDatabase); ok {
+ blockPrefix := []byte("block-hash-")
+ for it := db.NewIterator(); it.Next(); {
+ // Skip anything other than a combined block
+ if !bytes.HasPrefix(it.Key(), blockPrefix) {
+ continue
+ }
+ // Skip the head block (merge last to signal upgrade completion)
+ if bytes.HasSuffix(it.Key(), head.Bytes()) {
+ continue
+ }
+ // Load the block, split and serialize (order!)
+ block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix)))
- if chain, ok := chainDb.(*ethdb.LDBDatabase); ok {
- glog.Infoln("Merging blockchain database...")
- it := chain.NewIterator()
- for it.Next() {
- database.Put(it.Key(), it.Value())
+ if err := core.WriteBody(db, block); err != nil {
+ return err
+ }
+ if err := core.WriteHeader(db, block.Header()); err != nil {
+ return err
+ }
+ if err := db.Delete(it.Key()); err != nil {
+ return err
+ }
}
- it.Release()
- }
-
- // Migrate state
- stateDb, err := newdb(filepath.Join(datadir, "state"))
- if err != nil {
- return fmt.Errorf("state db err: %v", err)
- }
- defer stateDb.Close()
+ // Lastly, upgrade the head block, disabling the upgrade mechanism
+ current := core.GetBlockByHashOld(db, head)
- if state, ok := stateDb.(*ethdb.LDBDatabase); ok {
- glog.Infoln("Merging state database...")
- it := state.NewIterator()
- for it.Next() {
- database.Put(it.Key(), it.Value())
+ if err := core.WriteBody(db, current); err != nil {
+ return err
}
- it.Release()
- }
-
- // Migrate transaction / receipts
- extraDb, err := newdb(filepath.Join(datadir, "extra"))
- if err != nil {
- return fmt.Errorf("state db err: %v", err)
- }
- defer extraDb.Close()
-
- if extra, ok := extraDb.(*ethdb.LDBDatabase); ok {
- glog.Infoln("Merging transaction database...")
-
- it := extra.NewIterator()
- for it.Next() {
- database.Put(it.Key(), it.Value())
+ if err := core.WriteHeader(db, current.Header()); err != nil {
+ return err
}
- it.Release()
}
-
return nil
}
diff --git a/eth/handler.go b/eth/handler.go
index f22afecb7..95f4e8ce2 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -345,33 +345,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
- // Gather blocks until the fetch or network limits is reached
+ // Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
- // Retrieve the next block satisfying the query
- var origin *types.Block
+ // Retrieve the next header satisfying the query
+ var origin *types.Header
if query.Origin.Hash != (common.Hash{}) {
- origin = pm.chainman.GetBlock(query.Origin.Hash)
+ origin = pm.chainman.GetHeader(query.Origin.Hash)
} else {
- origin = pm.chainman.GetBlockByNumber(query.Origin.Number)
+ origin = pm.chainman.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
- headers = append(headers, origin.Header())
- bytes += origin.Size()
+ headers = append(headers, origin)
+ bytes += 500 // Approximate, should be good enough estimate
- // Advance to the next block of the query
+ // Advance to the next header of the query
switch {
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
// Hash based traversal towards the genesis block
for i := 0; i < int(query.Skip)+1; i++ {
- if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil {
- query.Origin.Hash = block.ParentHash()
+ if header := pm.chainman.GetHeader(query.Origin.Hash); header != nil {
+ query.Origin.Hash = header.ParentHash
} else {
unknown = true
break
@@ -379,9 +379,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
// Hash based traversal towards the leaf block
- if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil {
- if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
- query.Origin.Hash = block.Hash()
+ if header := pm.chainman.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
+ if pm.chainman.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
+ query.Origin.Hash = header.Hash()
} else {
unknown = true
}
@@ -452,23 +452,24 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
- bytes common.StorageSize
- bodies []*blockBody
+ bytes int
+ bodies []*blockBodyRLP
)
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
- //Retrieve the hash of the next block
+ // Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- // Retrieve the requested block, stopping if enough was found
- if block := pm.chainman.GetBlock(hash); block != nil {
- bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
- bytes += block.Size()
+ // Retrieve the requested block body, stopping if enough was found
+ if data := pm.chainman.GetBodyRLP(hash); len(data) != 0 {
+ body := blockBodyRLP(data)
+ bodies = append(bodies, &body)
+ bytes += len(body)
}
}
- return p.SendBlockBodies(bodies)
+ return p.SendBlockBodiesRLP(bodies)
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
// Decode the retrieval message
diff --git a/eth/peer.go b/eth/peer.go
index 8d7c48885..f1ddd9726 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -184,6 +184,12 @@ func (p *peer) SendBlockBodies(bodies []*blockBody) error {
return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
}
+// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
+// an already RLP encoded format.
+func (p *peer) SendBlockBodiesRLP(bodies []*blockBodyRLP) error {
+ return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesRLPData(bodies))
+}
+
// SendNodeData sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *peer) SendNodeData(data [][]byte) error {
diff --git a/eth/protocol.go b/eth/protocol.go
index 49f096a3b..24007bbb5 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -213,6 +213,22 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody
+// blockBodyRLP represents the RLP encoded data content of a single block.
+type blockBodyRLP []byte
+
+// EncodeRLP is a specialized encoder for a block body to pass the already
+// encoded body RLPs from the database on, without double encoding.
+func (b *blockBodyRLP) EncodeRLP(w io.Writer) error {
+ if _, err := w.Write([]byte(*b)); err != nil {
+ return err
+ }
+ return nil
+}
+
+// blockBodiesRLPData is the network packet for block content distribution
+// based on original RLP formatting (i.e. skip the db-decode/proto-encode).
+type blockBodiesRLPData []*blockBodyRLP
+
// nodeDataData is the network response packet for a node data retrieval.
type nodeDataData []struct {
Value []byte