diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 129 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 11 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 44 | ||||
-rw-r--r-- | eth/handler.go | 52 | ||||
-rw-r--r-- | eth/peer.go | 7 |
5 files changed, 135 insertions, 108 deletions
diff --git a/eth/backend.go b/eth/backend.go index ad2a2c1f9..deb6d3d0f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "bytes" "crypto/ecdsa" "encoding/json" "fmt" @@ -73,6 +74,8 @@ var ( ) type Config struct { + DevMode bool + Name string NetworkId int GenesisNonce int @@ -267,11 +270,7 @@ func New(config *Config) (*Ethereum, error) { newdb = func(path string) (common.Database, error) { return ethdb.NewLDBDatabase(path, config.DatabaseCache) } } - // attempt to merge database together, upgrading from an old version - if err := mergeDatabases(config.DataDir, newdb); err != nil { - return nil, err - } - + // Open the chain database and perform any upgrades needed chainDb, err := newdb(filepath.Join(config.DataDir, "chaindata")) if err != nil { return nil, fmt.Errorf("blockchain db err: %v", err) @@ -279,6 +278,10 @@ func New(config *Config) (*Ethereum, error) { if db, ok := chainDb.(*ethdb.LDBDatabase); ok { db.Meter("eth/db/chaindata/") } + if err := upgradeChainDatabase(chainDb); err != nil { + return nil, err + } + dappDb, err := newdb(filepath.Join(config.DataDir, "dapp")) if err != nil { return nil, fmt.Errorf("dapp db err: %v", err) @@ -303,18 +306,23 @@ func New(config *Config) (*Ethereum, error) { glog.V(logger.Info).Infof("Successfully wrote genesis block. New genesis hash = %x\n", block.Hash()) } - if config.Olympic { + // different modes + switch { + case config.Olympic: + glog.V(logger.Error).Infoln("Starting Olympic network") + fallthrough + case config.DevMode: _, err := core.WriteTestNetGenesisBlock(chainDb, 42) if err != nil { return nil, err } - glog.V(logger.Error).Infoln("Starting Olympic network") } - // This is for testing only. if config.GenesisBlock != nil { + core.WriteTd(chainDb, config.GenesisBlock.Hash(), config.GenesisBlock.Difficulty()) core.WriteBlock(chainDb, config.GenesisBlock) - core.WriteHead(chainDb, config.GenesisBlock) + core.WriteCanonicalHash(chainDb, config.GenesisBlock.Hash(), config.GenesisBlock.NumberU64()) + core.WriteHeadBlockHash(chainDb, config.GenesisBlock.Hash()) } if !config.SkipBcVersionCheck { @@ -718,74 +726,61 @@ func saveBlockchainVersion(db common.Database, bcVersion int) { } } -// mergeDatabases when required merge old database layout to one single database -func mergeDatabases(datadir string, newdb func(path string) (common.Database, error)) error { - // Check if already upgraded - data := filepath.Join(datadir, "chaindata") - if _, err := os.Stat(data); !os.IsNotExist(err) { - return nil - } - // make sure it's not just a clean path - chainPath := filepath.Join(datadir, "blockchain") - if _, err := os.Stat(chainPath); os.IsNotExist(err) { +// upgradeChainDatabase ensures that the chain database stores block split into +// separate header and body entries. +func upgradeChainDatabase(db common.Database) error { + // Short circuit if the head block is stored already as separate header and body + data, err := db.Get([]byte("LastBlock")) + if err != nil { return nil } - glog.Infoln("Database upgrade required. Upgrading...") + head := common.BytesToHash(data) - database, err := newdb(data) - if err != nil { - return fmt.Errorf("creating data db err: %v", err) + if block := core.GetBlockByHashOld(db, head); block == nil { + return nil } - defer database.Close() + // At least some of the database is still the old format, upgrade (skip the head block!) + glog.V(logger.Info).Info("Old database detected, upgrading...") - // Migrate blocks - chainDb, err := newdb(chainPath) - if err != nil { - return fmt.Errorf("state db err: %v", err) - } - defer chainDb.Close() + if db, ok := db.(*ethdb.LDBDatabase); ok { + blockPrefix := []byte("block-hash-") + for it := db.NewIterator(); it.Next(); { + // Skip anything other than a combined block + if !bytes.HasPrefix(it.Key(), blockPrefix) { + continue + } + // Skip the head block (merge last to signal upgrade completion) + if bytes.HasSuffix(it.Key(), head.Bytes()) { + continue + } + // Load the block, split and serialize (order!) + block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix))) - if chain, ok := chainDb.(*ethdb.LDBDatabase); ok { - glog.Infoln("Merging blockchain database...") - it := chain.NewIterator() - for it.Next() { - database.Put(it.Key(), it.Value()) + if err := core.WriteTd(db, block.Hash(), block.DeprecatedTd()); err != nil { + return err + } + if err := core.WriteBody(db, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil { + return err + } + if err := core.WriteHeader(db, block.Header()); err != nil { + return err + } + if err := db.Delete(it.Key()); err != nil { + return err + } } - it.Release() - } - - // Migrate state - stateDb, err := newdb(filepath.Join(datadir, "state")) - if err != nil { - return fmt.Errorf("state db err: %v", err) - } - defer stateDb.Close() + // Lastly, upgrade the head block, disabling the upgrade mechanism + current := core.GetBlockByHashOld(db, head) - if state, ok := stateDb.(*ethdb.LDBDatabase); ok { - glog.Infoln("Merging state database...") - it := state.NewIterator() - for it.Next() { - database.Put(it.Key(), it.Value()) + if err := core.WriteTd(db, current.Hash(), current.DeprecatedTd()); err != nil { + return err } - it.Release() - } - - // Migrate transaction / receipts - extraDb, err := newdb(filepath.Join(datadir, "extra")) - if err != nil { - return fmt.Errorf("state db err: %v", err) - } - defer extraDb.Close() - - if extra, ok := extraDb.(*ethdb.LDBDatabase); ok { - glog.Infoln("Merging transaction database...") - - it := extra.NewIterator() - for it.Next() { - database.Put(it.Key(), it.Value()) + if err := core.WriteBody(db, current.Hash(), &types.Body{current.Transactions(), current.Uncles()}); err != nil { + return err + } + if err := core.WriteHeader(db, current.Header()); err != nil { + return err } - it.Release() } - return nil } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 73f95bf64..d28985b3e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -87,6 +87,9 @@ type blockRetrievalFn func(common.Hash) *types.Block // headRetrievalFn is a callback type for retrieving the head block from the local chain. type headRetrievalFn func() *types.Block +// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. +type tdRetrievalFn func(common.Hash) *big.Int + // chainInsertFn is a callback type to insert a batch of blocks into the local chain. type chainInsertFn func(types.Blocks) (int, error) @@ -136,6 +139,7 @@ type Downloader struct { hasBlock hashCheckFn // Checks if a block is present in the chain getBlock blockRetrievalFn // Retrieves a block from the chain headBlock headRetrievalFn // Retrieves the head block from the chain + getTd tdRetrievalFn // Retrieves the TD of a block from the chain insertChain chainInsertFn // Injects a batch of blocks into the chain dropPeer peerDropFn // Drops a peer for misbehaving @@ -168,7 +172,7 @@ type Block struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, getTd tdRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { return &Downloader{ mux: mux, queue: newQueue(), @@ -176,6 +180,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he hasBlock: hasBlock, getBlock: getBlock, headBlock: headBlock, + getTd: getTd, insertChain: insertChain, dropPeer: dropPeer, newPeerCh: make(chan *peer, 1), @@ -582,7 +587,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { // L: Sync begins, and finds common ancestor at 11 // L: Request new hashes up from 11 (R's TD was higher, it must have something) // R: Nothing to give - if !gotHashes && td.Cmp(d.headBlock().Td) > 0 { + if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { return errStallingPeer } return nil @@ -958,7 +963,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // L: Sync begins, and finds common ancestor at 11 // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give - if !gotHeaders && td.Cmp(d.headBlock().Td) > 0 { + if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { return errStallingPeer } return nil diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8d009b671..dbcf93607 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -93,21 +93,25 @@ func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 type downloadTester struct { downloader *Downloader - ownHashes []common.Hash // Hash chain belonging to the tester - ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester - peerHashes map[string][]common.Hash // Hash chain belonging to different test peers - peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers + ownHashes []common.Hash // Hash chain belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + peerHashes map[string][]common.Hash // Hash chain belonging to different test peers + peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers + peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains } // newTester creates a new downloader test mocker. func newTester() *downloadTester { tester := &downloadTester{ - ownHashes: []common.Hash{genesis.Hash()}, - ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, - peerHashes: make(map[string][]common.Hash), - peerBlocks: make(map[string]map[common.Hash]*types.Block), + ownHashes: []common.Hash{genesis.Hash()}, + ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, + peerHashes: make(map[string][]common.Hash), + peerBlocks: make(map[string]map[common.Hash]*types.Block), + peerChainTds: make(map[string]map[common.Hash]*big.Int), } - tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.insertChain, tester.dropPeer) + tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.getTd, tester.insertChain, tester.dropPeer) return tester } @@ -119,8 +123,8 @@ func (dl *downloadTester) sync(id string, td *big.Int) error { // If no particular TD was requested, load from the peer's blockchain if td == nil { td = big.NewInt(1) - if block, ok := dl.peerBlocks[id][hash]; ok { - td = block.Td + if diff, ok := dl.peerChainTds[id][hash]; ok { + td = diff } } err := dl.downloader.synchronise(id, hash, td) @@ -152,6 +156,11 @@ func (dl *downloadTester) headBlock() *types.Block { return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) } +// getTd retrieves the block's total difficulty from the canonical chain. +func (dl *downloadTester) getTd(hash common.Hash) *big.Int { + return dl.ownChainTd[hash] +} + // insertChain injects a new batch of blocks into the simulated chain. func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { for i, block := range blocks { @@ -160,6 +169,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { } dl.ownHashes = append(dl.ownHashes, block.Hash()) dl.ownBlocks[block.Hash()] = block + dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()] } return len(blocks), nil } @@ -180,9 +190,16 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) copy(dl.peerHashes[id], hashes) + dl.peerBlocks[id] = make(map[common.Hash]*types.Block) - for hash, block := range blocks { - dl.peerBlocks[id][hash] = block + dl.peerChainTds[id] = make(map[common.Hash]*big.Int) + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + dl.peerBlocks[id][hash] = block + if parent, ok := dl.peerBlocks[id][block.ParentHash()]; ok { + dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][parent.Hash()]) + } + } } } return err @@ -192,6 +209,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha func (dl *downloadTester) dropPeer(id string) { delete(dl.peerHashes, id) delete(dl.peerBlocks, id) + delete(dl.peerChainTds, id) dl.downloader.UnregisterPeer(id) } diff --git a/eth/handler.go b/eth/handler.go index f22afecb7..4aef69043 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,8 +36,10 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -// This is the target maximum size of returned blocks, headers or node data. -const softResponseLimit = 2 * 1024 * 1024 +const ( + softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. + estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header +) func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) @@ -113,7 +115,7 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po } } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.InsertChain, manager.removePeer) + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.GetTd, manager.chainman.InsertChain, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent, true, false) @@ -345,33 +347,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&query); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - // Gather blocks until the fetch or network limits is reached + // Gather headers until the fetch or network limits is reached var ( bytes common.StorageSize headers []*types.Header unknown bool ) for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { - // Retrieve the next block satisfying the query - var origin *types.Block + // Retrieve the next header satisfying the query + var origin *types.Header if query.Origin.Hash != (common.Hash{}) { - origin = pm.chainman.GetBlock(query.Origin.Hash) + origin = pm.chainman.GetHeader(query.Origin.Hash) } else { - origin = pm.chainman.GetBlockByNumber(query.Origin.Number) + origin = pm.chainman.GetHeaderByNumber(query.Origin.Number) } if origin == nil { break } - headers = append(headers, origin.Header()) - bytes += origin.Size() + headers = append(headers, origin) + bytes += estHeaderRlpSize - // Advance to the next block of the query + // Advance to the next header of the query switch { case query.Origin.Hash != (common.Hash{}) && query.Reverse: // Hash based traversal towards the genesis block for i := 0; i < int(query.Skip)+1; i++ { - if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil { - query.Origin.Hash = block.ParentHash() + if header := pm.chainman.GetHeader(query.Origin.Hash); header != nil { + query.Origin.Hash = header.ParentHash } else { unknown = true break @@ -379,9 +381,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case query.Origin.Hash != (common.Hash{}) && !query.Reverse: // Hash based traversal towards the leaf block - if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil { - if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash { - query.Origin.Hash = block.Hash() + if header := pm.chainman.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil { + if pm.chainman.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash { + query.Origin.Hash = header.Hash() } else { unknown = true } @@ -452,23 +454,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Gather blocks until the fetch or network limits is reached var ( hash common.Hash - bytes common.StorageSize - bodies []*blockBody + bytes int + bodies []rlp.RawValue ) for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { - //Retrieve the hash of the next block + // Retrieve the hash of the next block if err := msgStream.Decode(&hash); err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Retrieve the requested block, stopping if enough was found - if block := pm.chainman.GetBlock(hash); block != nil { - bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) - bytes += block.Size() + // Retrieve the requested block body, stopping if enough was found + if data := pm.chainman.GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) } } - return p.SendBlockBodies(bodies) + return p.SendBlockBodiesRLP(bodies) case p.version >= eth63 && msg.Code == GetNodeDataMsg: // Decode the retrieval message @@ -643,7 +645,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int if parent := pm.chainman.GetBlock(block.ParentHash()); parent != nil { - td = new(big.Int).Add(parent.Td, block.Difficulty()) + td = new(big.Int).Add(block.Difficulty(), pm.chainman.GetTd(block.ParentHash())) } else { glog.V(logger.Error).Infof("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4]) return diff --git a/eth/peer.go b/eth/peer.go index 8d7c48885..603b49b88 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" ) @@ -184,6 +185,12 @@ func (p *peer) SendBlockBodies(bodies []*blockBody) error { return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies)) } +// SendBlockBodiesRLP sends a batch of block contents to the remote peer from +// an already RLP encoded format. +func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { + return p2p.Send(p.rw, BlockBodiesMsg, bodies) +} + // SendNodeData sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *peer) SendNodeData(data [][]byte) error { |