diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api.go | 4 | ||||
-rw-r--r-- | eth/api_test.go | 2 | ||||
-rw-r--r-- | eth/api_tracer.go | 141 | ||||
-rw-r--r-- | eth/backend.go | 14 | ||||
-rw-r--r-- | eth/bind.go | 138 | ||||
-rw-r--r-- | eth/config.go | 14 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 356 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 207 | ||||
-rw-r--r-- | eth/downloader/queue.go | 169 | ||||
-rw-r--r-- | eth/downloader/statesync.go | 36 | ||||
-rw-r--r-- | eth/fetcher/fetcher_test.go | 2 | ||||
-rw-r--r-- | eth/filters/api.go | 7 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 11 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 29 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 10 | ||||
-rw-r--r-- | eth/gasprice/gasprice.go | 49 | ||||
-rw-r--r-- | eth/gen_config.go | 58 | ||||
-rw-r--r-- | eth/handler.go | 20 | ||||
-rw-r--r-- | eth/handler_test.go | 30 | ||||
-rw-r--r-- | eth/helper_test.go | 24 | ||||
-rw-r--r-- | eth/protocol_test.go | 20 | ||||
-rw-r--r-- | eth/sync.go | 15 | ||||
-rw-r--r-- | eth/sync_test.go | 4 | ||||
-rw-r--r-- | eth/tracers/internal/tracers/assets.go | 40 | ||||
-rw-r--r-- | eth/tracers/tracer.go | 2 | ||||
-rw-r--r-- | eth/tracers/tracer_test.go | 2 | ||||
-rw-r--r-- | eth/tracers/tracers_test.go | 4 |
27 files changed, 561 insertions, 847 deletions
diff --git a/eth/api.go b/eth/api.go index 0db3eb554..a345b57e4 100644 --- a/eth/api.go +++ b/eth/api.go @@ -462,11 +462,11 @@ func (api *PrivateDebugAPI) getModifiedAccounts(startBlock, endBlock *types.Bloc return nil, fmt.Errorf("start block height (%d) must be less than end block height (%d)", startBlock.Number().Uint64(), endBlock.Number().Uint64()) } - oldTrie, err := trie.NewSecure(startBlock.Root(), api.eth.chainDb, 0) + oldTrie, err := trie.NewSecure(startBlock.Root(), trie.NewDatabase(api.eth.chainDb), 0) if err != nil { return nil, err } - newTrie, err := trie.NewSecure(endBlock.Root(), api.eth.chainDb, 0) + newTrie, err := trie.NewSecure(endBlock.Root(), trie.NewDatabase(api.eth.chainDb), 0) if err != nil { return nil, err } diff --git a/eth/api_test.go b/eth/api_test.go index 248bc3ab6..900a82bb6 100644 --- a/eth/api_test.go +++ b/eth/api_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2017 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 0d0e2a73c..07c4457bc 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "runtime" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -34,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/tracers" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" @@ -72,6 +70,7 @@ type txTraceResult struct { type blockTraceTask struct { statedb *state.StateDB // Intermediate state prepped for tracing block *types.Block // Block to trace the transactions from + rootref common.Hash // Trie root reference held for this task results []*txTraceResult // Trace results procudes by the task } @@ -90,59 +89,6 @@ type txTraceTask struct { index int // Transaction offset in the block } -// ephemeralDatabase is a memory wrapper around a proper database, which acts as -// an ephemeral write layer. This construct is used by the chain tracer to write -// state tries for intermediate blocks without serializing to disk, but at the -// same time to allow disk fallback for reads that do no hit the memory layer. -type ephemeralDatabase struct { - diskdb ethdb.Database // Persistent disk database to fall back to with reads - memdb *ethdb.MemDatabase // Ephemeral memory database for primary reads and writes -} - -func (db *ephemeralDatabase) Put(key []byte, value []byte) error { return db.memdb.Put(key, value) } -func (db *ephemeralDatabase) Delete(key []byte) error { return errors.New("delete not supported") } -func (db *ephemeralDatabase) Close() { db.memdb.Close() } -func (db *ephemeralDatabase) NewBatch() ethdb.Batch { - return db.memdb.NewBatch() -} -func (db *ephemeralDatabase) Has(key []byte) (bool, error) { - if has, _ := db.memdb.Has(key); has { - return has, nil - } - return db.diskdb.Has(key) -} -func (db *ephemeralDatabase) Get(key []byte) ([]byte, error) { - if blob, _ := db.memdb.Get(key); blob != nil { - return blob, nil - } - return db.diskdb.Get(key) -} - -// Prune does a state sync into a new memory write layer and replaces the old one. -// This allows us to discard entries that are no longer referenced from the current -// state. -func (db *ephemeralDatabase) Prune(root common.Hash) { - // Pull the still relevant state data into memory - sync := state.NewStateSync(root, db.diskdb) - for sync.Pending() > 0 { - hash := sync.Missing(1)[0] - - // Move the next trie node from the memory layer into a sync struct - node, err := db.memdb.Get(hash[:]) - if err != nil { - panic(err) // memdb must have the data - } - if _, _, err := sync.Process([]trie.SyncResult{{Hash: hash, Data: node}}); err != nil { - panic(err) // it's not possible to fail processing a node - } - } - // Discard the old memory layer and write a new one - db.memdb, _ = ethdb.NewMemDatabaseWithCap(db.memdb.Len()) - if _, err := sync.Commit(db); err != nil { - panic(err) // writing into a memdb cannot fail - } -} - // TraceChain returns the structured logs created during the execution of EVM // between two blocks (excluding start) and returns them as a JSON object. func (api *PrivateDebugAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber, config *TraceConfig) (*rpc.Subscription, error) { @@ -188,23 +134,19 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl // Ensure we have a valid starting state before doing any work origin := start.NumberU64() + database := state.NewDatabase(api.eth.ChainDb()) - memdb, _ := ethdb.NewMemDatabase() - db := &ephemeralDatabase{ - diskdb: api.eth.ChainDb(), - memdb: memdb, - } if number := start.NumberU64(); number > 0 { start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1) if start == nil { return nil, fmt.Errorf("parent block #%d not found", number-1) } } - statedb, err := state.New(start.Root(), state.NewDatabase(db)) + statedb, err := state.New(start.Root(), database) if err != nil { // If the starting state is missing, allow some number of blocks to be reexecuted reexec := defaultTraceReexec - if config.Reexec != nil { + if config != nil && config.Reexec != nil { reexec = *config.Reexec } // Find the most recent block that has the state available @@ -213,7 +155,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl if start == nil { break } - if statedb, err = state.New(start.Root(), state.NewDatabase(db)); err == nil { + if statedb, err = state.New(start.Root(), database); err == nil { break } } @@ -256,7 +198,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl res, err := api.traceTx(ctx, msg, vmctx, task.statedb, config) if err != nil { task.results[i] = &txTraceResult{Error: err.Error()} - log.Warn("Tracing failed", "err", err) + log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err) break } task.statedb.DeleteSuicides() @@ -273,7 +215,6 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl } // Start a goroutine to feed all the blocks into the tracers begin := time.Now() - complete := start.NumberU64() go func() { var ( @@ -281,6 +222,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl number uint64 traced uint64 failed error + proot common.Hash ) // Ensure everything is properly cleaned up on any exit path defer func() { @@ -308,7 +250,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl // Print progress logs if long enough time elapsed if time.Since(logged) > 8*time.Second { if number > origin { - log.Info("Tracing chain segment", "start", origin, "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin)) + log.Info("Tracing chain segment", "start", origin, "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin), "memory", database.TrieDB().Size()) } else { log.Info("Preparing state for chain trace", "block", number, "start", origin, "elapsed", time.Since(begin)) } @@ -325,13 +267,11 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl txs := block.Transactions() select { - case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: block, results: make([]*txTraceResult, len(txs))}: + case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: block, rootref: proot, results: make([]*txTraceResult, len(txs))}: case <-notifier.Closed(): return } traced += uint64(len(txs)) - } else { - atomic.StoreUint64(&complete, number) } // Generate the next state snapshot fast without tracing _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) @@ -340,7 +280,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl break } // Finalize the state so any modifications are written to the trie - root, err := statedb.CommitTo(db, true) + root, err := statedb.Commit(true) if err != nil { failed = err break @@ -349,26 +289,14 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl failed = err break } - // After every N blocks, prune the database to only retain relevant data - if (number-start.NumberU64())%4096 == 0 { - // Wait until currently pending trace jobs finish - for atomic.LoadUint64(&complete) != number { - select { - case <-time.After(100 * time.Millisecond): - case <-notifier.Closed(): - return - } - } - // No more concurrent access at this point, prune the database - var ( - nodes = db.memdb.Len() - start = time.Now() - ) - db.Prune(root) - log.Info("Pruned tracer state entries", "deleted", nodes-db.memdb.Len(), "left", db.memdb.Len(), "elapsed", time.Since(start)) - - statedb, _ = state.New(root, state.NewDatabase(db)) + // Reference the trie twice, once for us, once for the trancer + database.TrieDB().Reference(root, common.Hash{}) + if number >= origin { + database.TrieDB().Reference(root, common.Hash{}) } + // Dereference all past tries we ourselves are done working with + database.TrieDB().Dereference(proot, common.Hash{}) + proot = root } }() @@ -387,12 +315,14 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl } done[uint64(result.Block)] = result + // Dereference any paret tries held in memory by this task + database.TrieDB().Dereference(res.rootref, common.Hash{}) + // Stream completed traces to the user, aborting on the first error for result, ok := done[next]; ok; result, ok = done[next] { if len(result.Traces) > 0 || next == end.NumberU64() { notifier.Notify(sub.ID, result) } - atomic.StoreUint64(&complete, next) delete(done, next) next++ } @@ -465,7 +395,7 @@ func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block, return nil, fmt.Errorf("parent %x not found", block.ParentHash()) } reexec := defaultTraceReexec - if config.Reexec != nil { + if config != nil && config.Reexec != nil { reexec = *config.Reexec } statedb, err := api.computeStateDB(parent, reexec) @@ -544,18 +474,14 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* } // Otherwise try to reexec blocks until we find a state or reach our limit origin := block.NumberU64() + database := state.NewDatabase(api.eth.ChainDb()) - memdb, _ := ethdb.NewMemDatabase() - db := &ephemeralDatabase{ - diskdb: api.eth.ChainDb(), - memdb: memdb, - } for i := uint64(0); i < reexec; i++ { block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if block == nil { break } - if statedb, err = state.New(block.Root(), state.NewDatabase(db)); err == nil { + if statedb, err = state.New(block.Root(), database); err == nil { break } } @@ -571,6 +497,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* var ( start = time.Now() logged time.Time + proot common.Hash ) for block.NumberU64() < origin { // Print progress logs if long enough time elapsed @@ -587,26 +514,18 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* return nil, err } // Finalize the state so any modifications are written to the trie - root, err := statedb.CommitTo(db, true) + root, err := statedb.Commit(true) if err != nil { return nil, err } if err := statedb.Reset(root); err != nil { return nil, err } - // After every N blocks, prune the database to only retain relevant data - if block.NumberU64()%4096 == 0 || block.NumberU64() == origin { - var ( - nodes = db.memdb.Len() - begin = time.Now() - ) - db.Prune(root) - log.Info("Pruned tracer state entries", "deleted", nodes-db.memdb.Len(), "left", db.memdb.Len(), "elapsed", time.Since(begin)) - - statedb, _ = state.New(root, state.NewDatabase(db)) - } + database.TrieDB().Reference(root, common.Hash{}) + database.TrieDB().Dereference(proot, common.Hash{}) + proot = root } - log.Info("Historical state regenerated", "block", block.NumberU64(), "elapsed", time.Since(start)) + log.Info("Historical state regenerated", "block", block.NumberU64(), "elapsed", time.Since(start), "size", database.TrieDB().Size()) return statedb, nil } @@ -619,7 +538,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, hash common.Ha return nil, fmt.Errorf("transaction %x not found", hash) } reexec := defaultTraceReexec - if config.Reexec != nil { + if config != nil && config.Reexec != nil { reexec = *config.Reexec } msg, vmctx, statedb, err := api.computeTxEnv(blockHash, int(index), reexec) diff --git a/eth/backend.go b/eth/backend.go index c39974a2c..94aad2310 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -144,9 +144,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } core.WriteBlockChainVersion(chainDb, core.BlockChainVersion) } - - vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) + var ( + vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} + cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout} + ) + eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -393,10 +395,10 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { // Figure out a max peers count based on the server limits maxPeers := srvr.MaxPeers if s.config.LightServ > 0 { - maxPeers -= s.config.LightPeers - if maxPeers < srvr.MaxPeers/2 { - maxPeers = srvr.MaxPeers / 2 + if s.config.LightPeers >= srvr.MaxPeers { + return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) } + maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested s.protocolManager.Start(maxPeers) diff --git a/eth/bind.go b/eth/bind.go deleted file mode 100644 index d09977dbc..000000000 --- a/eth/bind.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package eth - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/internal/ethapi" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" -) - -// ContractBackend implements bind.ContractBackend with direct calls to Ethereum -// internals to support operating on contracts within subprotocols like eth and -// swarm. -// -// Internally this backend uses the already exposed API endpoints of the Ethereum -// object. These should be rewritten to internal Go method calls when the Go API -// is refactored to support a clean library use. -type ContractBackend struct { - eapi *ethapi.PublicEthereumAPI // Wrapper around the Ethereum object to access metadata - bcapi *ethapi.PublicBlockChainAPI // Wrapper around the blockchain to access chain data - txapi *ethapi.PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data -} - -// NewContractBackend creates a new native contract backend using an existing -// Ethereum object. -func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend { - return &ContractBackend{ - eapi: ethapi.NewPublicEthereumAPI(apiBackend), - bcapi: ethapi.NewPublicBlockChainAPI(apiBackend), - txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend, new(ethapi.AddrLocker)), - } -} - -// CodeAt retrieves any code associated with the contract from the local API. -func (b *ContractBackend) CodeAt(ctx context.Context, contract common.Address, blockNum *big.Int) ([]byte, error) { - return b.bcapi.GetCode(ctx, contract, toBlockNumber(blockNum)) -} - -// CodeAt retrieves any code associated with the contract from the local API. -func (b *ContractBackend) PendingCodeAt(ctx context.Context, contract common.Address) ([]byte, error) { - return b.bcapi.GetCode(ctx, contract, rpc.PendingBlockNumber) -} - -// ContractCall implements bind.ContractCaller executing an Ethereum contract -// call with the specified data as the input. The pending flag requests execution -// against the pending block, not the stable head of the chain. -func (b *ContractBackend) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNum *big.Int) ([]byte, error) { - out, err := b.bcapi.Call(ctx, toCallArgs(msg), toBlockNumber(blockNum)) - return out, err -} - -// ContractCall implements bind.ContractCaller executing an Ethereum contract -// call with the specified data as the input. The pending flag requests execution -// against the pending block, not the stable head of the chain. -func (b *ContractBackend) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { - out, err := b.bcapi.Call(ctx, toCallArgs(msg), rpc.PendingBlockNumber) - return out, err -} - -func toCallArgs(msg ethereum.CallMsg) ethapi.CallArgs { - args := ethapi.CallArgs{ - To: msg.To, - From: msg.From, - Data: msg.Data, - } - if msg.Gas != nil { - args.Gas = hexutil.Big(*msg.Gas) - } - if msg.GasPrice != nil { - args.GasPrice = hexutil.Big(*msg.GasPrice) - } - if msg.Value != nil { - args.Value = hexutil.Big(*msg.Value) - } - return args -} - -func toBlockNumber(num *big.Int) rpc.BlockNumber { - if num == nil { - return rpc.LatestBlockNumber - } - return rpc.BlockNumber(num.Int64()) -} - -// PendingAccountNonce implements bind.ContractTransactor retrieving the current -// pending nonce associated with an account. -func (b *ContractBackend) PendingNonceAt(ctx context.Context, account common.Address) (nonce uint64, err error) { - out, err := b.txapi.GetTransactionCount(ctx, account, rpc.PendingBlockNumber) - if out != nil { - nonce = uint64(*out) - } - return nonce, err -} - -// SuggestGasPrice implements bind.ContractTransactor retrieving the currently -// suggested gas price to allow a timely execution of a transaction. -func (b *ContractBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) { - return b.eapi.GasPrice(ctx) -} - -// EstimateGasLimit implements bind.ContractTransactor triing to estimate the gas -// needed to execute a specific transaction based on the current pending state of -// the backend blockchain. There is no guarantee that this is the true gas limit -// requirement as other transactions may be added or removed by miners, but it -// should provide a basis for setting a reasonable default. -func (b *ContractBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (*big.Int, error) { - out, err := b.bcapi.EstimateGas(ctx, toCallArgs(msg)) - return out.ToInt(), err -} - -// SendTransaction implements bind.ContractTransactor injects the transaction -// into the pending pool for execution. -func (b *ContractBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error { - raw, _ := rlp.EncodeToBytes(tx) - _, err := b.txapi.SendRawTransaction(ctx, raw) - return err -} diff --git a/eth/config.go b/eth/config.go index 383cd6783..dd7f42c7d 100644 --- a/eth/config.go +++ b/eth/config.go @@ -22,6 +22,7 @@ import ( "os/user" "path/filepath" "runtime" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -43,14 +44,16 @@ var DefaultConfig = Config{ DatasetsOnDisk: 2, }, NetworkId: 1, - LightPeers: 20, - DatabaseCache: 128, + LightPeers: 100, + DatabaseCache: 768, + TrieCache: 256, + TrieTimeout: 5 * time.Minute, GasPrice: big.NewInt(18 * params.Shannon), TxPool: core.DefaultTxPoolConfig, GPO: gasprice.Config{ - Blocks: 10, - Percentile: 50, + Blocks: 20, + Percentile: 60, }, } @@ -78,6 +81,7 @@ type Config struct { // Protocol options NetworkId uint64 // Network ID to use for selecting peers to connect to SyncMode downloader.SyncMode + NoPruning bool // Light client options LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests @@ -87,6 +91,8 @@ type Config struct { SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int + TrieCache int + TrieTimeout time.Duration // Mining-related options Etherbase common.Address `toml:",omitempty"` diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b338129e0..7ede530a9 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -18,10 +18,8 @@ package downloader import ( - "crypto/rand" "errors" "fmt" - "math" "math/big" "sync" "sync/atomic" @@ -61,12 +59,11 @@ var ( maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync - fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing + fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -102,9 +99,6 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database - fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) - fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section - rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -124,6 +118,7 @@ type Downloader struct { synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 notified int32 + committed int32 // Channels headerCh chan dataPack // [eth/62] Channel receiving inbound block headers @@ -156,7 +151,7 @@ type Downloader struct { // LightChain encapsulates functions required to synchronise a light chain. type LightChain interface { // HasHeader verifies a header's presence in the local chain. - HasHeader(h common.Hash, number uint64) bool + HasHeader(common.Hash, uint64) bool // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header @@ -164,8 +159,8 @@ type LightChain interface { // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header - // GetTdByHash returns the total difficulty of a local block. - GetTdByHash(common.Hash) *big.Int + // GetTd returns the total difficulty of a local block. + GetTd(common.Hash, uint64) *big.Int // InsertHeaderChain inserts a batch of headers into the local chain. InsertHeaderChain([]*types.Header, int) (int, error) @@ -178,8 +173,8 @@ type LightChain interface { type BlockChain interface { LightChain - // HasBlockAndState verifies block and associated states' presence in the local chain. - HasBlockAndState(common.Hash) bool + // HasBlock verifies a block's presence in the local chain. + HasBlock(common.Hash, uint64) bool // GetBlockByHash retrieves a block from the local chain. GetBlockByHash(common.Hash) *types.Block @@ -271,7 +266,6 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error { - logger := log.New("peer", id) logger.Trace("Registering sync peer") if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil { @@ -324,8 +318,13 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode errEmptyHeaderSet, errPeersUnavailable, errTooOld, errInvalidAncestor, errInvalidChain: log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) - d.dropPeer(id) - + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id) + } else { + d.dropPeer(id) + } default: log.Warn("Synchronisation failed, retrying", "err", err) } @@ -386,9 +385,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials { - d.mode = FullSync - } + // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) if p == nil { @@ -436,57 +433,40 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.syncStatsChainHeight = height d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent header and content retrieval algorithm + // Ensure our origin point is below any fast sync pivot point pivot := uint64(0) - switch d.mode { - case LightSync: - pivot = height - case FastSync: - // Calculate the new fast/slow sync pivot point - if d.fsPivotLock == nil { - pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) - if err != nil { - panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) - } - if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() - } + if d.mode == FastSync { + if height <= uint64(fsMinFullBlocks) { + origin = 0 } else { - // Pivot point locked in, use this and do not pick a new one! - pivot = d.fsPivotLock.Number.Uint64() - } - // If the point is below the origin, move origin back to ensure state download - if pivot < origin { - if pivot > 0 { + pivot = height - uint64(fsMinFullBlocks) + if pivot <= origin { origin = pivot - 1 - } else { - origin = 0 } } - log.Debug("Fast syncing until pivot block", "pivot", pivot) } - d.queue.Prepare(origin+1, d.mode, pivot, latest) + d.committed = 1 + if d.mode == FastSync && pivot != 0 { + d.committed = 0 + } + // Initiate the sync using a concurrent header and content retrieval algorithm + d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { d.syncInitHook(origin, height) } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.processHeaders(origin+1, td) }, + func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, pivot, td) }, } if d.mode == FastSync { fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) } else if d.mode == FullSync { fetchers = append(fetchers, d.processFullSyncContent) } - err = d.spawnSync(fetchers) - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - // If sync failed in the critical section, bump the fail counter. - atomic.AddUint32(&d.fsPivotFails, 1) - } - return err + return d.spawnSync(fetchers) } // spawnSync runs d.process and all given fetcher functions to completion in @@ -602,7 +582,6 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err // Figure out the valid ancestor range to prevent rewrite attacks floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() - p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) if d.mode == FullSync { ceil = d.blockchain.CurrentBlock().NumberU64() } else if d.mode == FastSync { @@ -611,6 +590,8 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err if ceil >= MaxForkAncestry { floor = int64(ceil - MaxForkAncestry) } + p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) + // Request the topmost blocks to short circuit binary ancestor lookup head := ceil if head > height { @@ -666,7 +647,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err continue } // Otherwise check if we already know the header or not - if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { + if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() // If every header is known, even future ones, the peer straight out lied about its head @@ -731,7 +712,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err arrived = true // Modify the search interval based on the response - if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { + if (d.mode == FullSync && !d.blockchain.HasBlock(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { end = check break } @@ -769,7 +750,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err // other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. -func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error { p.log.Debug("Directing header downloads", "origin", from) defer p.log.Debug("Header download terminated") @@ -820,6 +801,18 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { + // Don't abort header fetches while the pivot is downloading + if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { + p.log.Debug("No headers, waiting for pivot commit") + select { + case <-time.After(fsHeaderContCheck): + getHeaders(from) + continue + case <-d.cancelCh: + return errCancelHeaderFetch + } + } + // Pivot done (or not in fast sync) and no more headers, terminate the process p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: @@ -853,6 +846,12 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { getHeaders(from) case <-timeout.C: + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id) + break + } // Header retrieval timed out, consider the peer bad and drop p.log.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) @@ -1071,7 +1070,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv setIdle(peer, 0) } else { peer.log.Debug("Stalling delivery, dropping", "type", kind) - d.dropPeer(pid) + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid) + } else { + d.dropPeer(pid) + } } } } @@ -1112,10 +1117,8 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if request.From > 0 { peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From) - } else if len(request.Headers) > 0 { - peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) } else { - peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes)) + peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { @@ -1143,10 +1146,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { - // Calculate the pivoting point for switching from fast to slow sync - pivot := d.queue.FastSyncPivot() - +func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back rollback := []*types.Header{} defer func() { @@ -1171,19 +1171,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) - - // If we're already past the pivot point, this could be an attack, thread carefully - if rollback[len(rollback)-1].Number.Uint64() > pivot { - // If we didn't ever fail, lock in the pivot header (must! not! change!) - if atomic.LoadUint32(&d.fsPivotFails) == 0 { - for _, header := range rollback { - if header.Number.Uint64() == pivot { - log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash()) - d.fsPivotLock = header - } - } - } - } } }() @@ -1218,7 +1205,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give if d.mode != LightSync { - if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 { + head := d.blockchain.CurrentBlock() + if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer } } @@ -1230,7 +1218,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // queued for processing when the header download completes. However, as long as the // peer gave us something useful, we're already happy/progressed (above check). if d.mode == FastSync || d.mode == LightSync { - if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 { + head := d.lightchain.CurrentHeader() + if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer } } @@ -1283,13 +1272,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } - // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in - if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { - if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { - log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash()) - return errInvalidChain - } - } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit @@ -1324,7 +1306,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // processFullSyncContent takes fetch results from the queue and imports them into the chain. func (d *Downloader) processFullSyncContent() error { for { - results := d.queue.WaitResults() + results := d.queue.Results(true) if len(results) == 0 { return nil } @@ -1338,30 +1320,28 @@ func (d *Downloader) processFullSyncContent() error { } func (d *Downloader) importBlockResults(results []*fetchResult) error { - for len(results) != 0 { - // Check for any termination requests. This makes clean shutdown faster. - select { - case <-d.quitCh: - return errCancelContentProcessing - default: - } - // Retrieve the a batch of results to import - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - first, last := results[0].Header, results[items-1].Header - log.Debug("Inserting downloaded chain", "items", len(results), - "firstnum", first.Number, "firsthash", first.Hash(), - "lastnum", last.Number, "lasthash", last.Hash(), - ) - blocks := make([]*types.Block, items) - for i, result := range results[:items] { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - } - if index, err := d.blockchain.InsertChain(blocks); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain - } - // Shift the results to the next batch - results = results[items:] + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting downloaded chain", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnum", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.blockchain.InsertChain(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } return nil } @@ -1369,35 +1349,92 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // processFastSyncContent takes fetch results from the queue and writes them to the // database. It also controls the synchronisation of state nodes of the pivot block. func (d *Downloader) processFastSyncContent(latest *types.Header) error { - // Start syncing state of the reported head block. - // This should get us most of the state of the pivot block. + // Start syncing state of the reported head block. This should get us most of + // the state of the pivot block. stateSync := d.syncState(latest.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { d.queue.Close() // wake up WaitResults } }() - - pivot := d.queue.FastSyncPivot() + // Figure out the ideal pivot block. Note, that this goalpost may move if the + // sync takes long enough for the chain head to move significantly. + pivot := uint64(0) + if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { + pivot = height - uint64(fsMinFullBlocks) + } + // To cater for moving pivot points, track the pivot block and subsequently + // accumulated download results separatey. + var ( + oldPivot *fetchResult // Locked in pivot block, might change eventually + oldTail []*fetchResult // Downloaded content after the pivot + ) for { - results := d.queue.WaitResults() + // Wait for the next batch of downloaded data to be available, and if the pivot + // block became stale, move the goalpost + results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness if len(results) == 0 { - return stateSync.Cancel() + // If pivot sync is done, stop + if oldPivot == nil { + return stateSync.Cancel() + } + // If sync failed, stop + select { + case <-d.cancelCh: + return stateSync.Cancel() + default: + } } if d.chainInsertHook != nil { d.chainInsertHook(results) } + if oldPivot != nil { + results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) + } + // Split around the pivot block and process the two sides via fast/full sync + if atomic.LoadInt32(&d.committed) == 0 { + latest = results[len(results)-1].Header + if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { + log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) + pivot = height - uint64(fsMinFullBlocks) + } + } P, beforeP, afterP := splitAroundPivot(pivot, results) if err := d.commitFastSyncData(beforeP, stateSync); err != nil { return err } if P != nil { - stateSync.Cancel() - if err := d.commitPivotBlock(P); err != nil { - return err + // If new pivot block found, cancel old state retrieval and restart + if oldPivot != P { + stateSync.Cancel() + + stateSync = d.syncState(P.Header.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + d.queue.Close() // wake up WaitResults + } + }() + oldPivot = P + } + // Wait for completion, occasionally checking for pivot staleness + select { + case <-stateSync.done: + if stateSync.err != nil { + return stateSync.err + } + if err := d.commitPivotBlock(P); err != nil { + return err + } + oldPivot = nil + + case <-time.After(time.Second): + oldTail = afterP + continue } } + // Fast sync done, pivot commit done, full import if err := d.importBlockResults(afterP); err != nil { return err } @@ -1420,52 +1457,49 @@ func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, bef } func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { - for len(results) != 0 { - // Check for any termination requests. - select { - case <-d.quitCh: - return errCancelContentProcessing - case <-stateSync.done: - if err := stateSync.Wait(); err != nil { - return err - } - default: - } - // Retrieve the a batch of results to import - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - first, last := results[0].Header, results[items-1].Header - log.Debug("Inserting fast-sync blocks", "items", len(results), - "firstnum", first.Number, "firsthash", first.Hash(), - "lastnumn", last.Number, "lasthash", last.Hash(), - ) - blocks := make([]*types.Block, items) - receipts := make([]types.Receipts, items) - for i, result := range results[:items] { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - receipts[i] = result.Receipts - } - if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err } - // Shift the results to the next batch - results = results[items:] + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + receipts := make([]types.Receipts, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } return nil } func (d *Downloader) commitPivotBlock(result *fetchResult) error { - b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - // Sync the pivot block state. This should complete reasonably quickly because - // we've already synced up to the reported head block state earlier. - if err := d.syncState(b.Root()).Wait(); err != nil { + block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { return err } - log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) - if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { return err } - return d.blockchain.FastSyncCommitHead(b.Hash()) + atomic.StoreInt32(&d.committed, 1) + return nil } // DeliverHeaders injects a new batch of block headers received from a remote diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e14264944..cb671a7df 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -45,8 +44,8 @@ var ( // Reduce some of the parameters to make the tester faster. func init() { MaxForkAncestry = uint64(10000) - blockCacheLimit = 1024 - fsCriticalTrials = 10 + blockCacheItems = 1024 + fsHeaderContCheck = 500 * time.Millisecond } // downloadTester is a test simulator for mocking out local block chain. @@ -118,7 +117,7 @@ func (dl *downloadTester) makeChain(n int, seed byte, parent *types.Block, paren // If the block number is multiple of 3, send a bonus transaction to the miner if parent == dl.genesis && i%3 == 0 { signer := types.MakeSigner(params.TestChainConfig, block.Number()) - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), new(big.Int).SetUint64(params.TxGas), nil, nil), signer, testKey) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) if err != nil { panic(err) } @@ -222,14 +221,9 @@ func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool { return dl.GetHeaderByHash(hash) != nil } -// HasBlockAndState checks if a block and associated state is present in the testers canonical chain. -func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool { - block := dl.GetBlockByHash(hash) - if block == nil { - return false - } - _, err := dl.stateDb.Get(block.Root().Bytes()) - return err == nil +// HasBlock checks if a block is present in the testers canonical chain. +func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool { + return dl.GetBlockByHash(hash) != nil } // GetHeader retrieves a header from the testers canonical chain. @@ -293,14 +287,14 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block { func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { // For now only check that the state trie is correct if block := dl.GetBlockByHash(hash); block != nil { - _, err := trie.NewSecure(block.Root(), dl.stateDb, 0) + _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0) return err } return fmt.Errorf("non existent block: %x", hash[:4]) } -// GetTdByHash retrieves the block's total difficulty from the canonical chain. -func (dl *downloadTester) GetTdByHash(hash common.Hash) *big.Int { +// GetTd retrieves the block's total difficulty from the canonical chain. +func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { dl.lock.RLock() defer dl.lock.RUnlock() @@ -619,28 +613,22 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { // number of items of the various chain components. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { // Initialize the counters for the first fork - headers, blocks := lengths[0], lengths[0] + headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks - minReceipts, maxReceipts := lengths[0]-fsMinFullBlocks-fsPivotInterval, lengths[0]-fsMinFullBlocks - if minReceipts < 0 { - minReceipts = 1 - } - if maxReceipts < 0 { - maxReceipts = 1 + if receipts < 0 { + receipts = 1 } // Update the counters for each subsequent fork for _, length := range lengths[1:] { headers += length - common blocks += length - common - - minReceipts += length - common - fsMinFullBlocks - fsPivotInterval - maxReceipts += length - common - fsMinFullBlocks + receipts += length - common - fsMinFullBlocks } switch tester.downloader.mode { case FullSync: - minReceipts, maxReceipts = 1, 1 + receipts = 1 case LightSync: - blocks, minReceipts, maxReceipts = 1, 1, 1 + blocks, receipts = 1, 1 } if hs := len(tester.ownHeaders); hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) @@ -648,11 +636,12 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng if bs := len(tester.ownBlocks); bs != blocks { t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) } - if rs := len(tester.ownReceipts); rs < minReceipts || rs > maxReceipts { - t.Fatalf("synchronised receipts mismatch: have %v, want between [%v, %v]", rs, minReceipts, maxReceipts) + if rs := len(tester.ownReceipts); rs != receipts { + t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } // Verify the state trie too for fast syncs - if tester.downloader.mode == FastSync { + /*if tester.downloader.mode == FastSync { + pivot := uint64(0) var index int if pivot := int(tester.downloader.queue.fastSyncPivot); pivot < common { index = pivot @@ -660,11 +649,11 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot) } if index > 0 { - if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(tester.stateDb)); statedb == nil || err != nil { + if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(trie.NewDatabase(tester.stateDb))); statedb == nil || err != nil { t.Fatalf("state reconstruction failed: %v", err) } } - } + }*/ } // Tests that simple synchronization against a canonical chain works correctly. @@ -684,7 +673,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -710,7 +699,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a long block chain to download and the tester - targetBlocks := 8 * blockCacheLimit + targetBlocks := 8 * blockCacheItems hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -745,9 +734,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { - cached = receipts - } + //if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { + cached = receipts + //} } } frozen = int(atomic.LoadUint32(&blocked)) @@ -755,7 +744,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.downloader.queue.lock.Unlock() tester.lock.Unlock() - if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { + if cached == blockCacheItems || retrieved+cached+frozen == targetBlocks+1 { break } } @@ -765,8 +754,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() - if cached != blockCacheLimit && retrieved+cached+frozen != targetBlocks+1 { - t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheLimit, retrieved, frozen, targetBlocks+1) + if cached != blockCacheItems && retrieved+cached+frozen != targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import if atomic.LoadUint32(&blocked) > 0 { @@ -974,7 +963,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 if targetBlocks >= MaxHashFetch { targetBlocks = MaxHashFetch - 15 } @@ -1016,12 +1005,12 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create various peers with various parts of the chain targetPeers := 8 - targetBlocks := targetPeers*blockCacheLimit - 15 + targetBlocks := targetPeers*blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) + tester.newPeer(id, protocol, hashes[i*blockCacheItems:], headers, blocks, receipts) } if err := tester.sync("peer #0", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) @@ -1045,7 +1034,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Create peers of every type @@ -1084,7 +1073,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a block chain to download - targetBlocks := 2*blockCacheLimit - 15 + targetBlocks := 2*blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -1110,8 +1099,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { bodiesNeeded++ } } - for hash, receipt := range receipts { - if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= tester.downloader.queue.fastSyncPivot { + for _, receipt := range receipts { + if mode == FastSync && len(receipt) > 0 { receiptsNeeded++ } } @@ -1139,7 +1128,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt a full sync with an attacker feeding gapped headers @@ -1174,7 +1163,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt a full sync with an attacker feeding shifted headers @@ -1208,7 +1197,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks + targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt to sync with an attacker that feeds junk during the fast sync phase. @@ -1248,7 +1237,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 - tester.downloader.fsPivotFails = 0 tester.downloader.syncInitHook = func(uint64, uint64) { for i := missing; i <= len(hashes); i++ { delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i]) @@ -1267,8 +1255,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Errorf("fast sync pivot block #%d not rolled back", head) } } - tester.downloader.fsPivotFails = fsCriticalTrials - // Synchronise with the valid peer and make sure sync succeeds. Since the last // rollback should also disable fast syncing for this process, verify that we // did a fresh full sync. Note, we can't assert anything about the receipts @@ -1383,7 +1369,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1532,7 +1518,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1609,7 +1595,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small block chain - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1697,6 +1683,7 @@ func TestDeliverHeadersHang(t *testing.T) { type floodingTestPeer struct { peer Peer tester *downloadTester + pend sync.WaitGroup } func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } @@ -1717,9 +1704,12 @@ func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int deliveriesDone := make(chan struct{}, 500) for i := 0; i < cap(deliveriesDone); i++ { peer := fmt.Sprintf("fake-peer%d", i) + ftp.pend.Add(1) + go func() { ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) deliveriesDone <- struct{}{} + ftp.pend.Done() }() } // Deliver the actual requested headers. @@ -1751,110 +1741,15 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { // Whenever the downloader requests headers, flood it with // a lot of unrequested header deliveries. tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ - tester.downloader.peers.peers["peer"].peer, - tester, + peer: tester.downloader.peers.peers["peer"].peer, + tester: tester, } if err := tester.sync("peer", nil, mode); err != nil { - t.Errorf("sync failed: %v", err) + t.Errorf("test %d: sync failed: %v", i, err) } tester.terminate() - } -} -// Tests that if fast sync aborts in the critical section, it can restart a few -// times before giving up. -// We use data driven subtests to manage this so that it will be parallel on its own -// and not with the other tests, avoiding intermittent failures. -func TestFastCriticalRestarts(t *testing.T) { - testCases := []struct { - protocol int - progress bool - }{ - {63, false}, - {64, false}, - {63, true}, - {64, true}, - } - for _, tc := range testCases { - t.Run(fmt.Sprintf("protocol %d progress %v", tc.protocol, tc.progress), func(t *testing.T) { - testFastCriticalRestarts(t, tc.protocol, tc.progress) - }) - } -} - -func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { - t.Parallel() - - tester := newTester() - defer tester.terminate() - - // Create a large enough blockchin to actually fast sync on - targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - // Create a tester peer with a critical section header missing (force failures) - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) - delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1]) - tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test - - // Remove all possible pivot state roots and slow down replies (test failure resets later) - for i := 0; i < fsPivotInterval; i++ { - tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true - } - (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(500 * time.Millisecond) // Enough to reach the critical section - - // Synchronise with the peer a few times and make sure they fail until the retry limit - for i := 0; i < int(fsCriticalTrials)-1; i++ { - // Attempt a sync and ensure it fails properly - if err := tester.sync("peer", nil, FastSync); err == nil { - t.Fatalf("failing fast sync succeeded: %v", err) - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - - // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes - if i == 0 { - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - if tester.downloader.fsPivotLock == nil { - time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too - t.Fatalf("pivot block not locked in after critical section failure") - } - tester.lock.Lock() - tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] - tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} - (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(0) - tester.lock.Unlock() - } - } - // Return all nodes if we're testing fast sync progression - if progress { - tester.lock.Lock() - tester.peerMissingStates["peer"] = map[common.Hash]bool{} - tester.lock.Unlock() - - if err := tester.sync("peer", nil, FastSync); err != nil { - t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err) - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - - if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 { - t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1) - } - assertOwnChain(t, tester, targetBlocks+1) - } else { - if err := tester.sync("peer", nil, FastSync); err == nil { - t.Fatalf("succeeded to synchronise blocks in failed fast sync") - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - - if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials { - t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials) - } - } - // Retry limit exhausted, downloader will switch to full sync, should succeed - if err := tester.sync("peer", nil, FastSync); err != nil { - t.Fatalf("failed to synchronise blocks in slow sync: %v", err) + // Flush all goroutines to prevent messing with subsequent tests + tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait() } - // Note, we can't assert the chain here because the test asserter assumes sync - // completed using a single mode of operation, whereas fast-then-slow can result - // in arbitrary intermediate state that's not cleanly verifiable. } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 6926f1d8c..a1a70e46e 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -32,7 +32,11 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download +var ( + blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download + blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching + blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones +) var ( errNoFetchesPending = errors.New("no fetches pending") @@ -41,17 +45,17 @@ var ( // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { - Peer *peerConnection // Peer to which the request was sent - From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) - Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) - Headers []*types.Header // [eth/62] Requested headers, sorted by request order - Time time.Time // Time when the request was made + Peer *peerConnection // Peer to which the request was sent + From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) + Headers []*types.Header // [eth/62] Requested headers, sorted by request order + Time time.Time // Time when the request was made } // fetchResult is a struct collecting partial results from data fetchers until // all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { - Pending int // Number of data fetches still pending + Pending int // Number of data fetches still pending + Hash common.Hash // Hash of the header to prevent recalculating Header *types.Header Uncles []*types.Header @@ -61,12 +65,10 @@ type fetchResult struct { // queue represents hashes that are either need fetching or are being fetched type queue struct { - mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching - fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode - - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable @@ -87,8 +89,9 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - resultCache []*fetchResult // Downloaded but not yet delivered fetch results - resultOffset uint64 // Offset of the first cached fetch result in the block chain + resultCache []*fetchResult // Downloaded but not yet delivered fetch results + resultOffset uint64 // Offset of the first cached fetch result in the block chain + resultSize common.StorageSize // Approximate size of a block (exponential moving average) lock *sync.Mutex active *sync.Cond @@ -109,7 +112,7 @@ func newQueue() *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - resultCache: make([]*fetchResult, blockCacheLimit), + resultCache: make([]*fetchResult, blockCacheItems), active: sync.NewCond(lock), lock: lock, } @@ -122,10 +125,8 @@ func (q *queue) Reset() { q.closed = false q.mode = FullSync - q.fastSyncPivot = 0 q.headerHead = common.Hash{} - q.headerPendPool = make(map[string]*fetchRequest) q.blockTaskPool = make(map[common.Hash]*types.Header) @@ -138,7 +139,7 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.resultCache = make([]*fetchResult, blockCacheLimit) + q.resultCache = make([]*fetchResult, blockCacheItems) q.resultOffset = 0 } @@ -214,27 +215,13 @@ func (q *queue) Idle() bool { return (queued + pending + cached) == 0 } -// FastSyncPivot retrieves the currently used fast sync pivot point. -func (q *queue) FastSyncPivot() uint64 { - q.lock.Lock() - defer q.lock.Unlock() - - return q.fastSyncPivot -} - // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { q.lock.Lock() defer q.lock.Unlock() - // Calculate the currently in-flight block (body) requests - pending := 0 - for _, request := range q.blockPendPool { - pending += len(request.Hashes) + len(request.Headers) - } - // Throttle if more blocks (bodies) are in-flight than free space in the cache - return pending >= len(q.resultCache)-len(q.blockDonePool) + return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0 } // ShouldThrottleReceipts checks if the download should be throttled (active receipt @@ -243,13 +230,39 @@ func (q *queue) ShouldThrottleReceipts() bool { q.lock.Lock() defer q.lock.Unlock() - // Calculate the currently in-flight receipt requests + return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0 +} + +// resultSlots calculates the number of results slots available for requests +// whilst adhering to both the item and the memory limit too of the results +// cache. +func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int { + // Calculate the maximum length capped by the memory limit + limit := len(q.resultCache) + if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) { + limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + } + // Calculate the number of slots already finished + finished := 0 + for _, result := range q.resultCache[:limit] { + if result == nil { + break + } + if _, ok := donePool[result.Hash]; ok { + finished++ + } + } + // Calculate the number of slots currently downloading pending := 0 - for _, request := range q.receiptPendPool { - pending += len(request.Headers) + for _, request := range pendPool { + for _, header := range request.Headers { + if header.Number.Uint64() < q.resultOffset+uint64(limit) { + pending++ + } + } } - // Throttle if more receipts are in-flight than free space in the cache - return pending >= len(q.resultCache)-len(q.receiptDonePool) + // Return the free slots to distribute + return limit - finished - pending } // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill @@ -323,8 +336,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) - if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { - // Fast phase of the fast sync, retrieve receipts too + if q.mode == FastSync { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } @@ -335,18 +347,25 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// WaitResults retrieves and permanently removes a batch of fetch -// results from the cache. the result slice will be empty if the queue -// has been closed. -func (q *queue) WaitResults() []*fetchResult { +// Results retrieves and permanently removes a batch of fetch results from +// the cache. the result slice will be empty if the queue has been closed. +func (q *queue) Results(block bool) []*fetchResult { q.lock.Lock() defer q.lock.Unlock() + // Count the number of items available for processing nproc := q.countProcessableItems() for nproc == 0 && !q.closed { + if !block { + return nil + } q.active.Wait() nproc = q.countProcessableItems() } + // Since we have a batch limit, don't pull more into "dangling" memory + if nproc > maxResultsProcess { + nproc = maxResultsProcess + } results := make([]*fetchResult, nproc) copy(results, q.resultCache[:nproc]) if len(results) > 0 { @@ -363,6 +382,21 @@ func (q *queue) WaitResults() []*fetchResult { } // Advance the expected block number of the first cache entry. q.resultOffset += uint64(nproc) + + // Recalculate the result item weights to prevent memory exhaustion + for _, result := range results { + size := result.Header.Size() + for _, uncle := range result.Uncles { + size += uncle.Size() + } + for _, receipt := range result.Receipts { + size += receipt.Size() + } + for _, tx := range result.Transactions { + size += tx.Size() + } + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize + } } return results } @@ -370,21 +404,9 @@ func (q *queue) WaitResults() []*fetchResult { // countProcessableItems counts the processable items. func (q *queue) countProcessableItems() int { for i, result := range q.resultCache { - // Don't process incomplete or unavailable items. if result == nil || result.Pending > 0 { return i } - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } } return len(q.resultCache) } @@ -473,10 +495,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common return nil, false, nil } // Calculate an upper limit on the items we might fetch (i.e. throttling) - space := len(q.resultCache) - len(donePool) - for _, request := range pendPool { - space -= len(request.Headers) - } + space := q.resultSlots(pendPool, donePool) + // Retrieve a batch of tasks, skipping previously failed ones send := make([]*types.Header, 0, count) skip := make([]*types.Header, 0) @@ -484,6 +504,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common progress := false for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) + hash := header.Hash() // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) @@ -493,18 +514,19 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common } if q.resultCache[index] == nil { components := 1 - if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + if q.mode == FastSync { components = 2 } q.resultCache[index] = &fetchResult{ Pending: components, + Hash: hash, Header: header, } } // If this fetch task is a noop, skip this fetch operation if isNoop(header) { - donePool[header.Hash()] = struct{}{} - delete(taskPool, header.Hash()) + donePool[hash] = struct{}{} + delete(taskPool, hash) space, proc = space-1, proc-1 q.resultCache[index].Pending-- @@ -512,7 +534,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common continue } // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.Lacks(header.Hash()) { + if p.Lacks(hash) { skip = append(skip, header) } else { send = append(send, header) @@ -565,9 +587,6 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m if request.From > 0 { taskQueue.Push(request.From, -float32(request.From)) } - for hash, index := range request.Hashes { - taskQueue.Push(hash, float32(index)) - } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } @@ -640,18 +659,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, if request.From > 0 { taskQueue.Push(request.From, -float32(request.From)) } - for hash, index := range request.Hashes { - taskQueue.Push(hash, float32(index)) - } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } // Add the peer to the expiry report along the the number of failed requests - expirations := len(request.Hashes) - if expirations < len(request.Headers) { - expirations = len(request.Headers) - } - expiries[id] = expirations + expiries[id] = len(request.Headers) } } // Remove the expired requests from the pending pool @@ -828,14 +840,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ failure = err break } - donePool[header.Hash()] = struct{}{} + hash := header.Hash() + + donePool[hash] = struct{}{} q.resultCache[index].Pending-- useful = true accepted++ // Clean up a successful fetch request.Headers[i] = nil - delete(taskPool, header.Hash()) + delete(taskPool, hash) } // Return all failed or missing fetches to the queue for _, header := range request.Headers { @@ -860,7 +874,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { +func (q *queue) Prepare(offset uint64, mode SyncMode) { q.lock.Lock() defer q.lock.Unlock() @@ -868,6 +882,5 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. if q.resultOffset < offset { q.resultOffset = offset } - q.fastSyncPivot = pivot q.mode = mode } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index a0b05c9be..9cc65a208 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -20,7 +20,6 @@ import ( "fmt" "hash" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -132,7 +131,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { // Send the next finished request to the current sync: case deliverReqCh <- deliverReq: - finished = append(finished[:0], finished[1:]...) + // Shift out the first request, but also set the emptied slot to nil for GC + copy(finished, finished[1:]) + finished[len(finished)-1] = nil + finished = finished[:len(finished)-1] // Handle incoming state packs: case pack := <-d.stateCh: @@ -291,6 +293,9 @@ func (s *stateSync) loop() error { case <-s.cancel: return errCancelStateFetch + case <-s.d.cancelCh: + return errCancelStateFetch + case req := <-s.deliver: // Response, disconnect or timeout triggered, drop the peer if stalling log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) @@ -301,15 +306,11 @@ func (s *stateSync) loop() error { s.d.dropPeer(req.peer.id) } // Process all the received blobs and check for stale delivery - stale, err := s.process(req) - if err != nil { + if err := s.process(req); err != nil { log.Warn("Node data write error", "err", err) return err } - // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) - if !stale { - req.peer.SetNodeDataIdle(len(req.response)) - } + req.peer.SetNodeDataIdle(len(req.response)) } } return s.commit(true) @@ -349,6 +350,7 @@ func (s *stateSync) assignTasks() { case s.d.trackStateReq <- req: req.peer.FetchNodeData(req.items) case <-s.cancel: + case <-s.d.cancelCh: } } } @@ -387,7 +389,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { // process iterates over a batch of delivered state data, injecting each item // into a running state sync, re-queuing any items that were requested but not // delivered. -func (s *stateSync) process(req *stateReq) (bool, error) { +func (s *stateSync) process(req *stateReq) error { // Collect processing stats and update progress if valid data was received duplicate, unexpected := 0, 0 @@ -398,7 +400,7 @@ func (s *stateSync) process(req *stateReq) (bool, error) { }(time.Now()) // Iterate over all the delivered data and inject one-by-one into the trie - progress, stale := false, len(req.response) > 0 + progress := false for _, blob := range req.response { prog, hash, err := s.processNodeData(blob) @@ -412,20 +414,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) { case trie.ErrAlreadyProcessed: duplicate++ default: - return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) } - // If the node delivered a requested item, mark the delivery non-stale if _, ok := req.tasks[hash]; ok { delete(req.tasks, hash) - stale = false } } - // If we're inside the critical section, reset fail counter since we progressed. - if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 { - log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) - atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Put unfulfilled tasks back into the retry queue npeers := s.d.peers.Len() for hash, task := range req.tasks { @@ -438,12 +432,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) { // If we've requested the node too many times already, it may be a malicious // sync where nobody has the right data. Abort. if len(task.attempts) >= npeers { - return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) } // Missing item, place into the retry queue. s.tasks[hash] = task } - return stale, nil + return nil } // processNodeData tries to inject a trie node data blob delivered from a remote diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index e6a639417..9d53b98b6 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -52,7 +52,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common // If the block number is multiple of 3, send a bonus transaction to the miner if parent == genesis && i%3 == 0 { signer := types.MakeSigner(params.TestChainConfig, block.Number()) - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), new(big.Int).SetUint64(params.TxGas), nil, nil), signer, testKey) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) if err != nil { panic(err) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 03c1d6afc..406c9442e 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -25,6 +25,7 @@ import ( "sync" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" @@ -240,7 +241,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc matchedLogs = make(chan []*types.Log) ) - logsSub, err := api.events.SubscribeLogs(crit, matchedLogs) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) if err != nil { return nil, err } @@ -267,6 +268,8 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc } // FilterCriteria represents a request to create a new filter. +// +// TODO(karalabe): Kill this in favor of ethereum.FilterQuery. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int @@ -289,7 +292,7 @@ type FilterCriteria struct { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { logs := make(chan []*types.Log) - logsSub, err := api.events.SubscribeLogs(crit, logs) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs) if err != nil { return rpc.ID(""), err } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index e08cedb27..b09998f9c 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -25,6 +25,7 @@ import ( "sync" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -75,7 +76,7 @@ type subscription struct { id rpc.ID typ Type created time.Time - logsCrit FilterCriteria + logsCrit ethereum.FilterQuery logs chan []*types.Log hashes chan common.Hash headers chan *types.Header @@ -162,7 +163,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to // block is "latest". If the fromBlock > toBlock an error is returned. -func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*types.Log) (*Subscription, error) { +func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber @@ -200,7 +201,7 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*types.Log // subscribeMinedPendingLogs creates a subscription that returned mined and // pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: MinedAndPendingLogsSubscription, @@ -217,7 +218,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: LogsSubscription, @@ -234,7 +235,7 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log // subscribePendingLogs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription { +func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingLogsSubscription, diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index a5025db3d..7ec3b4be7 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" @@ -195,11 +196,11 @@ func TestPendingTxFilter(t *testing.T) { api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ - types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), - types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), - types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), - types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), - types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } hashes []common.Hash @@ -488,27 +489,27 @@ func TestPendingLogsSubscription(t *testing.T) { } testCases = []struct { - crit FilterCriteria + crit ethereum.FilterQuery expected []*types.Log c chan []*types.Log sub *Subscription }{ // match all - {FilterCriteria{}, convertLogs(allLogs), nil, nil}, + {ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil}, // match none due to no matching addresses - {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil}, // match logs based on addresses, ignore topics - {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // match none due to no matching topics (match with address) - {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil}, // match logs based on addresses and topics - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil}, // match logs based on multiple addresses and "or" topics - {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil}, // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes - {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // multiple pending logs, should match only 2 topics from the logs in block 5 - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, } ) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 729d0afc8..0018142c4 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -34,7 +34,7 @@ import ( ) func makeReceipt(addr common.Address) *types.Receipt { - receipt := types.NewReceipt(nil, false, new(big.Int)) + receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ {Address: addr}, } @@ -136,7 +136,7 @@ func TestFilters(t *testing.T) { chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) { switch i { case 1: - receipt := types.NewReceipt(nil, false, new(big.Int)) + receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ { Address: addr, @@ -145,7 +145,7 @@ func TestFilters(t *testing.T) { } gen.AddUncheckedReceipt(receipt) case 2: - receipt := types.NewReceipt(nil, false, new(big.Int)) + receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ { Address: addr, @@ -154,7 +154,7 @@ func TestFilters(t *testing.T) { } gen.AddUncheckedReceipt(receipt) case 998: - receipt := types.NewReceipt(nil, false, new(big.Int)) + receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ { Address: addr, @@ -163,7 +163,7 @@ func TestFilters(t *testing.T) { } gen.AddUncheckedReceipt(receipt) case 999: - receipt := types.NewReceipt(nil, false, new(big.Int)) + receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ { Address: addr, diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go index c662348e1..54325692c 100644 --- a/eth/gasprice/gasprice.go +++ b/eth/gasprice/gasprice.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" @@ -101,9 +102,9 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { ch := make(chan getBlockPricesResult, gpo.checkBlocks) sent := 0 exp := 0 - var txPrices []*big.Int + var blockPrices []*big.Int for sent < gpo.checkBlocks && blockNum > 0 { - go gpo.getBlockPrices(ctx, blockNum, ch) + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) sent++ exp++ blockNum-- @@ -115,8 +116,8 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { return lastPrice, res.err } exp-- - if len(res.prices) > 0 { - txPrices = append(txPrices, res.prices...) + if res.price != nil { + blockPrices = append(blockPrices, res.price) continue } if maxEmpty > 0 { @@ -124,16 +125,16 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { continue } if blockNum > 0 && sent < gpo.maxBlocks { - go gpo.getBlockPrices(ctx, blockNum, ch) + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) sent++ exp++ blockNum-- } } price := lastPrice - if len(txPrices) > 0 { - sort.Sort(bigIntArray(txPrices)) - price = txPrices[(len(txPrices)-1)*gpo.percentile/100] + if len(blockPrices) > 0 { + sort.Sort(bigIntArray(blockPrices)) + price = blockPrices[(len(blockPrices)-1)*gpo.percentile/100] } if price.Cmp(maxPrice) > 0 { price = new(big.Int).Set(maxPrice) @@ -147,24 +148,38 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { } type getBlockPricesResult struct { - prices []*big.Int - err error + price *big.Int + err error } -// getLowestPrice calculates the lowest transaction gas price in a given block +type transactionsByGasPrice []*types.Transaction + +func (t transactionsByGasPrice) Len() int { return len(t) } +func (t transactionsByGasPrice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPrice().Cmp(t[j].GasPrice()) < 0 } + +// getBlockPrices calculates the lowest transaction gas price in a given block // and sends it to the result channel. If the block is empty, price is nil. -func (gpo *Oracle) getBlockPrices(ctx context.Context, blockNum uint64, ch chan getBlockPricesResult) { +func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, ch chan getBlockPricesResult) { block, err := gpo.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum)) if block == nil { ch <- getBlockPricesResult{nil, err} return } - txs := block.Transactions() - prices := make([]*big.Int, len(txs)) - for i, tx := range txs { - prices[i] = tx.GasPrice() + + blockTxs := block.Transactions() + txs := make([]*types.Transaction, len(blockTxs)) + copy(txs, blockTxs) + sort.Sort(transactionsByGasPrice(txs)) + + for _, tx := range txs { + sender, err := types.Sender(signer, tx) + if err == nil && sender != block.Coinbase() { + ch <- getBlockPricesResult{tx.GasPrice(), nil} + return + } } - ch <- getBlockPricesResult{prices, nil} + ch <- getBlockPricesResult{nil, nil} } type bigIntArray []*big.Int diff --git a/eth/gen_config.go b/eth/gen_config.go index e2d50e1f6..4f2e82d94 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -13,6 +13,8 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" ) +var _ = (*configMarshaling)(nil) + func (c Config) MarshalTOML() (interface{}, error) { type Config struct { Genesis *core.Genesis `toml:",omitempty"` @@ -20,7 +22,6 @@ func (c Config) MarshalTOML() (interface{}, error) { SyncMode downloader.SyncMode LightServ int `toml:",omitempty"` LightPeers int `toml:",omitempty"` - MaxPeers int `toml:"-"` SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int @@ -28,17 +29,11 @@ func (c Config) MarshalTOML() (interface{}, error) { MinerThreads int `toml:",omitempty"` ExtraData hexutil.Bytes `toml:",omitempty"` GasPrice *big.Int - EthashCacheDir string - EthashCachesInMem int - EthashCachesOnDisk int - EthashDatasetDir string - EthashDatasetsInMem int - EthashDatasetsOnDisk int + Ethash ethash.Config TxPool core.TxPoolConfig GPO gasprice.Config EnablePreimageRecording bool - DocRoot string `toml:"-"` - PowMode ethash.Mode `toml:"-"` + DocRoot string `toml:"-"` } var enc Config enc.Genesis = c.Genesis @@ -53,17 +48,11 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.MinerThreads = c.MinerThreads enc.ExtraData = c.ExtraData enc.GasPrice = c.GasPrice - enc.EthashCacheDir = c.Ethash.CacheDir - enc.EthashCachesInMem = c.Ethash.CachesInMem - enc.EthashCachesOnDisk = c.Ethash.CachesOnDisk - enc.EthashDatasetDir = c.Ethash.DatasetDir - enc.EthashDatasetsInMem = c.Ethash.DatasetsInMem - enc.EthashDatasetsOnDisk = c.Ethash.DatasetsOnDisk + enc.Ethash = c.Ethash enc.TxPool = c.TxPool enc.GPO = c.GPO enc.EnablePreimageRecording = c.EnablePreimageRecording enc.DocRoot = c.DocRoot - enc.PowMode = c.Ethash.PowMode return &enc, nil } @@ -74,25 +63,18 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { SyncMode *downloader.SyncMode LightServ *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"` - MaxPeers *int `toml:"-"` SkipBcVersionCheck *bool `toml:"-"` DatabaseHandles *int `toml:"-"` DatabaseCache *int Etherbase *common.Address `toml:",omitempty"` MinerThreads *int `toml:",omitempty"` - ExtraData hexutil.Bytes `toml:",omitempty"` + ExtraData *hexutil.Bytes `toml:",omitempty"` GasPrice *big.Int - EthashCacheDir *string - EthashCachesInMem *int - EthashCachesOnDisk *int - EthashDatasetDir *string - EthashDatasetsInMem *int - EthashDatasetsOnDisk *int + Ethash *ethash.Config TxPool *core.TxPoolConfig GPO *gasprice.Config EnablePreimageRecording *bool - DocRoot *string `toml:"-"` - PowMode *ethash.Mode `toml:"-"` + DocRoot *string `toml:"-"` } var dec Config if err := unmarshal(&dec); err != nil { @@ -129,28 +111,13 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { c.MinerThreads = *dec.MinerThreads } if dec.ExtraData != nil { - c.ExtraData = dec.ExtraData + c.ExtraData = *dec.ExtraData } if dec.GasPrice != nil { c.GasPrice = dec.GasPrice } - if dec.EthashCacheDir != nil { - c.Ethash.CacheDir = *dec.EthashCacheDir - } - if dec.EthashCachesInMem != nil { - c.Ethash.CachesInMem = *dec.EthashCachesInMem - } - if dec.EthashCachesOnDisk != nil { - c.Ethash.CachesOnDisk = *dec.EthashCachesOnDisk - } - if dec.EthashDatasetDir != nil { - c.Ethash.DatasetDir = *dec.EthashDatasetDir - } - if dec.EthashDatasetsInMem != nil { - c.Ethash.DatasetsInMem = *dec.EthashDatasetsInMem - } - if dec.EthashDatasetsOnDisk != nil { - c.Ethash.DatasetsOnDisk = *dec.EthashDatasetsOnDisk + if dec.Ethash != nil { + c.Ethash = *dec.Ethash } if dec.TxPool != nil { c.TxPool = *dec.TxPool @@ -164,8 +131,5 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DocRoot != nil { c.DocRoot = *dec.DocRoot } - if dec.PowMode != nil { - c.Ethash.PowMode = *dec.PowMode - } return nil } diff --git a/eth/handler.go b/eth/handler.go index 31c311687..c2426544f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -71,7 +71,6 @@ type ProtocolManager struct { txpool txPool blockchain *core.BlockChain - chaindb ethdb.Database chainconfig *params.ChainConfig maxPeers int @@ -106,7 +105,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne eventMux: mux, txpool: txpool, blockchain: blockchain, - chaindb: chaindb, chainconfig: config, peers: newPeerSet(), newPeerCh: make(chan *peer), @@ -257,8 +255,14 @@ func (pm *ProtocolManager) handle(p *peer) error { p.Log().Debug("Ethereum peer connected", "name", p.Name()) // Execute the Ethereum handshake - td, head, genesis := pm.blockchain.Status() - if err := p.Handshake(pm.networkId, td, head, genesis); err != nil { + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + hash = head.Hash() + number = head.Number.Uint64() + td = pm.blockchain.GetTd(hash, number) + ) + if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } @@ -394,14 +398,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { case query.Reverse: // Number based traversal towards the genesis block if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= (query.Skip + 1) + query.Origin.Number -= query.Skip + 1 } else { unknown = true } case !query.Reverse: // Number based traversal towards the leaf block - query.Origin.Number += (query.Skip + 1) + query.Origin.Number += query.Skip + 1 } } return p.SendBlockHeaders(headers) @@ -532,7 +536,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Retrieve the requested state entry, stopping if enough was found - if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil { + if entry, err := pm.blockchain.TrieNode(hash); err == nil { data = append(data, entry) bytes += len(entry) } @@ -570,7 +574,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Retrieve the requested block's receipts, skipping if unknown to us - results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash)) + results := pm.blockchain.GetReceiptsByHash(hash) if results == nil { if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { continue diff --git a/eth/handler_test.go b/eth/handler_test.go index ebbd83c3a..e336dfa28 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -37,8 +37,6 @@ import ( "github.com/ethereum/go-ethereum/params" ) -var bigTxGas = new(big.Int).SetUint64(params.TxGas) - // Tests that protocol versions and modes of operations are matched up properly. func TestProtocolCompatibility(t *testing.T) { // Define the compatibility chart @@ -58,7 +56,7 @@ func TestProtocolCompatibility(t *testing.T) { for i, tt := range tests { ProtocolVersions = []uint{tt.version} - pm, err := newTestProtocolManager(tt.mode, 0, nil, nil) + pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil) if pm != nil { defer pm.Stop() } @@ -73,7 +71,7 @@ func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) } func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) } func testGetBlockHeaders(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil) peer, _ := newTestPeer("peer", protocol, pm, true) defer peer.close() @@ -232,7 +230,7 @@ func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) } func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) } func testGetBlockBodies(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil) peer, _ := newTestPeer("peer", protocol, pm, true) defer peer.close() @@ -315,13 +313,13 @@ func testGetNodeData(t *testing.T, protocol int) { switch i { case 0: // In block 1, the test bank sends account #1 some ether. - tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), bigTxGas, nil, nil), signer, testBankKey) + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) block.AddTx(tx) case 1: // In block 2, the test bank sends some more ether to account #1. // acc1Addr passes it on to account #2. - tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), bigTxGas, nil, nil), signer, testBankKey) - tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), bigTxGas, nil, nil), signer, acc1Key) + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) block.AddTx(tx1) block.AddTx(tx2) case 2: @@ -339,13 +337,13 @@ func testGetNodeData(t *testing.T, protocol int) { } } // Assemble the test environment - pm := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) peer, _ := newTestPeer("peer", protocol, pm, true) defer peer.close() // Fetch for now the entire chain db hashes := []common.Hash{} - for _, key := range pm.chaindb.(*ethdb.MemDatabase).Keys() { + for _, key := range db.Keys() { if len(key) == len(common.Hash{}) { hashes = append(hashes, common.BytesToHash(key)) } @@ -407,13 +405,13 @@ func testGetReceipt(t *testing.T, protocol int) { switch i { case 0: // In block 1, the test bank sends account #1 some ether. - tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), bigTxGas, nil, nil), signer, testBankKey) + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) block.AddTx(tx) case 1: // In block 2, the test bank sends some more ether to account #1. // acc1Addr passes it on to account #2. - tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), bigTxGas, nil, nil), signer, testBankKey) - tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), bigTxGas, nil, nil), signer, acc1Key) + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) block.AddTx(tx1) block.AddTx(tx2) case 2: @@ -431,7 +429,7 @@ func testGetReceipt(t *testing.T, protocol int) { } } // Assemble the test environment - pm := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) peer, _ := newTestPeer("peer", protocol, pm, true) defer peer.close() @@ -441,7 +439,7 @@ func testGetReceipt(t *testing.T, protocol int) { block := pm.blockchain.GetBlockByNumber(i) hashes = append(hashes, block.Hash()) - receipts = append(receipts, core.GetBlockReceipts(pm.chaindb, block.Hash(), block.NumberU64())) + receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash())) } // Send the hash request and verify the response p2p.Send(peer.app, 0x0f, hashes) @@ -474,7 +472,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, nil, config, pow, vm.Config{}) ) pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db) if err != nil { diff --git a/eth/helper_test.go b/eth/helper_test.go index bfb003c8b..2b05cea80 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -49,7 +49,7 @@ var ( // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. -func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, error) { +func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase, error) { var ( evmux = new(event.TypeMux) engine = ethash.NewFaker() @@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) ) chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { @@ -68,22 +68,22 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db) if err != nil { - return nil, err + return nil, nil, err } pm.Start(1000) - return pm, nil + return pm, db, nil } // newTestProtocolManagerMust creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. In case of an error, the constructor force- // fails the test. -func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) *ProtocolManager { - pm, err := newTestProtocolManager(mode, blocks, generator, newtx) +func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase) { + pm, db, err := newTestProtocolManager(mode, blocks, generator, newtx) if err != nil { t.Fatalf("Failed to create protocol manager: %v", err) } - return pm + return pm, db } // testTxPool is a fake, helper transaction pool for testing purposes @@ -130,7 +130,7 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { - tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize)) + tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize)) tx, _ = types.SignTx(tx, types.HomesteadSigner{}, from) return tx } @@ -166,8 +166,12 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te tp := &testPeer{app: app, net: net, peer: peer} // Execute any implicitly requested handshakes and return if shake { - td, head, genesis := pm.blockchain.Status() - tp.handshake(nil, td, head, genesis) + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) + tp.handshake(nil, td, head.Hash(), genesis.Hash()) } return tp, errc } diff --git a/eth/protocol_test.go b/eth/protocol_test.go index d3a44ae91..b2f93d8dd 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -41,8 +41,12 @@ func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) } func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) } func testStatusMsgErrors(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) - td, currentBlock, genesis := pm.blockchain.Status() + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) defer pm.Stop() tests := []struct { @@ -55,16 +59,16 @@ func testStatusMsgErrors(t *testing.T, protocol int) { wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), }, { - code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, td, currentBlock, genesis}, + code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, td, head.Hash(), genesis.Hash()}, wantError: errResp(ErrProtocolVersionMismatch, "10 (!= %d)", protocol), }, { - code: StatusMsg, data: statusData{uint32(protocol), 999, td, currentBlock, genesis}, + code: StatusMsg, data: statusData{uint32(protocol), 999, td, head.Hash(), genesis.Hash()}, wantError: errResp(ErrNetworkIdMismatch, "999 (!= 1)"), }, { - code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, td, currentBlock, common.Hash{3}}, - wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000 (!= %x)", genesis[:8]), + code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, td, head.Hash(), common.Hash{3}}, + wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000 (!= %x)", genesis.Hash().Bytes()[:8]), }, } @@ -94,7 +98,7 @@ func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } func testRecvTransactions(t *testing.T, protocol int) { txAdded := make(chan []*types.Transaction) - pm := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, txAdded) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, txAdded) pm.acceptTxs = 1 // mark synced to accept transactions p, _ := newTestPeer("peer", protocol, pm, true) defer pm.Stop() @@ -121,7 +125,7 @@ func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) } func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) } func testSendTransactions(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) defer pm.Stop() // Fill the pool with big transactions. diff --git a/eth/sync.go b/eth/sync.go index a8ae64617..2da1464bc 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -189,18 +189,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) { mode = downloader.FastSync } // Run the sync cycle, and disable fast sync if we've went past the pivot block - err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode) - - if atomic.LoadUint32(&pm.fastSync) == 1 { - // Disable fast sync if we indeed have something in our chain - if pm.blockchain.CurrentBlock().NumberU64() > 0 { - log.Info("Fast sync complete, auto disabling") - atomic.StoreUint32(&pm.fastSync, 0) - } - } - if err != nil { + if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } + if atomic.LoadUint32(&pm.fastSync) == 1 { + log.Info("Fast sync complete, auto disabling") + atomic.StoreUint32(&pm.fastSync, 0) + } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is diff --git a/eth/sync_test.go b/eth/sync_test.go index 9eaa1156f..88c10c7f7 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -30,12 +30,12 @@ import ( // imported into the blockchain. func TestFastSyncDisabling(t *testing.T) { // Create a pristine protocol manager, check that fast sync is left enabled - pmEmpty := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) + pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) if atomic.LoadUint32(&pmEmpty.fastSync) == 0 { t.Fatalf("fast sync disabled on pristine blockchain") } // Create a full protocol manager, check that fast sync gets disabled - pmFull := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil) + pmFull, _ := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil) if atomic.LoadUint32(&pmFull.fastSync) == 1 { t.Fatalf("fast sync not disabled on non-empty blockchain") } diff --git a/eth/tracers/internal/tracers/assets.go b/eth/tracers/internal/tracers/assets.go index cb0421008..1912f74ed 100644 --- a/eth/tracers/internal/tracers/assets.go +++ b/eth/tracers/internal/tracers/assets.go @@ -1,4 +1,4 @@ -// Code generated by go-bindata. +// Code generated by go-bindata. DO NOT EDIT. // sources: // 4byte_tracer.js // call_tracer.js @@ -6,7 +6,6 @@ // noop_tracer.js // opcount_tracer.js // prestate_tracer.js -// DO NOT EDIT! package tracers @@ -197,8 +196,8 @@ func prestate_tracerJs() (*asset, error) { // It returns an error if the asset could not be found or // could not be loaded. func Asset(name string) ([]byte, error) { - cannonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[cannonicalName]; ok { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { a, err := f() if err != nil { return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) @@ -223,8 +222,8 @@ func MustAsset(name string) []byte { // It returns an error if the asset could not be found or // could not be loaded. func AssetInfo(name string) (os.FileInfo, error) { - cannonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[cannonicalName]; ok { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { a, err := f() if err != nil { return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) @@ -245,11 +244,16 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "4byte_tracer.js": _4byte_tracerJs, - "call_tracer.js": call_tracerJs, - "evmdis_tracer.js": evmdis_tracerJs, - "noop_tracer.js": noop_tracerJs, - "opcount_tracer.js": opcount_tracerJs, + "4byte_tracer.js": _4byte_tracerJs, + + "call_tracer.js": call_tracerJs, + + "evmdis_tracer.js": evmdis_tracerJs, + + "noop_tracer.js": noop_tracerJs, + + "opcount_tracer.js": opcount_tracerJs, + "prestate_tracer.js": prestate_tracerJs, } @@ -269,8 +273,8 @@ var _bindata = map[string]func() (*asset, error){ func AssetDir(name string) ([]string, error) { node := _bintree if len(name) != 0 { - cannonicalName := strings.Replace(name, "\\", "/", -1) - pathList := strings.Split(cannonicalName, "/") + canonicalName := strings.Replace(name, "\\", "/", -1) + pathList := strings.Split(canonicalName, "/") for _, p := range pathList { node = node.Children[p] if node == nil { @@ -320,11 +324,7 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) - if err != nil { - return err - } - return nil + return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) } // RestoreAssets restores an asset under the given directory recursively @@ -345,6 +345,6 @@ func RestoreAssets(dir, name string) error { } func _filePath(dir, name string) string { - cannonicalName := strings.Replace(name, "\\", "/", -1) - return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) + canonicalName := strings.Replace(name, "\\", "/", -1) + return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...) } diff --git a/eth/tracers/tracer.go b/eth/tracers/tracer.go index f3f848fc1..4cec9e633 100644 --- a/eth/tracers/tracer.go +++ b/eth/tracers/tracer.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2017 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/eth/tracers/tracer_test.go b/eth/tracers/tracer_test.go index 7224a1489..117c376b8 100644 --- a/eth/tracers/tracer_test.go +++ b/eth/tracers/tracer_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2017 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/eth/tracers/tracers_test.go b/eth/tracers/tracers_test.go index 139280797..bf8120228 100644 --- a/eth/tracers/tracers_test.go +++ b/eth/tracers/tracers_test.go @@ -156,7 +156,7 @@ func TestCallTracer(t *testing.T) { BlockNumber: new(big.Int).SetUint64(uint64(test.Context.Number)), Time: new(big.Int).SetUint64(uint64(test.Context.Time)), Difficulty: (*big.Int)(test.Context.Difficulty), - GasLimit: new(big.Int).SetUint64(uint64(test.Context.GasLimit)), + GasLimit: uint64(test.Context.GasLimit), GasPrice: tx.GasPrice(), } db, _ := ethdb.NewMemDatabase() @@ -174,7 +174,7 @@ func TestCallTracer(t *testing.T) { t.Fatalf("failed to prepare transaction for tracing: %v", err) } st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.Gas())) - if _, _, _, _, err = st.TransitionDb(); err != nil { + if _, _, _, err = st.TransitionDb(); err != nil { t.Fatalf("failed to execute transaction: %v", err) } // Retrieve the trace result and compare against the etalon |