diff options
Diffstat (limited to 'eth')
33 files changed, 2328 insertions, 1719 deletions
diff --git a/eth/api.go b/eth/api.go index 81570988c..e91f51bb9 100644 --- a/eth/api.go +++ b/eth/api.go @@ -51,7 +51,7 @@ type PublicEthereumAPI struct { e *Ethereum } -// NewPublicEthereumAPI creates a new Etheruem protocol API for full nodes. +// NewPublicEthereumAPI creates a new Ethereum protocol API for full nodes. func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI { return &PublicEthereumAPI{e} } @@ -205,7 +205,7 @@ func (api *PrivateMinerAPI) GetHashrate() uint64 { return uint64(api.e.miner.HashRate()) } -// PrivateAdminAPI is the collection of Etheruem full node-related APIs +// PrivateAdminAPI is the collection of Ethereum full node-related APIs // exposed over the private admin endpoint. type PrivateAdminAPI struct { eth *Ethereum @@ -241,7 +241,7 @@ func (api *PrivateAdminAPI) ExportChain(file string) (bool, error) { func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool { for _, b := range bs { - if !chain.HasBlock(b.Hash()) { + if !chain.HasBlock(b.Hash(), b.NumberU64()) { return false } } @@ -298,7 +298,7 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } -// PublicDebugAPI is the collection of Etheruem full node APIs exposed +// PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { eth *Ethereum @@ -335,7 +335,7 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error return stateDb.RawDump(), nil } -// PrivateDebugAPI is the collection of Etheruem full node APIs exposed over +// PrivateDebugAPI is the collection of Ethereum full node APIs exposed over // the private debugging endpoint. type PrivateDebugAPI struct { config *params.ChainConfig @@ -465,26 +465,6 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf return true, structLogger.StructLogs(), nil } -// callmsg is the message type used for call transitions. -type callmsg struct { - addr common.Address - to *common.Address - gas, gasPrice *big.Int - value *big.Int - data []byte -} - -// accessor boilerplate to implement core.Message -func (m callmsg) From() (common.Address, error) { return m.addr, nil } -func (m callmsg) FromFrontier() (common.Address, error) { return m.addr, nil } -func (m callmsg) Nonce() uint64 { return 0 } -func (m callmsg) CheckNonce() bool { return false } -func (m callmsg) To() *common.Address { return m.to } -func (m callmsg) GasPrice() *big.Int { return m.gasPrice } -func (m callmsg) Gas() *big.Int { return m.gas } -func (m callmsg) Value() *big.Int { return m.value } -func (m callmsg) Data() []byte { return m.data } - // formatError formats a Go error into either an empty string or the data content // of the error itself. func formatError(err error) string { @@ -543,7 +523,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common. // Run the transaction with tracing enabled. vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{Debug: true, Tracer: tracer}) - ret, gas, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())) + ret, gas, failed, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())) if err != nil { return nil, fmt.Errorf("tracing failed: %v", err) } @@ -551,6 +531,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common. case *vm.StructLogger: return ðapi.ExecutionResult{ Gas: gas, + Failed: failed, ReturnValue: fmt.Sprintf("%x", ret), StructLogs: ethapi.FormatLogs(tracer.StructLogs()), }, nil @@ -590,7 +571,7 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int) (co vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{}) gp := new(core.GasPool).AddGas(tx.Gas()) - _, _, err := core.ApplyMessage(vmenv, msg, gp) + _, _, _, err := core.ApplyMessage(vmenv, msg, gp) if err != nil { return nil, vm.Context{}, nil, fmt.Errorf("tx %x failed: %v", tx.Hash(), err) } @@ -637,7 +618,7 @@ func (api *PrivateDebugAPI) StorageRangeAt(ctx context.Context, blockHash common return storageRangeAt(st, keyStart, maxResult), nil } -func storageRangeAt(st *trie.SecureTrie, start []byte, maxResult int) StorageRangeResult { +func storageRangeAt(st state.Trie, start []byte, maxResult int) StorageRangeResult { it := trie.NewIterator(st.NodeIterator(start)) result := StorageRangeResult{Storage: storageMap{}} for i := 0; i < maxResult && it.Next(); i++ { diff --git a/eth/api_backend.go b/eth/api_backend.go index fe108d272..91f392f94 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -31,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -81,11 +81,11 @@ func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumb return b.eth.blockchain.GetBlockByNumber(uint64(blockNr)), nil } -func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (ethapi.State, *types.Header, error) { +func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) { // Pending state is only known by the miner if blockNr == rpc.PendingBlockNumber { block, state := b.eth.miner.Pending() - return EthApiState{state}, block.Header(), nil + return state, block.Header(), nil } // Otherwise resolve the block number and return its state header, err := b.HeaderByNumber(ctx, blockNr) @@ -93,7 +93,7 @@ func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc. return nil, nil, err } stateDb, err := b.eth.BlockChain().StateAt(header.Root) - return EthApiState{stateDb}, header, err + return stateDb, header, err } func (b *EthApiBackend) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) { @@ -108,40 +108,43 @@ func (b *EthApiBackend) GetTd(blockHash common.Hash) *big.Int { return b.eth.blockchain.GetTdByHash(blockHash) } -func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state ethapi.State, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) { - statedb := state.(EthApiState).state - from := statedb.GetOrNewStateObject(msg.From()) - from.SetBalance(math.MaxBig256) +func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) { + state.SetBalance(msg.From(), math.MaxBig256) vmError := func() error { return nil } context := core.NewEVMContext(msg, header, b.eth.BlockChain(), nil) - return vm.NewEVM(context, statedb, b.eth.chainConfig, vmCfg), vmError, nil + return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil } -func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() +func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainEvent(ch) +} - b.eth.txPool.SetLocal(signedTx) - return b.eth.txPool.Add(signedTx) +func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainHeadEvent(ch) } -func (b *EthApiBackend) RemoveTx(txHash common.Hash) { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() +func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainSideEvent(ch) +} - b.eth.txPool.Remove(txHash) +func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.BlockChain().SubscribeLogsEvent(ch) } -func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() +func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { + return b.eth.txPool.AddLocal(signedTx) +} +func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { pending, err := b.eth.txPool.Pending() if err != nil { return nil, err } - var txs types.Transactions for _, batch := range pending { txs = append(txs, batch...) @@ -150,33 +153,25 @@ func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { } func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() - return b.eth.txPool.Get(hash) } func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() - return b.eth.txPool.State().GetNonce(addr), nil } func (b *EthApiBackend) Stats() (pending int, queued int) { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() - return b.eth.txPool.Stats() } func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { - b.eth.txMu.Lock() - defer b.eth.txMu.Unlock() - return b.eth.TxPool().Content() } +func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.TxPool().SubscribeTxPreEvent(ch) +} + func (b *EthApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } @@ -201,22 +196,13 @@ func (b *EthApiBackend) AccountManager() *accounts.Manager { return b.eth.AccountManager() } -type EthApiState struct { - state *state.StateDB -} - -func (s EthApiState) GetBalance(ctx context.Context, addr common.Address) (*big.Int, error) { - return s.state.GetBalance(addr), nil +func (b *EthApiBackend) BloomStatus() (uint64, uint64) { + sections, _, _ := b.eth.bloomIndexer.Sections() + return params.BloomBitsBlocks, sections } -func (s EthApiState) GetCode(ctx context.Context, addr common.Address) ([]byte, error) { - return s.state.GetCode(addr), nil -} - -func (s EthApiState) GetState(ctx context.Context, a common.Address, b common.Hash) (common.Hash, error) { - return s.state.GetState(a, b), nil -} - -func (s EthApiState) GetNonce(ctx context.Context, addr common.Address) (uint64, error) { - return s.state.GetNonce(addr), nil +func (b *EthApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { + for i := 0; i < bloomFilterThreads; i++ { + go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) + } } diff --git a/eth/api_test.go b/eth/api_test.go index f8d2e9c76..49ce38688 100644 --- a/eth/api_test.go +++ b/eth/api_test.go @@ -32,7 +32,7 @@ func TestStorageRangeAt(t *testing.T) { // Create a state where account 0x010000... has a few storage entries. var ( db, _ = ethdb.NewMemDatabase() - state, _ = state.New(common.Hash{}, db) + state, _ = state.New(common.Hash{}, state.NewDatabase(db)) addr = common.Address{0x01} keys = []common.Hash{ // hashes of Keys of storage common.HexToHash("340dd630ad21bf010b4e676dbfa9ba9a02175262d1fa356232cfde6cb5b47ef2"), diff --git a/eth/backend.go b/eth/backend.go index 639792333..1cd9e8fff 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" @@ -53,20 +54,24 @@ type LesServer interface { Start(srvr *p2p.Server) Stop() Protocols() []p2p.Protocol + SetBloomBitsIndexer(bbIndexer *core.ChainIndexer) } // Ethereum implements the Ethereum full node service. type Ethereum struct { + config *Config chainConfig *params.ChainConfig + // Channel for shutting down the service - shutdownChan chan bool // Channel for shutting down the ethereum - stopDbUpgrade func() // stop chain db sequential key upgrade + shutdownChan chan bool // Channel for shutting down the ethereum + stopDbUpgrade func() error // stop chain db sequential key upgrade + // Handlers txPool *core.TxPool - txMu sync.Mutex blockchain *core.BlockChain protocolManager *ProtocolManager lesServer LesServer + // DB interfaces chainDb ethdb.Database // Block chain database @@ -74,6 +79,9 @@ type Ethereum struct { engine consensus.Engine accountManager *accounts.Manager + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + ApiBackend *EthApiBackend miner *miner.Miner @@ -88,7 +96,7 @@ type Ethereum struct { func (s *Ethereum) AddLesServer(ls LesServer) { s.lesServer = ls - s.protocolManager.lesServer = ls + ls.SetBloomBitsIndexer(s.bloomIndexer) } // New creates a new Ethereum object (including the @@ -100,12 +108,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } - chainDb, err := CreateDB(ctx, config, "chaindata") if err != nil { return nil, err } - stopDbUpgrade := upgradeSequentialKeys(chainDb) + stopDbUpgrade := upgradeDeduplicateData(chainDb) chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr @@ -113,6 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ + config: config, chainDb: chainDb, chainConfig: chainConfig, eventMux: ctx.EventMux, @@ -123,11 +131,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { networkId: config.NetworkId, gasPrice: config.GasPrice, etherbase: config.Etherbase, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks), } - if err := addMipmapBloomBins(chainDb); err != nil { - return nil, err - } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) if !config.SkipBcVersionCheck { @@ -139,7 +146,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig) + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -149,25 +156,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eth.blockchain.SetHead(compat.RewindTo) core.WriteChainConfig(chainDb, genesisHash, chainConfig) } + eth.bloomIndexer.Start(eth.blockchain) - newPool := core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) - eth.txPool = newPool - - maxPeers := config.MaxPeers - if config.LightServ > 0 { - // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections - // temporary solution until the new peer connectivity API is finished - halfPeers := maxPeers / 2 - maxPeers -= config.LightPeers - if maxPeers < halfPeers { - maxPeers = halfPeers - } + if config.TxPool.Journal != "" { + config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) @@ -201,10 +199,13 @@ func makeExtraData(extra []byte) []byte { // CreateDB creates the chain database. func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) { db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles) + if err != nil { + return nil, err + } if db, ok := db.(*ethdb.LDBDatabase); ok { db.Meter("eth/db/chaindata/") } - return db, err + return db, nil } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service @@ -328,7 +329,7 @@ func (s *Ethereum) StartMining(local bool) error { wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) if wallet == nil || err != nil { log.Error("Etherbase account unavailable locally", "err", err) - return fmt.Errorf("singer missing: %v", err) + return fmt.Errorf("signer missing: %v", err) } clique.Authorize(eb, wallet.SignHash) } @@ -363,17 +364,29 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage func (s *Ethereum) Protocols() []p2p.Protocol { if s.lesServer == nil { return s.protocolManager.SubProtocols - } else { - return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...) } + return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...) } // Start implements node.Service, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start(srvr *p2p.Server) error { + // Start the bloom bits servicing goroutines + s.startBloomHandlers() + + // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) - s.protocolManager.Start() + // Figure out a max peers count based on the server limits + maxPeers := srvr.MaxPeers + if s.config.LightServ > 0 { + maxPeers -= s.config.LightPeers + if maxPeers < srvr.MaxPeers/2 { + maxPeers = srvr.MaxPeers / 2 + } + } + // Start the networking layer and the light server if requested + s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } @@ -386,6 +399,7 @@ func (s *Ethereum) Stop() error { if s.stopDbUpgrade != nil { s.stopDbUpgrade() } + s.bloomIndexer.Close() s.blockchain.Stop() s.protocolManager.Stop() if s.lesServer != nil { diff --git a/eth/backend_test.go b/eth/backend_test.go deleted file mode 100644 index f60e3214c..000000000 --- a/eth/backend_test.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package eth - -import ( - "math/big" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/params" -) - -func TestMipmapUpgrade(t *testing.T) { - db, _ := ethdb.NewMemDatabase() - addr := common.BytesToAddress([]byte("jeff")) - genesis := new(core.Genesis).MustCommit(db) - - chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) { - var receipts types.Receipts - switch i { - case 1: - receipt := types.NewReceipt(nil, new(big.Int)) - receipt.Logs = []*types.Log{{Address: addr}} - gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} - case 2: - receipt := types.NewReceipt(nil, new(big.Int)) - receipt.Logs = []*types.Log{{Address: addr}} - gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} - } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) - } - }) - for i, block := range chain { - core.WriteBlock(db, block) - if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { - t.Fatalf("failed to insert block number: %v", err) - } - if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { - t.Fatalf("failed to insert block number: %v", err) - } - if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil { - t.Fatal("error writing block receipts:", err) - } - } - - err := addMipmapBloomBins(db) - if err != nil { - t.Fatal(err) - } - - bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0]) - if (bloom == types.Bloom{}) { - t.Error("got empty bloom filter") - } - - data, _ := db.Get([]byte("setting-mipmap-version")) - if len(data) == 0 { - t.Error("setting-mipmap-version not written to database") - } -} diff --git a/eth/bind.go b/eth/bind.go index e5abd8617..d09977dbc 100644 --- a/eth/bind.go +++ b/eth/bind.go @@ -43,7 +43,7 @@ type ContractBackend struct { } // NewContractBackend creates a new native contract backend using an existing -// Etheruem object. +// Ethereum object. func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend { return &ContractBackend{ eapi: ethapi.NewPublicEthereumAPI(apiBackend), @@ -54,14 +54,12 @@ func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend { // CodeAt retrieves any code associated with the contract from the local API. func (b *ContractBackend) CodeAt(ctx context.Context, contract common.Address, blockNum *big.Int) ([]byte, error) { - out, err := b.bcapi.GetCode(ctx, contract, toBlockNumber(blockNum)) - return common.FromHex(out), err + return b.bcapi.GetCode(ctx, contract, toBlockNumber(blockNum)) } // CodeAt retrieves any code associated with the contract from the local API. func (b *ContractBackend) PendingCodeAt(ctx context.Context, contract common.Address) ([]byte, error) { - out, err := b.bcapi.GetCode(ctx, contract, rpc.PendingBlockNumber) - return common.FromHex(out), err + return b.bcapi.GetCode(ctx, contract, rpc.PendingBlockNumber) } // ContractCall implements bind.ContractCaller executing an Ethereum contract diff --git a/eth/bloombits.go b/eth/bloombits.go new file mode 100644 index 000000000..c5597391c --- /dev/null +++ b/eth/bloombits.go @@ -0,0 +1,143 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + bloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + bloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + bloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + bloomRetrievalWait = time.Duration(0) +) + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (eth *Ethereum) startBloomHandlers() { + for i := 0; i < bloomServiceThreads; i++ { + go func() { + for { + select { + case <-eth.shutdownChan: + return + + case request := <-eth.bloomRequests: + task := <-request + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1) + if compVector, err := core.GetBloomBits(eth.chainDb, task.Bit, section, head); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(params.BloomBitsBlocks)/8); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } + request <- task + } + } + }() + } +} + +const ( + // bloomConfirms is the number of confirmation blocks before a bloom section is + // considered probably final and its rotated bits are calculated. + bloomConfirms = 256 + + // bloomThrottling is the time to wait between processing two consecutive index + // sections. It's useful during chain upgrades to prevent disk overload. + bloomThrottling = 100 * time.Millisecond +) + +// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index +// for the Ethereum header bloom filters, permitting blazing fast filtering. +type BloomIndexer struct { + size uint64 // section size to generate bloombits for + + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed +} + +// NewBloomIndexer returns a chain indexer that generates bloom bits data for the +// canonical chain for fast logs filtering. +func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { + backend := &BloomIndexer{ + db: db, + size: size, + } + table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix)) + + return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") +} + +// Reset implements core.ChainIndexerBackend, starting a new bloombits index +// section. +func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error { + gen, err := bloombits.NewGenerator(uint(b.size)) + b.gen, b.section, b.head = gen, section, common.Hash{} + return err +} + +// Process implements core.ChainIndexerBackend, adding a new header's bloom into +// the index. +func (b *BloomIndexer) Process(header *types.Header) { + b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) + b.head = header.Hash() +} + +// Commit implements core.ChainIndexerBackend, finalizing the bloom section and +// writing it out into the database. +func (b *BloomIndexer) Commit() error { + batch := b.db.NewBatch() + + for i := 0; i < types.BloomBitLength; i++ { + bits, err := b.gen.Bitset(uint(i)) + if err != nil { + return err + } + core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) + } + return batch.Write() +} diff --git a/eth/config.go b/eth/config.go index 4109cff8b..7bcfd403e 100644 --- a/eth/config.go +++ b/eth/config.go @@ -79,7 +79,6 @@ type Config struct { // Light client options LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests LightPeers int `toml:",omitempty"` // Maximum number of LES client peers - MaxPeers int `toml:"-"` // Maximum number of global peers // Database options SkipBcVersionCheck bool `toml:"-"` diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go index 82cdd7e55..d41afa17c 100644 --- a/eth/db_upgrade.go +++ b/eth/db_upgrade.go @@ -19,278 +19,117 @@ package eth import ( "bytes" - "encoding/binary" - "fmt" - "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys") +var deduplicateData = []byte("dbUpgrade_20170714deduplicateData") -// upgradeSequentialKeys checks the chain database version and +// upgradeDeduplicateData checks the chain database version and // starts a background process to make upgrades if necessary. // Returns a stop function that blocks until the process has // been safely stopped. -func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { - data, _ := db.Get(useSequentialKeys) +func upgradeDeduplicateData(db ethdb.Database) func() error { + // If the database is already converted or empty, bail out + data, _ := db.Get(deduplicateData) if len(data) > 0 && data[0] == 42 { - return nil // already converted + return nil } - if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 { - db.Put(useSequentialKeys, []byte{42}) - return nil // empty database, nothing to do + db.Put(deduplicateData, []byte{42}) + return nil } - - log.Warn("Upgrading chain database to use sequential keys") - - stopChn := make(chan struct{}) - stoppedChn := make(chan struct{}) + // Start the deduplication upgrade on a new goroutine + log.Warn("Upgrading database to use lookup entries") + stop := make(chan chan error) go func() { - stopFn := func() bool { - select { - case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved - case <-stopChn: - return true - } - return false - } - - err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn) - if err == nil && !stopped { - err, stopped = upgradeSequentialBlocks(db, stopFn) - } - if err == nil && !stopped { - err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) - } - if err == nil && !stopped { - log.Info("Database conversion successful") - db.Put(useSequentialKeys, []byte{42}) - } - if err != nil { - log.Error("Database conversion failed", "err", err) - } - close(stoppedChn) - }() - - return func() { - close(stopChn) - <-stoppedChn - } -} - -// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from -// the database, writes them in new format and deletes the old ones if successful. -func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-num-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) < 20 { - cnt++ - if cnt%100000 == 0 { + // Create an iterator to read the entire database and covert old lookup entires + it := db.(*ethdb.LDBDatabase).NewIterator() + defer func() { + if it != nil { it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting canonical numbers", "count", cnt) } - number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() - newKey := []byte("h12345678n") - binary.BigEndian.PutUint64(newKey[1:9], number) - if err := db.Put(newKey, it.Value()); err != nil { - return err, false + }() + + var ( + converted uint64 + failed error + ) + for failed == nil && it.Next() { + // Skip any entries that don't look like old transaction meta entires (<hash>0x01) + key := it.Key() + if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 { + continue } - if err := db.Delete(keyPtr); err != nil { - return err, false + // Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>) + var meta struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 } - } - - if stopFn() { - return nil, true - } - it.Next() - } - if cnt > 0 { - log.Info("converted canonical numbers", "count", cnt) - } - return nil, false -} - -// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block -// receipts from the database, writes them in new format and deletes the old ones -// if successful. -func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) >= 38 { - cnt++ - if cnt%10000 == 0 { - it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting blocks", "count", cnt) + if err := rlp.DecodeBytes(it.Value(), &meta); err != nil { + continue } - // convert header, body, td and block receipts - var keyPrefix [38]byte - copy(keyPrefix[:], keyPtr[0:38]) - hash := keyPrefix[6:38] - if err := upgradeSequentialBlockData(db, hash); err != nil { - return err, false - } - // delete old db entries belonging to this hash - for bytes.HasPrefix(it.Key(), keyPrefix[:]) { - if err := db.Delete(it.Key()); err != nil { - return err, false + // Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix)) + hash := key[:common.HashLength] + + if hash[0] == byte('l') { + // Potential clash, the "old" `hash` must point to a live transaction. + if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) { + continue } - it.Next() } - if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil { - return err, false + // Convert the old metadata to a new lookup entry, delete duplicate data + if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new looku entry + if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data + if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data + if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata + break + } + } + } } - } else { - it.Next() - } - - if stopFn() { - return nil, true - } - } - if cnt > 0 { - log.Info("Converted blocks", "count", cnt) - } - return nil, false -} - -// upgradeSequentialOrphanedReceipts removes any old format block receipts from the -// database that did not have a corresponding block -func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("receipts-block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer it.Release() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - // phase 2 already converted receipts belonging to existing - // blocks, just remove if there's anything left - cnt++ - if err := db.Delete(it.Key()); err != nil { - return err, false - } - - if stopFn() { - return nil, true - } - it.Next() - } - if cnt > 0 { - log.Info("Removed orphaned block receipts", "count", cnt) - } - return nil, false -} + // Bump the conversion counter, and recreate the iterator occasionally to + // avoid too high memory consumption. + converted++ + if converted%100000 == 0 { + it.Release() + it = db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(key) -// upgradeSequentialBlockData upgrades the header, body, td and block receipts -// database entries belonging to a single hash (doesn't delete old data). -func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error { - // get old chain data and block number - headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...)) - if len(headerRLP) == 0 { - return nil - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { - return err - } - number := header.Number.Uint64() - bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...)) - tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...)) - receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...)) - // store new hash -> number association - encNum := make([]byte, 8) - binary.BigEndian.PutUint64(encNum, number) - if err := db.Put(append([]byte("H"), hash...), encNum); err != nil { - return err - } - // store new chain data - if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil { - return err - } - if len(tdRLP) != 0 { - if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil { - return err - } - } - if len(bodyRLP) != 0 { - if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil { - return err - } - } - if len(receiptsRLP) != 0 { - if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil { - return err + log.Info("Deduplicating database entries", "deduped", converted) + } + // Check for termination, or continue after a bit of a timeout + select { + case errc := <-stop: + errc <- nil + return + case <-time.After(time.Microsecond * 100): + } } - } - return nil -} - -func addMipmapBloomBins(db ethdb.Database) (err error) { - const mipmapVersion uint = 2 - - // check if the version is set. We ignore data for now since there's - // only one version so we can easily ignore it for now - var data []byte - data, _ = db.Get([]byte("setting-mipmap-version")) - if len(data) > 0 { - var version uint - if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { - return nil + // Upgrade finished, mark a such and terminate + if failed == nil { + log.Info("Database deduplication successful", "deduped", converted) + db.Put(deduplicateData, []byte{42}) + } else { + log.Error("Database deduplication failed", "deduped", converted, "err", failed) } - } + it.Release() + it = nil - defer func() { - if err == nil { - var val []byte - val, err = rlp.EncodeToBytes(mipmapVersion) - if err == nil { - err = db.Put([]byte("setting-mipmap-version"), val) - } - return - } + errc := <-stop + errc <- failed }() - latestHash := core.GetHeadBlockHash(db) - latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash)) - if latestBlock == nil { // clean database - return - } - - tstart := time.Now() - log.Warn("Upgrading db log bloom bins") - for i := uint64(0); i <= latestBlock.NumberU64(); i++ { - hash := core.GetCanonicalHash(db, i) - if (hash == common.Hash{}) { - return fmt.Errorf("chain db corrupted. Could not find block %d.", i) - } - core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i)) + // Assembly the cancellation callback + return func() error { + errc := make(chan error) + stop <- errc + return <-errc } - log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart))) - return nil } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 839969f03..cca4fe7a9 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" ) @@ -99,8 +98,9 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section @@ -109,26 +109,16 @@ type Downloader struct { rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics - syncStatsChainOrigin uint64 // Origin block number where syncing started at - syncStatsChainHeight uint64 // Highest block number known when syncing started - syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsState stateSyncStats syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + lightchain LightChain + blockchain BlockChain + // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain - commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - rollback chainRollbackFn // Removes a batch of recently added chain links - dropPeer peerDropFn // Drops a peer for misbehaving + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -136,16 +126,18 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer headerCh chan dataPack // [eth/62] Channel receiving inbound block headers bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + // for stateFetcher + stateSyncStart chan *stateSync + trackStateReq chan *stateReq + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + // Cancellation and termination cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs @@ -161,45 +153,83 @@ type Downloader struct { chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } +// LightChain encapsulates functions required to synchronise a light chain. +type LightChain interface { + // HasHeader verifies a header's presence in the local chain. + HasHeader(h common.Hash, number uint64) bool + + // GetHeaderByHash retrieves a header from the local chain. + GetHeaderByHash(common.Hash) *types.Header + + // CurrentHeader retrieves the head header from the local chain. + CurrentHeader() *types.Header + + // GetTdByHash returns the total difficulty of a local block. + GetTdByHash(common.Hash) *big.Int + + // InsertHeaderChain inserts a batch of headers into the local chain. + InsertHeaderChain([]*types.Header, int) (int, error) + + // Rollback removes a few recently added elements from the local chain. + Rollback([]common.Hash) +} + +// BlockChain encapsulates functions required to sync a (full or fast) blockchain. +type BlockChain interface { + LightChain + + // HasBlockAndState verifies block and associated states' presence in the local chain. + HasBlockAndState(common.Hash) bool + + // GetBlockByHash retrieves a block from the local chain. + GetBlockByHash(common.Hash) *types.Block + + // CurrentBlock retrieves the head block from the local chain. + CurrentBlock() *types.Block + + // CurrentFastBlock retrieves the head fast block from the local chain. + CurrentFastBlock() *types.Block + + // FastSyncCommitHead directly commits the head block to a certain entity. + FastSyncCommitHead(common.Hash) error + + // InsertChain inserts a batch of blocks into the local chain. + InsertChain(types.Blocks) (int, error) + + // InsertReceiptChain inserts a batch of receipts into the local chain. + InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) +} + // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn, - getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, - headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, - insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { + if lightchain == nil { + lightchain = chain + } dl := &Downloader{ - mode: mode, - mux: mux, - queue: newQueue(stateDb), - peers: newPeerSet(), - rttEstimate: uint64(rttMaxEstimate), - rttConfidence: uint64(1000000), - hasHeader: hasHeader, - hasBlockAndState: hasBlockAndState, - getHeader: getHeader, - getBlock: getBlock, - headHeader: headHeader, - headBlock: headBlock, - headFastBlock: headFastBlock, - commitHeadBlock: commitHeadBlock, - getTd: getTd, - insertHeaders: insertHeaders, - insertBlocks: insertBlocks, - insertReceipts: insertReceipts, - rollback: rollback, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - headerCh: make(chan dataPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - stateCh: make(chan dataPack, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), - stateWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), - quitCh: make(chan struct{}), + mode: mode, + stateDB: stateDb, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), + stateCh: make(chan dataPack), + stateSyncStart: make(chan *stateSync), + trackStateReq: make(chan *stateReq), } go dl.qosTuner() + go dl.stateFetcher() return dl } @@ -211,9 +241,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he // of processed and the total number of known states are also returned. Otherwise // these are zero. func (d *Downloader) Progress() ethereum.SyncProgress { - // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks - pendingStates := uint64(d.queue.PendingNodeData()) - // Lock the current stats and return the progress d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() @@ -221,18 +248,18 @@ func (d *Downloader) Progress() ethereum.SyncProgress { current := uint64(0) switch d.mode { case FullSync: - current = d.headBlock().NumberU64() + current = d.blockchain.CurrentBlock().NumberU64() case FastSync: - current = d.headFastBlock().NumberU64() + current = d.blockchain.CurrentFastBlock().NumberU64() case LightSync: - current = d.headHeader().Number.Uint64() + current = d.lightchain.CurrentHeader().Number.Uint64() } return ethereum.SyncProgress{ StartingBlock: d.syncStatsChainOrigin, CurrentBlock: current, HighestBlock: d.syncStatsChainHeight, - PulledStates: d.syncStatsStateDone, - KnownStates: d.syncStatsStateDone + pendingStates, + PulledStates: d.syncStatsState.processed, + KnownStates: d.syncStatsState.processed + d.syncStatsState.pending, } } @@ -243,13 +270,11 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn, - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error { logger := log.New("peer", id) logger.Trace("Registering sync peer") - if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil { + if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil { logger.Error("Failed to register sync peer", "err", err) return err } @@ -258,6 +283,11 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea return nil } +// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer. +func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error { + return d.RegisterPeer(id, version, &lightPeerWrapper{peer}) +} + // UnregisterPeer remove a peer from the known list, preventing any action from // the specified peer. An effort is also made to return any pending fetches into // the queue. @@ -324,13 +354,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case <-ch: default: } } - for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { for empty := false; !empty; { select { case <-ch: @@ -369,7 +399,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -439,30 +469,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, height) } - return d.spawnSync(origin+1, - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync - ) + + fetchers := []func() error{ + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, td) }, + } + if d.mode == FastSync { + fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + } else if d.mode == FullSync { + fetchers = append(fetchers, d.processFullSyncContent) + } + err = d.spawnSync(fetchers) + if err != nil && d.mode == FastSync && d.fsPivotLock != nil { + // If sync failed in the critical section, bump the fail counter. + atomic.AddUint32(&d.fsPivotFails, 1) + } + return err } // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { +func (d *Downloader) spawnSync(fetchers []func() error) error { var wg sync.WaitGroup - errc := make(chan error, len(fetchers)+1) - wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.processContent() }() + errc := make(chan error, len(fetchers)) + wg.Add(len(fetchers)) for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() } // Wait for the first error, then terminate the others. var err error - for i := 0; i < len(fetchers)+1; i++ { - if i == len(fetchers) { + for i := 0; i < len(fetchers); i++ { + if i == len(fetchers)-1 { // Close the queue when all fetchers have exited. // This will cause the block processor to end when // it has processed the queue. @@ -475,11 +515,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { d.queue.Close() d.Cancel() wg.Wait() - - // If sync failed in the critical section, bump the fail counter - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - atomic.AddUint32(&d.fsPivotFails, 1) - } return err } @@ -517,12 +552,12 @@ func (d *Downloader) Terminate() { // fetchHeight retrieves the head header of the remote peer to aid in estimating // the total time a pending synchronisation would take. -func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { +func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Retrieving remote chain height") // Request the advertised remote head block and wait for the response - head, _ := p.currentHead() - go p.getRelHeaders(head, 1, 0, false) + head, _ := p.peer.Head() + go p.peer.RequestHeadersByHash(head, 1, 0, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -552,7 +587,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { return nil, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -564,15 +598,15 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganisation (i.e. none of // the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { +func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks - floor, ceil := int64(-1), d.headHeader().Number.Uint64() + floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) if d.mode == FullSync { - ceil = d.headBlock().NumberU64() + ceil = d.blockchain.CurrentBlock().NumberU64() } else if d.mode == FastSync { - ceil = d.headFastBlock().NumberU64() + ceil = d.blockchain.CurrentFastBlock().NumberU64() } if ceil >= MaxForkAncestry { floor = int64(ceil - MaxForkAncestry) @@ -592,7 +626,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { if count > limit { count = limit } - go p.getAbsHeaders(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -632,7 +666,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() // If every header is known, even future ones, the peer straight out lied about its head @@ -649,7 +683,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -675,7 +708,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { ttl := d.requestTTL() timeout := time.After(ttl) - go p.getAbsHeaders(uint64(check), 1, 0, false) + go p.peer.RequestHeadersByNumber(check, 1, 0, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -698,11 +731,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { arrived = true // Modify the search interval based on the response - if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) { + if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { end = check break } - header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists + header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, errBadPeer @@ -714,7 +747,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -737,7 +769,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. -func (d *Downloader) fetchHeaders(p *peer, from uint64) error { +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { p.log.Debug("Directing header downloads", "origin", from) defer p.log.Debug("Header download terminated") @@ -757,10 +789,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) } } // Start pulling the header chain skeleton until all is done @@ -827,7 +859,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -862,12 +894,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( } expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } - capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } + capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } ) err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, @@ -891,9 +923,9 @@ func (d *Downloader) fetchBodies(from uint64) error { return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } - capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) } + capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, @@ -915,9 +947,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } - capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) } + capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, @@ -927,68 +959,6 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } -// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any -// available peers, reserving a chunk of nodes for each, waiting for delivery and -// also periodically checking for timeouts. -func (d *Downloader) fetchNodeData() error { - log.Debug("Downloading node state data") - - var ( - deliver = func(packet dataPack) (int, error) { - start := time.Now() - return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { - // If the peer returned old-requested data, forgive - if err == trie.ErrNotRequested { - log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId()) - return - } - if err != nil { - // If the node data processing failed, the root hash is very wrong, abort - log.Error("State processing failed", "peer", packet.PeerId(), "err", err) - d.Cancel() - return - } - // Processing succeeded, notify state fetcher of continuation - pending := d.queue.PendingNodeData() - if pending > 0 { - select { - case d.stateWakeCh <- true: - default: - } - } - d.syncStatsLock.Lock() - d.syncStatsStateDone += uint64(delivered) - syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below - d.syncStatsLock.Unlock() - - // If real database progress was made, reset any fast-sync pivot failure - if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { - log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails)) - atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Log a message to the user and return - if delivered > 0 { - log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending) - } - }) - } - expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } - throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { - return d.queue.ReserveNodeData(p, count), false, nil - } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } - ) - err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, - d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, - d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states") - - log.Debug("Node state data download terminated", "err", err) - return err -} - // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. @@ -1015,9 +985,9 @@ func (d *Downloader) fetchNodeData() error { // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - kind: textual label of the type being downloaded to display in log mesages func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, - expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), - fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, - idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), + fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, + idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1123,6 +1093,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv throttled = true break } + // Short circuit if there is no more available task. + if pending() == 0 { + break + } // Reserve a chunk of fetches for a peer. A nil can mean either that // no more headers are available, or that the peer is known not to // have them. @@ -1182,29 +1156,25 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { for i, header := range rollback { hashes[i] = header.Hash() } - lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0 - if d.headFastBlock != nil { - lastFastBlock = d.headFastBlock().Number() - } - if d.headBlock != nil { - lastBlock = d.headBlock().Number() + lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 + if d.mode != LightSync { + lastFastBlock = d.blockchain.CurrentFastBlock().Number() + lastBlock = d.blockchain.CurrentBlock().Number() } - d.rollback(hashes) + d.lightchain.Rollback(hashes) curFastBlock, curBlock := common.Big0, common.Big0 - if d.headFastBlock != nil { - curFastBlock = d.headFastBlock().Number() - } - if d.headBlock != nil { - curBlock = d.headBlock().Number() + if d.mode != LightSync { + curFastBlock = d.blockchain.CurrentFastBlock().Number() + curBlock = d.blockchain.CurrentBlock().Number() } log.Warn("Rolled back headers", "count", len(hashes), - "header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number), + "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { - // If we didn't ever fail, lock in te pivot header (must! not! change!) + // If we didn't ever fail, lock in the pivot header (must! not! change!) if atomic.LoadUint32(&d.fsPivotFails) == 0 { for _, header := range rollback { if header.Number.Uint64() == pivot { @@ -1229,7 +1199,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Terminate header processing if we synced up if len(headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1248,7 +1218,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give if d.mode != LightSync { - if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { + if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 { return errStallingPeer } } @@ -1260,7 +1230,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // queued for processing when the header download completes. However, as long as the // peer gave us something useful, we're already happy/progressed (above check). if d.mode == FastSync || d.mode == LightSync { - if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { + if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 { return errStallingPeer } } @@ -1290,7 +1260,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Collect the yet unknown headers to mark them as uncertain unknown := make([]*types.Header, 0, len(headers)) for _, header := range chunk { - if !d.hasHeader(header.Hash()) { + if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { unknown = append(unknown, header) } } @@ -1299,7 +1269,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 } - if n, err := d.insertHeaders(chunk, frequency); err != nil { + if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { rollback = append(rollback, chunk[:n]...) @@ -1341,7 +1311,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { origin += uint64(limit) } // Signal the content downloaders of the availablility of new tasks - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: @@ -1351,71 +1321,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } -// processContent takes fetch results from the queue and tries to import them -// into the chain. The type of import operation will depend on the result contents. -func (d *Downloader) processContent() error { - pivot := d.queue.FastSyncPivot() +// processFullSyncContent takes fetch results from the queue and imports them into the chain. +func (d *Downloader) processFullSyncContent() error { for { results := d.queue.WaitResults() if len(results) == 0 { - return nil // queue empty + return nil } if d.chainInsertHook != nil { d.chainInsertHook(results) } - // Actually import the blocks - first, last := results[0].Header, results[len(results)-1].Header + if err := d.importBlockResults(results); err != nil { + return err + } + } +} + +func (d *Downloader) importBlockResults(results []*fetchResult) error { + for len(results) != 0 { + // Check for any termination requests. This makes clean shutdown faster. + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header log.Debug("Inserting downloaded chain", "items", len(results), "firstnum", first.Number, "firsthash", first.Hash(), "lastnum", last.Number, "lasthash", last.Hash(), ) - for len(results) != 0 { - // Check for any termination requests - select { - case <-d.quitCh: - return errCancelContentProcessing - default: - } - // Retrieve the a batch of results to import - var ( - blocks = make([]*types.Block, 0, maxResultsProcess) - receipts = make([]types.Receipts, 0, maxResultsProcess) - ) - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - for _, result := range results[:items] { - switch { - case d.mode == FullSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - case d.mode == FastSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - if result.Header.Number.Uint64() <= pivot { - receipts = append(receipts, result.Receipts) - } - } - } - // Try to process the results, aborting if there's an error - var ( - err error - index int - ) - switch { - case len(receipts) > 0: - index, err = d.insertReceipts(blocks, receipts) - if err == nil && blocks[len(blocks)-1].NumberU64() == pivot { - log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash()) - index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) - } - default: - index, err = d.insertBlocks(blocks) + blocks := make([]*types.Block, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.blockchain.InsertChain(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +// processFastSyncContent takes fetch results from the queue and writes them to the +// database. It also controls the synchronisation of state nodes of the pivot block. +func (d *Downloader) processFastSyncContent(latest *types.Header) error { + // Start syncing state of the reported head block. + // This should get us most of the state of the pivot block. + stateSync := d.syncState(latest.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil { + d.queue.Close() // wake up WaitResults + } + }() + + pivot := d.queue.FastSyncPivot() + for { + results := d.queue.WaitResults() + if len(results) == 0 { + return stateSync.Cancel() + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + P, beforeP, afterP := splitAroundPivot(pivot, results) + if err := d.commitFastSyncData(beforeP, stateSync); err != nil { + return err + } + if P != nil { + stateSync.Cancel() + if err := d.commitPivotBlock(P); err != nil { + return err } - if err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + } + if err := d.importBlockResults(afterP); err != nil { + return err + } + } +} + +func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + for _, result := range results { + num := result.Header.Number.Uint64() + switch { + case num < pivot: + before = append(before, result) + case num == pivot: + p = result + default: + after = append(after, result) + } + } + return p, before, after +} + +func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { + for len(results) != 0 { + // Check for any termination requests. + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err } - // Shift the results to the next batch - results = results[items:] + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, items) + receipts := make([]types.Receipts, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +func (d *Downloader) commitPivotBlock(result *fetchResult) error { + b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + // Sync the pivot block state. This should complete reasonably quickly because + // we've already synced up to the reported head block state earlier. + if err := d.syncState(b.Root()).Wait(); err != nil { + return err + } + log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + return err } + return d.blockchain.FastSyncCommitHead(b.Hash()) } // DeliverHeaders injects a new batch of block headers received from a remote @@ -1468,7 +1518,7 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i func (d *Downloader) qosTuner() { for { // Retrieve the current median RTT and integrate into the previoust target RTT - rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) + rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) atomic.StoreUint64(&d.rttEstimate, uint64(rtt)) // A new RTT cycle passed, increase our confidence in the estimated RTT diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 267a0def9..58f6e9a62 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -96,9 +96,7 @@ func newTester() *downloadTester { tester.stateDb, _ = ethdb.NewMemDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, - tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, - tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer) + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) return tester } @@ -218,14 +216,14 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { return err } -// hasHeader checks if a header is present in the testers canonical chain. -func (dl *downloadTester) hasHeader(hash common.Hash) bool { - return dl.getHeader(hash) != nil +// HasHeader checks if a header is present in the testers canonical chain. +func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool { + return dl.GetHeaderByHash(hash) != nil } -// hasBlock checks if a block and associated state is present in the testers canonical chain. -func (dl *downloadTester) hasBlock(hash common.Hash) bool { - block := dl.getBlock(hash) +// HasBlockAndState checks if a block and associated state is present in the testers canonical chain. +func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool { + block := dl.GetBlockByHash(hash) if block == nil { return false } @@ -233,24 +231,24 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool { return err == nil } -// getHeader retrieves a header from the testers canonical chain. -func (dl *downloadTester) getHeader(hash common.Hash) *types.Header { +// GetHeader retrieves a header from the testers canonical chain. +func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownHeaders[hash] } -// getBlock retrieves a block from the testers canonical chain. -func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { +// GetBlock retrieves a block from the testers canonical chain. +func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownBlocks[hash] } -// headHeader retrieves the current head header from the canonical chain. -func (dl *downloadTester) headHeader() *types.Header { +// CurrentHeader retrieves the current head header from the canonical chain. +func (dl *downloadTester) CurrentHeader() *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() @@ -262,8 +260,8 @@ func (dl *downloadTester) headHeader() *types.Header { return dl.genesis.Header() } -// headBlock retrieves the current head block from the canonical chain. -func (dl *downloadTester) headBlock() *types.Block { +// CurrentBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) CurrentBlock() *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() @@ -277,8 +275,8 @@ func (dl *downloadTester) headBlock() *types.Block { return dl.genesis } -// headFastBlock retrieves the current head fast-sync block from the canonical chain. -func (dl *downloadTester) headFastBlock() *types.Block { +// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain. +func (dl *downloadTester) CurrentFastBlock() *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() @@ -290,26 +288,26 @@ func (dl *downloadTester) headFastBlock() *types.Block { return dl.genesis } -// commitHeadBlock manually sets the head block to a given hash. -func (dl *downloadTester) commitHeadBlock(hash common.Hash) error { +// FastSyncCommitHead manually sets the head block to a given hash. +func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { // For now only check that the state trie is correct - if block := dl.getBlock(hash); block != nil { + if block := dl.GetBlockByHash(hash); block != nil { _, err := trie.NewSecure(block.Root(), dl.stateDb, 0) return err } return fmt.Errorf("non existent block: %x", hash[:4]) } -// getTd retrieves the block's total difficulty from the canonical chain. -func (dl *downloadTester) getTd(hash common.Hash) *big.Int { +// GetTdByHash retrieves the block's total difficulty from the canonical chain. +func (dl *downloadTester) GetTdByHash(hash common.Hash) *big.Int { dl.lock.RLock() defer dl.lock.RUnlock() return dl.ownChainTd[hash] } -// insertHeaders injects a new batch of headers into the simulated chain. -func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) (int, error) { +// InsertHeaderChain injects a new batch of headers into the simulated chain. +func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -337,8 +335,8 @@ func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) return len(headers), nil } -// insertBlocks injects a new batch of blocks into the simulated chain. -func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { +// InsertChain injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -359,8 +357,8 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { return len(blocks), nil } -// insertReceipts injects a new batch of receipts into the simulated chain. -func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) { +// InsertReceiptChain injects a new batch of receipts into the simulated chain. +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (int, error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -377,8 +375,8 @@ func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.R return len(blocks), nil } -// rollback removes some recently added elements from the chain. -func (dl *downloadTester) rollback(hashes []common.Hash) { +// Rollback removes some recently added elements from the chain. +func (dl *downloadTester) Rollback(hashes []common.Hash) { dl.lock.Lock() defer dl.lock.Unlock() @@ -405,15 +403,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha dl.lock.Lock() defer dl.lock.Unlock() - var err error - switch version { - case 62: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) - case 63: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) - case 64: - err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) - } + var err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl: dl, id: id, delay: delay}) if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -471,139 +461,151 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } -// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash +type downloadTesterPeer struct { + dl *downloadTester + id string + delay time.Duration + lock sync.RWMutex +} + +// setDelay is a thread safe setter for the network delay value. +func (dlp *downloadTesterPeer) setDelay(delay time.Duration) { + dlp.lock.Lock() + defer dlp.lock.Unlock() + + dlp.delay = delay +} + +// waitDelay is a thread safe way to sleep for the configured time. +func (dlp *downloadTesterPeer) waitDelay() { + dlp.lock.RLock() + delay := dlp.delay + dlp.lock.RUnlock() + + time.Sleep(delay) +} + +// Head constructs a function to retrieve a peer's current head hash // and total difficulty. -func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) { - return func() (common.Hash, *big.Int) { - dl.lock.RLock() - defer dl.lock.RUnlock() +func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - return dl.peerHashes[id][0], nil - } + return dlp.dl.peerHashes[dlp.id][0], nil } -// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed +// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. -func (dl *downloadTester) peerGetRelHeadersFn(id string, delay time.Duration) func(common.Hash, int, int, bool) error { - return func(origin common.Hash, amount int, skip int, reverse bool) error { - // Find the canonical number of the hash - dl.lock.RLock() - number := uint64(0) - for num, hash := range dl.peerHashes[id] { - if hash == origin { - number = uint64(len(dl.peerHashes[id]) - num - 1) - break - } +func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { + // Find the canonical number of the hash + dlp.dl.lock.RLock() + number := uint64(0) + for num, hash := range dlp.dl.peerHashes[dlp.id] { + if hash == origin { + number = uint64(len(dlp.dl.peerHashes[dlp.id]) - num - 1) + break } - dl.lock.RUnlock() - - // Use the absolute header fetcher to satisfy the query - return dl.peerGetAbsHeadersFn(id, delay)(number, amount, skip, reverse) } + dlp.dl.lock.RUnlock() + + // Use the absolute header fetcher to satisfy the query + return dlp.RequestHeadersByNumber(number, amount, skip, reverse) } -// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered +// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. -func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error { - return func(origin uint64, amount int, skip int, reverse bool) error { - time.Sleep(delay) - - dl.lock.RLock() - defer dl.lock.RUnlock() - - // Gather the next batch of headers - hashes := dl.peerHashes[id] - headers := dl.peerHeaders[id] - result := make([]*types.Header, 0, amount) - for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { - if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { - result = append(result, header) - } +func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + dlp.waitDelay() + + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() + + // Gather the next batch of headers + hashes := dlp.dl.peerHashes[dlp.id] + headers := dlp.dl.peerHeaders[dlp.id] + result := make([]*types.Header, 0, amount) + for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { + if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { + result = append(result, header) } - // Delay delivery a bit to allow attacks to unfold - go func() { - time.Sleep(time.Millisecond) - dl.downloader.DeliverHeaders(id, result) - }() - return nil } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dlp.dl.downloader.DeliverHeaders(dlp.id, result) + }() + return nil } -// peerGetBodiesFn constructs a getBlockBodies method associated with a particular +// RequestBodies constructs a getBlockBodies method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of block bodies from the particularly requested peer. -func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) +func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error { + dlp.waitDelay() - dl.lock.RLock() - defer dl.lock.RUnlock() + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - blocks := dl.peerBlocks[id] + blocks := dlp.dl.peerBlocks[dlp.id] - transactions := make([][]*types.Transaction, 0, len(hashes)) - uncles := make([][]*types.Header, 0, len(hashes)) + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) - for _, hash := range hashes { - if block, ok := blocks[hash]; ok { - transactions = append(transactions, block.Transactions()) - uncles = append(uncles, block.Uncles()) - } + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) } - go dl.downloader.DeliverBodies(id, transactions, uncles) - - return nil } + go dlp.dl.downloader.DeliverBodies(dlp.id, transactions, uncles) + + return nil } -// peerGetReceiptsFn constructs a getReceipts method associated with a particular +// RequestReceipts constructs a getReceipts method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of block receipts from the particularly requested peer. -func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) +func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error { + dlp.waitDelay() - dl.lock.RLock() - defer dl.lock.RUnlock() + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() - receipts := dl.peerReceipts[id] + receipts := dlp.dl.peerReceipts[dlp.id] - results := make([][]*types.Receipt, 0, len(hashes)) - for _, hash := range hashes { - if receipt, ok := receipts[hash]; ok { - results = append(results, receipt) - } + results := make([][]*types.Receipt, 0, len(hashes)) + for _, hash := range hashes { + if receipt, ok := receipts[hash]; ok { + results = append(results, receipt) } - go dl.downloader.DeliverReceipts(id, results) - - return nil } + go dlp.dl.downloader.DeliverReceipts(dlp.id, results) + + return nil } -// peerGetNodeDataFn constructs a getNodeData method associated with a particular +// RequestNodeData constructs a getNodeData method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of node state data from the particularly requested peer. -func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error { - return func(hashes []common.Hash) error { - time.Sleep(delay) - - dl.lock.RLock() - defer dl.lock.RUnlock() - - results := make([][]byte, 0, len(hashes)) - for _, hash := range hashes { - if data, err := dl.peerDb.Get(hash.Bytes()); err == nil { - if !dl.peerMissingStates[id][hash] { - results = append(results, data) - } +func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { + dlp.waitDelay() + + dlp.dl.lock.RLock() + defer dlp.dl.lock.RUnlock() + + results := make([][]byte, 0, len(hashes)) + for _, hash := range hashes { + if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil { + if !dlp.dl.peerMissingStates[dlp.id][hash] { + results = append(results, data) } } - go dl.downloader.DeliverNodeData(id, results) - - return nil } + go dlp.dl.downloader.DeliverNodeData(dlp.id, results) + + return nil } // assertOwnChain checks if the local chain contains the correct number of items @@ -657,7 +659,7 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot) } if index > 0 { - if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil || err != nil { + if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(tester.stateDb)); statedb == nil || err != nil { t.Fatalf("state reconstruction failed: %v", err) } } @@ -1212,7 +1214,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("fast-attack", nil, mode); err == nil { t.Fatalf("succeeded fast attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch) } // Attempt to sync with an attacker that feeds junk during the block import phase. @@ -1226,11 +1228,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("block-attack", nil, mode); err == nil { t.Fatalf("succeeded block attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) } if mode == FastSync { - if head := tester.headBlock().NumberU64(); head != 0 { + if head := tester.CurrentBlock().NumberU64(); head != 0 { t.Errorf("fast sync pivot block #%d not rolled back", head) } } @@ -1251,11 +1253,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("withhold-attack", nil, mode); err == nil { t.Fatalf("succeeded withholding attacker synchronisation") } - if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { + if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch) } if mode == FastSync { - if head := tester.headBlock().NumberU64(); head != 0 { + if head := tester.CurrentBlock().NumberU64(); head != 0 { t.Errorf("fast sync pivot block #%d not rolled back", head) } } @@ -1396,7 +1398,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("peer-half", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1413,7 +1415,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("peer-full", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1469,7 +1471,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("fork A", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1489,7 +1491,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("fork B", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1550,7 +1552,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("faulty", nil, mode); err == nil { - t.Fatalf("succeeded faulty synchronisation") + panic("succeeded faulty synchronisation") } }() <-starting @@ -1567,7 +1569,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1628,7 +1630,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("attack", nil, mode); err == nil { - t.Fatalf("succeeded attacker synchronisation") + panic("succeeded attacker synchronisation") } }() <-starting @@ -1645,7 +1647,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1670,6 +1672,48 @@ func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } +type floodingTestPeer struct { + peer Peer + tester *downloadTester +} + +func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } +func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error { + return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse) +} +func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error { + return ftp.peer.RequestBodies(hashes) +} +func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error { + return ftp.peer.RequestReceipts(hashes) +} +func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error { + return ftp.peer.RequestNodeData(hashes) +} + +func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + go ftp.peer.RequestHeadersByNumber(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(15 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil +} + func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { t.Parallel() @@ -1677,7 +1721,6 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { defer master.terminate() hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false) - fakeHeads := []*types.Header{{}, {}, {}, {}} for i := 0; i < 200; i++ { tester := newTester() tester.peerDb = master.peerDb @@ -1685,29 +1728,11 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Whenever the downloader requests headers, flood it with // a lot of unrequested header deliveries. - tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { - deliveriesDone := make(chan struct{}, 500) - for i := 0; i < cap(deliveriesDone); i++ { - peer := fmt.Sprintf("fake-peer%d", i) - go func() { - tester.downloader.DeliverHeaders(peer, fakeHeads) - deliveriesDone <- struct{}{} - }() - } - // Deliver the actual requested headers. - impl := tester.peerGetAbsHeadersFn("peer", 0) - go impl(from, count, skip, reverse) - // None of the extra deliveries should block. - timeout := time.After(15 * time.Second) - for i := 0; i < cap(deliveriesDone); i++ { - select { - case <-deliveriesDone: - case <-timeout: - panic("blocked") - } - } - return nil + tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ + tester.downloader.peers.peers["peer"].peer, + tester, } + if err := tester.sync("peer", nil, mode); err != nil { t.Errorf("sync failed: %v", err) } @@ -1739,7 +1764,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { for i := 0; i < fsPivotInterval; i++ { tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true } - tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section + (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(500 * time.Millisecond) // Enough to reach the critical section // Synchronise with the peer a few times and make sure they fail until the retry limit for i := 0; i < int(fsCriticalTrials)-1; i++ { @@ -1758,7 +1783,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { tester.lock.Lock() tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} - tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0) + (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(0) tester.lock.Unlock() } } diff --git a/eth/downloader/fakepeer.go b/eth/downloader/fakepeer.go new file mode 100644 index 000000000..b45acff7d --- /dev/null +++ b/eth/downloader/fakepeer.go @@ -0,0 +1,160 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" +) + +// FakePeer is a mock downloader peer that operates on a local database instance +// instead of being an actual live node. It's useful for testing and to implement +// sync commands from an xisting local database. +type FakePeer struct { + id string + db ethdb.Database + hc *core.HeaderChain + dl *Downloader +} + +// NewFakePeer creates a new mock downloader peer with the given data sources. +func NewFakePeer(id string, db ethdb.Database, hc *core.HeaderChain, dl *Downloader) *FakePeer { + return &FakePeer{id: id, db: db, hc: hc, dl: dl} +} + +// Head implements downloader.Peer, returning the current head hash and number +// of the best known header. +func (p *FakePeer) Head() (common.Hash, *big.Int) { + header := p.hc.CurrentHeader() + return header.Hash(), header.Number +} + +// RequestHeadersByHash implements downloader.Peer, returning a batch of headers +// defined by the origin hash and the associaed query parameters. +func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, reverse bool) error { + var ( + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < amount { + origin := p.hc.GetHeaderByHash(hash) + if origin == nil { + break + } + number := origin.Number.Uint64() + headers = append(headers, origin) + if reverse { + for i := 0; i <= skip; i++ { + if header := p.hc.GetHeader(hash, number); header != nil { + hash = header.ParentHash + number-- + } else { + unknown = true + break + } + } + } else { + var ( + current = origin.Number.Uint64() + next = current + uint64(skip) + 1 + ) + if header := p.hc.GetHeaderByNumber(next); header != nil { + if p.hc.GetBlockHashesFromHash(header.Hash(), uint64(skip+1))[skip] == hash { + hash = header.Hash() + } else { + unknown = true + } + } else { + unknown = true + } + } + } + p.dl.DeliverHeaders(p.id, headers) + return nil +} + +// RequestHeadersByNumber implements downloader.Peer, returning a batch of headers +// defined by the origin number and the associaed query parameters. +func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, reverse bool) error { + var ( + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < amount { + origin := p.hc.GetHeaderByNumber(number) + if origin == nil { + break + } + if reverse { + if number >= uint64(skip+1) { + number -= uint64(skip + 1) + } else { + unknown = true + } + } else { + number += uint64(skip + 1) + } + headers = append(headers, origin) + } + p.dl.DeliverHeaders(p.id, headers) + return nil +} + +// RequestBodies implements downloader.Peer, returning a batch of block bodies +// corresponding to the specified block hashes. +func (p *FakePeer) RequestBodies(hashes []common.Hash) error { + var ( + txs [][]*types.Transaction + uncles [][]*types.Header + ) + for _, hash := range hashes { + block := core.GetBlock(p.db, hash, p.hc.GetBlockNumber(hash)) + + txs = append(txs, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + p.dl.DeliverBodies(p.id, txs, uncles) + return nil +} + +// RequestReceipts implements downloader.Peer, returning a batch of transaction +// receipts corresponding to the specified block hashes. +func (p *FakePeer) RequestReceipts(hashes []common.Hash) error { + var receipts [][]*types.Receipt + for _, hash := range hashes { + receipts = append(receipts, core.GetBlockReceipts(p.db, hash, p.hc.GetBlockNumber(hash))) + } + p.dl.DeliverReceipts(p.id, receipts) + return nil +} + +// RequestNodeData implements downloader.Peer, returning a batch of state trie +// nodes corresponding to the specified trie hashes. +func (p *FakePeer) RequestNodeData(hashes []common.Hash) error { + var data [][]byte + for _, hash := range hashes { + if entry, err := p.db.Get(hash.Bytes()); err == nil { + data = append(data, entry) + } + } + p.dl.DeliverNodeData(p.id, data) + return nil +} diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 0d76c7dfd..58764ccf0 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -38,8 +38,6 @@ var ( receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") - stateInMeter = metrics.NewMeter("eth/downloader/states/in") - stateReqTimer = metrics.NewTimer("eth/downloader/states/req") - stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") - stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 15a912f1f..e638744ea 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -38,24 +39,14 @@ const ( measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) -// Head hash and total difficulty retriever for -type currentHeadRetrievalFn func() (common.Hash, *big.Int) - -// Block header and body fetchers belonging to eth/62 and above -type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error -type absoluteHeaderFetcherFn func(uint64, int, int, bool) error -type blockBodyFetcherFn func([]common.Hash) error -type receiptFetcherFn func([]common.Hash) error -type stateFetcherFn func([]common.Hash) error - var ( errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") ) -// peer represents an active peer from which hashes and blocks are retrieved. -type peer struct { +// peerConnection represents an active peer from which hashes and blocks are retrieved. +type peerConnection struct { id string // Unique identifier of the peer headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) @@ -77,37 +68,57 @@ type peer struct { lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) - currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer - - getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash - getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position - getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies - - getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts - getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data + peer Peer version int // Eth protocol version number to switch strategies log log.Logger // Contextual logger to add extra infos to peer logs lock sync.RWMutex } -// newPeer create a new downloader peer, with specific hash and block retrieval -// mechanisms. -func newPeer(id string, version int, currentHead currentHeadRetrievalFn, - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn, getNodeData stateFetcherFn, logger log.Logger) *peer { +// LightPeer encapsulates the methods required to synchronise with a remote light peer. +type LightPeer interface { + Head() (common.Hash, *big.Int) + RequestHeadersByHash(common.Hash, int, int, bool) error + RequestHeadersByNumber(uint64, int, int, bool) error +} + +// Peer encapsulates the methods required to synchronise with a remote full peer. +type Peer interface { + LightPeer + RequestBodies([]common.Hash) error + RequestReceipts([]common.Hash) error + RequestNodeData([]common.Hash) error +} - return &peer{ +// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods. +type lightPeerWrapper struct { + peer LightPeer +} + +func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } +func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error { + return w.peer.RequestHeadersByHash(h, amount, skip, reverse) +} +func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error { + return w.peer.RequestHeadersByNumber(i, amount, skip, reverse) +} +func (w *lightPeerWrapper) RequestBodies([]common.Hash) error { + panic("RequestBodies not supported in light client mode sync") +} +func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error { + panic("RequestReceipts not supported in light client mode sync") +} +func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error { + panic("RequestNodeData not supported in light client mode sync") +} + +// newPeerConnection creates a new downloader peer. +func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection { + return &peerConnection{ id: id, lacking: make(map[common.Hash]struct{}), - currentHead: currentHead, - getRelHeaders: getRelHeaders, - getAbsHeaders: getAbsHeaders, - getBlockBodies: getBlockBodies, - - getReceipts: getReceipts, - getNodeData: getNodeData, + peer: peer, version: version, log: logger, @@ -115,7 +126,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn, } // Reset clears the internal state of a peer entity. -func (p *peer) Reset() { +func (p *peerConnection) Reset() { p.lock.Lock() defer p.lock.Unlock() @@ -133,7 +144,7 @@ func (p *peer) Reset() { } // FetchHeaders sends a header retrieval request to the remote peer. -func (p *peer) FetchHeaders(from uint64, count int) error { +func (p *peerConnection) FetchHeaders(from uint64, count int) error { // Sanity check the protocol version if p.version < 62 { panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) @@ -145,13 +156,13 @@ func (p *peer) FetchHeaders(from uint64, count int) error { p.headerStarted = time.Now() // Issue the header retrieval request (absolut upwards without gaps) - go p.getAbsHeaders(from, count, 0, false) + go p.peer.RequestHeadersByNumber(from, count, 0, false) return nil } // FetchBodies sends a block body retrieval request to the remote peer. -func (p *peer) FetchBodies(request *fetchRequest) error { +func (p *peerConnection) FetchBodies(request *fetchRequest) error { // Sanity check the protocol version if p.version < 62 { panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version)) @@ -167,13 +178,13 @@ func (p *peer) FetchBodies(request *fetchRequest) error { for _, header := range request.Headers { hashes = append(hashes, header.Hash()) } - go p.getBlockBodies(hashes) + go p.peer.RequestBodies(hashes) return nil } // FetchReceipts sends a receipt retrieval request to the remote peer. -func (p *peer) FetchReceipts(request *fetchRequest) error { +func (p *peerConnection) FetchReceipts(request *fetchRequest) error { // Sanity check the protocol version if p.version < 63 { panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version)) @@ -189,13 +200,13 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { for _, header := range request.Headers { hashes = append(hashes, header.Hash()) } - go p.getReceipts(hashes) + go p.peer.RequestReceipts(hashes) return nil } // FetchNodeData sends a node state data retrieval request to the remote peer. -func (p *peer) FetchNodeData(request *fetchRequest) error { +func (p *peerConnection) FetchNodeData(hashes []common.Hash) error { // Sanity check the protocol version if p.version < 63 { panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) @@ -206,12 +217,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { } p.stateStarted = time.Now() - // Convert the hash set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Hashes)) - for hash := range request.Hashes { - hashes = append(hashes, hash) - } - go p.getNodeData(hashes) + go p.peer.RequestNodeData(hashes) return nil } @@ -219,41 +225,41 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { // SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval // requests. Its estimated header retrieval throughput is updated with that measured // just now. -func (p *peer) SetHeadersIdle(delivered int) { +func (p *peerConnection) SetHeadersIdle(delivered int) { p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) } // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval // requests. Its estimated block retrieval throughput is updated with that measured // just now. -func (p *peer) SetBlocksIdle(delivered int) { +func (p *peerConnection) SetBlocksIdle(delivered int) { p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval // requests. Its estimated body retrieval throughput is updated with that measured // just now. -func (p *peer) SetBodiesIdle(delivered int) { +func (p *peerConnection) SetBodiesIdle(delivered int) { p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt // retrieval requests. Its estimated receipt retrieval throughput is updated // with that measured just now. -func (p *peer) SetReceiptsIdle(delivered int) { +func (p *peerConnection) SetReceiptsIdle(delivered int) { p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) } // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie // data retrieval requests. Its estimated state retrieval throughput is updated // with that measured just now. -func (p *peer) SetNodeDataIdle(delivered int) { +func (p *peerConnection) SetNodeDataIdle(delivered int) { p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) } // setIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its estimated retrieval throughput is updated with that measured just now. -func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { +func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { // Irrelevant of the scaling, make sure the peer ends up idle defer atomic.StoreInt32(idle, 0) @@ -280,7 +286,7 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id // HeaderCapacity retrieves the peers header download allowance based on its // previously discovered throughput. -func (p *peer) HeaderCapacity(targetRTT time.Duration) int { +func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() @@ -289,7 +295,7 @@ func (p *peer) HeaderCapacity(targetRTT time.Duration) int { // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. -func (p *peer) BlockCapacity(targetRTT time.Duration) int { +func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() @@ -298,7 +304,7 @@ func (p *peer) BlockCapacity(targetRTT time.Duration) int { // ReceiptCapacity retrieves the peers receipt download allowance based on its // previously discovered throughput. -func (p *peer) ReceiptCapacity(targetRTT time.Duration) int { +func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() @@ -307,7 +313,7 @@ func (p *peer) ReceiptCapacity(targetRTT time.Duration) int { // NodeDataCapacity retrieves the peers state download allowance based on its // previously discovered throughput. -func (p *peer) NodeDataCapacity(targetRTT time.Duration) int { +func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() @@ -317,7 +323,7 @@ func (p *peer) NodeDataCapacity(targetRTT time.Duration) int { // MarkLacking appends a new entity to the set of items (blocks, receipts, states) // that a peer is known not to have (i.e. have been requested before). If the // set reaches its maximum allowed capacity, items are randomly dropped off. -func (p *peer) MarkLacking(hash common.Hash) { +func (p *peerConnection) MarkLacking(hash common.Hash) { p.lock.Lock() defer p.lock.Unlock() @@ -332,7 +338,7 @@ func (p *peer) MarkLacking(hash common.Hash) { // Lacks retrieves whether the hash of a blockchain item is on the peers lacking // list (i.e. whether we know that the peer does not have it). -func (p *peer) Lacks(hash common.Hash) bool { +func (p *peerConnection) Lacks(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() @@ -343,17 +349,29 @@ func (p *peer) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peerConnection + newPeerFeed event.Feed + peerDropFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. func newPeerSet() *peerSet { return &peerSet{ - peers: make(map[string]*peer), + peers: make(map[string]*peerConnection), } } +// SubscribeNewPeers subscribes to peer arrival events. +func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription { + return ps.newPeerFeed.Subscribe(ch) +} + +// SubscribePeerDrops subscribes to peer departure events. +func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription { + return ps.peerDropFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -371,15 +389,14 @@ func (ps *peerSet) Reset() { // The method also sets the starting throughput values of the new peer to the // average of all existing peers, to give it a realistic chance of being used // for data retrievals. -func (ps *peerSet) Register(p *peer) error { +func (ps *peerSet) Register(p *peerConnection) error { // Retrieve the current median RTT as a sane default p.rtt = ps.medianRTT() // Register the new peer with some meaningful defaults ps.lock.Lock() - defer ps.lock.Unlock() - if _, ok := ps.peers[p.id]; ok { + ps.lock.Unlock() return errAlreadyRegistered } if len(ps.peers) > 0 { @@ -399,6 +416,9 @@ func (ps *peerSet) Register(p *peer) error { p.stateThroughput /= float64(len(ps.peers)) } ps.peers[p.id] = p + ps.lock.Unlock() + + ps.newPeerFeed.Send(p) return nil } @@ -406,17 +426,20 @@ func (ps *peerSet) Register(p *peer) error { // actions to/from that particular entity. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() - defer ps.lock.Unlock() - - if _, ok := ps.peers[id]; !ok { + p, ok := ps.peers[id] + if !ok { + defer ps.lock.Unlock() return errNotRegistered } delete(ps.peers, id) + ps.lock.Unlock() + + ps.peerDropFeed.Send(p) return nil } // Peer retrieves the registered peer with the given id. -func (ps *peerSet) Peer(id string) *peer { +func (ps *peerSet) Peer(id string) *peerConnection { ps.lock.RLock() defer ps.lock.RUnlock() @@ -432,11 +455,11 @@ func (ps *peerSet) Len() int { } // AllPeers retrieves a flat list of all the peers within the set. -func (ps *peerSet) AllPeers() []*peer { +func (ps *peerSet) AllPeers() []*peerConnection { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) + list := make([]*peerConnection, 0, len(ps.peers)) for _, p := range ps.peers { list = append(list, p) } @@ -445,11 +468,11 @@ func (ps *peerSet) AllPeers() []*peer { // HeaderIdlePeers retrieves a flat list of all the currently header-idle peers // within the active peer set, ordered by their reputation. -func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { - idle := func(p *peer) bool { +func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.headerIdle) == 0 } - throughput := func(p *peer) float64 { + throughput := func(p *peerConnection) float64 { p.lock.RLock() defer p.lock.RUnlock() return p.headerThroughput @@ -459,11 +482,11 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // the active peer set, ordered by their reputation. -func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { - idle := func(p *peer) bool { +func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - throughput := func(p *peer) float64 { + throughput := func(p *peerConnection) float64 { p.lock.RLock() defer p.lock.RUnlock() return p.blockThroughput @@ -473,11 +496,11 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers // within the active peer set, ordered by their reputation. -func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { - idle := func(p *peer) bool { +func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.receiptIdle) == 0 } - throughput := func(p *peer) float64 { + throughput := func(p *peerConnection) float64 { p.lock.RLock() defer p.lock.RUnlock() return p.receiptThroughput @@ -487,11 +510,11 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle // peers within the active peer set, ordered by their reputation. -func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { - idle := func(p *peer) bool { +func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { + idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.stateIdle) == 0 } - throughput := func(p *peer) float64 { + throughput := func(p *peerConnection) float64 { p.lock.RLock() defer p.lock.RUnlock() return p.stateThroughput @@ -502,11 +525,11 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { // idlePeers retrieves a flat list of all currently idle peers satisfying the // protocol version constraints, using the provided function to check idleness. // The resulting set of peers are sorted by their measure throughput. -func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) { +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) { ps.lock.RLock() defer ps.lock.RUnlock() - idle, total := make([]*peer, 0, len(ps.peers)), 0 + idle, total := make([]*peerConnection, 0, len(ps.peers)), 0 for _, p := range ps.peers { if p.version >= minProtocol && p.version <= maxProtocol { if idleCheck(p) { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 855097c45..6926f1d8c 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,20 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var ( - blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently -) +var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download var ( errNoFetchesPending = errors.New("no fetches pending") @@ -48,7 +41,7 @@ var ( // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { - Peer *peer // Peer to which the request was sent + Peer *peerConnection // Peer to which the request was sent From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order @@ -94,15 +87,6 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order - stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority - stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for - statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator - stateWriters int // [eth/63] Number of running state DB writer goroutines - resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -112,7 +96,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(stateDb ethdb.Database) *queue { +func newQueue() *queue { lock := new(sync.Mutex) return &queue{ headerPendPool: make(map[string]*fetchRequest), @@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - stateTaskPool: make(map[common.Hash]int), - stateTaskQueue: prque.New(), - statePendPool: make(map[string]*fetchRequest), - stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), active: sync.NewCond(lock), lock: lock, @@ -158,12 +138,6 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - q.statePendPool = make(map[string]*fetchRequest) - q.stateScheduler = nil - q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 } @@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } -// PendingNodeData retrieves the number of node data entries pending for retrieval. -func (q *queue) PendingNodeData() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.pendingNodeDataLocked() -} - -// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval. -// The caller must hold q.lock. -func (q *queue) pendingNodeDataLocked() int { - var n int - if q.stateScheduler != nil { - n = q.stateScheduler.Pending() - } - // Ensure that PendingNodeData doesn't return 0 until all state is written. - if q.stateWriters > 0 { - n++ - } - return n -} - // InFlightHeaders retrieves whether there are header fetch requests currently // in flight. func (q *queue) InFlightHeaders() bool { @@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } -// InFlightNodeData retrieves whether there are node data entry fetch requests -// currently in flight. -func (q *queue) InFlightNodeData() bool { - q.lock.Lock() - defer q.lock.Unlock() - - return len(q.statePendPool)+q.stateWriters > 0 -} - -// Idle returns if the queue is fully idle or has some data still inside. This -// method is used by the tester to detect termination events. +// Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) - if q.stateScheduler != nil { - queued += q.stateScheduler.Pending() - } return (queued + pending + cached) == 0 } @@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } - if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { - // Pivoting point of the fast sync, switch the state retrieval to this - log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash) - - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - for _, req := range q.statePendPool { - req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear - } - - q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - } inserts = append(inserts, header) q.headerHead = hash from++ @@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int { if result == nil || result.Pending > 0 { return i } - // Special handling for the fast-sync pivot block: - if q.mode == FastSync { - bnum := result.Header.Number.Uint64() - if bnum == q.fastSyncPivot { - // If the state of the pivot block is not - // available yet, we cannot proceed and return 0. - // - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 { + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { return i } - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } - // If we're just the fast sync pivot, stop as well - // because the following batch needs different insertion. - // This simplifies handling the switchover in d.process. - if bnum == q.fastSyncPivot+1 && i > 0 { - return i } } } @@ -481,7 +391,7 @@ func (q *queue) countProcessableItems() int { // ReserveHeaders reserves a set of headers for the given peer, skipping any // previously failed batches. -func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { +func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -519,85 +429,10 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { return request } -// ReserveNodeData reserves a set of node data hashes for the given peer, skipping -// any previously failed download. -func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { - // Create a task generator to fetch status-fetch tasks if all schedules ones are done - generator := func(max int) { - if q.stateScheduler != nil { - for _, hash := range q.stateScheduler.Missing(max) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - } - } - } - q.lock.Lock() - defer q.lock.Unlock() - - return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates) -} - -// reserveHashes reserves a set of hashes for the given peer, skipping previously -// failed ones. -// -// Note, this method expects the queue lock to be already held for writing. The -// reason the lock is not obtained in here is because the parameters already need -// to access the queue, so they already need a lock anyway. -func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { - // Short circuit if the peer's already downloading something (sanity check to - // not corrupt state) - if _, ok := pendPool[p.id]; ok { - return nil - } - // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - allowance := maxPending - if allowance > 0 { - for _, request := range pendPool { - allowance -= len(request.Hashes) - } - } - // If there's a task generator, ask it to fill our task queue - if taskGen != nil && taskQueue.Size() < allowance { - taskGen(allowance - taskQueue.Size()) - } - if taskQueue.Empty() { - return nil - } - // Retrieve a batch of hashes, skipping previously failed ones - send := make(map[common.Hash]int) - skip := make(map[common.Hash]int) - - for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { - hash, priority := taskQueue.Pop() - if p.Lacks(hash.(common.Hash)) { - skip[hash.(common.Hash)] = int(priority) - } else { - send[hash.(common.Hash)] = int(priority) - } - } - // Merge all the skipped hashes back - for hash, index := range skip { - taskQueue.Push(hash, float32(index)) - } - // Assemble and return the block download request - if len(send) == 0 { - return nil - } - request := &fetchRequest{ - Peer: p, - Hashes: send, - Time: time.Now(), - } - pendPool[p.id] = request - - return request -} - // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. -func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { +func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) { isNoop := func(header *types.Header) bool { return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash } @@ -610,7 +445,7 @@ func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping // any previously failed downloads. Beside the next batch of needed fetches, it // also returns a flag whether empty receipts were queued requiring importing. -func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) { +func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) { isNoop := func(header *types.Header) bool { return header.ReceiptHash == types.EmptyRootHash } @@ -627,7 +462,7 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) @@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } -// CancelNodeData aborts a node state data fetch request, returning all pending -// hashes to the task queue. -func (q *queue) CancelNodeData(request *fetchRequest) { - q.cancel(request, q.stateTaskQueue, q.statePendPool) -} - // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() @@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } - if request, ok := q.statePendPool[peerId]; ok { - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - delete(q.statePendPool, peerId) - } } // ExpireHeaders checks for in flight requests that exceeded a timeout allowance, @@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) } -// ExpireNodeData checks for in flight node data requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) -} - // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // @@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } -// DeliverNodeData injects a node state data retrieval response into the queue. -// The method returns the number of node state accepted from the delivery. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the data was never requested - request := q.statePendPool[id] - if request == nil { - return 0, errNoFetchesPending - } - stateReqTimer.UpdateSince(request.Time) - delete(q.statePendPool, id) - - // If no data was retrieved, mark their hashes as unavailable for the origin peer - if len(data) == 0 { - for hash := range request.Hashes { - request.Peer.MarkLacking(hash) - } - } - // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) - process := []trie.SyncResult{} - for _, blob := range data { - // Skip any state trie entries that were not requested - hash := common.BytesToHash(crypto.Keccak256(blob)) - if _, ok := request.Hashes[hash]; !ok { - errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) - continue - } - // Inject the next state trie item into the processing queue - process = append(process, trie.SyncResult{Hash: hash, Data: blob}) - delete(request.Hashes, hash) - delete(q.stateTaskPool, hash) - } - // Return all failed or missing fetches to the queue - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - if q.stateScheduler == nil { - return 0, errNoFetchesPending - } - - // Run valid nodes through the trie download scheduler. It writes completed nodes to a - // batch, which is committed asynchronously. This may lead to over-fetches because the - // scheduler treats everything as written after Process has returned, but it's - // unlikely to be an issue in practice. - batch := q.stateDatabase.NewBatch() - progressed, nproc, procerr := q.stateScheduler.Process(process, batch) - q.stateWriters += 1 - go func() { - if procerr == nil { - nproc = len(process) - procerr = batch.Write() - } - // Return processing errors through the callback so the sync gets canceled. The - // number of writers is decremented prior to the call so PendingNodeData will - // return zero when the callback runs. - q.lock.Lock() - q.stateWriters -= 1 - q.lock.Unlock() - callback(nproc, progressed, procerr) - // Wake up WaitResults after the state has been written because it might be - // waiting for completion of the pivot block's state download. - q.active.Signal() - }() - - // If none of the data items were good, it's a stale delivery - switch { - case len(errs) == 0: - return len(process), nil - case len(errs) == len(request.Hashes): - return len(process), errStaleDelivery - default: - return len(process), fmt.Errorf("multiple failures: %v", errs) - } -} - // Prepare configures the result cache to allow accepting and caching inbound // fetch results. func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { @@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. } q.fastSyncPivot = pivot q.mode = mode - - // If long running fast sync, also start up a head stateretrieval immediately - if mode == FastSync && pivot > 0 { - q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) - } } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go new file mode 100644 index 000000000..a0b05c9be --- /dev/null +++ b/eth/downloader/statesync.go @@ -0,0 +1,475 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "hash" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" +) + +// stateReq represents a batch of state fetch requests groupped together into +// a single data retrieval network packet. +type stateReq struct { + items []common.Hash // Hashes of the state items to download + tasks map[common.Hash]*stateTask // Download tasks to track previous attempts + timeout time.Duration // Maximum round trip time for this to complete + timer *time.Timer // Timer to fire when the RTT timeout expires + peer *peerConnection // Peer that we're requesting from + response [][]byte // Response data of the peer (nil for timeouts) + dropped bool // Flag whether the peer dropped off early +} + +// timedOut returns if this request timed out. +func (req *stateReq) timedOut() bool { + return req.response == nil +} + +// stateSyncStats is a collection of progress stats to report during a state trie +// sync to RPC requests as well as to display in user logs. +type stateSyncStats struct { + processed uint64 // Number of state entries processed + duplicate uint64 // Number of state entries downloaded twice + unexpected uint64 // Number of non-requested state entries received + pending uint64 // Number of still pending state entries +} + +// syncState starts downloading state with the given root hash. +func (d *Downloader) syncState(root common.Hash) *stateSync { + s := newStateSync(d, root) + select { + case d.stateSyncStart <- s: + case <-d.quitCh: + s.err = errCancelStateFetch + close(s.done) + } + return s +} + +// stateFetcher manages the active state sync and accepts requests +// on its behalf. +func (d *Downloader) stateFetcher() { + for { + select { + case s := <-d.stateSyncStart: + for next := s; next != nil; { + next = d.runStateSync(next) + } + case <-d.stateCh: + // Ignore state responses while no sync is running. + case <-d.quitCh: + return + } + } +} + +// runStateSync runs a state synchronisation until it completes or another root +// hash is requested to be switched over to. +func (d *Downloader) runStateSync(s *stateSync) *stateSync { + var ( + active = make(map[string]*stateReq) // Currently in-flight requests + finished []*stateReq // Completed or failed requests + timeout = make(chan *stateReq) // Timed out active requests + ) + defer func() { + // Cancel active request timers on exit. Also set peers to idle so they're + // available for the next sync. + for _, req := range active { + req.timer.Stop() + req.peer.SetNodeDataIdle(len(req.items)) + } + }() + // Run the state sync. + go s.run() + defer s.Cancel() + + // Listen for peer departure events to cancel assigned tasks + peerDrop := make(chan *peerConnection, 1024) + peerSub := s.d.peers.SubscribePeerDrops(peerDrop) + defer peerSub.Unsubscribe() + + for { + // Enable sending of the first buffered element if there is one. + var ( + deliverReq *stateReq + deliverReqCh chan *stateReq + ) + if len(finished) > 0 { + deliverReq = finished[0] + deliverReqCh = s.deliver + } + + select { + // The stateSync lifecycle: + case next := <-d.stateSyncStart: + return next + + case <-s.done: + return nil + + // Send the next finished request to the current sync: + case deliverReqCh <- deliverReq: + finished = append(finished[:0], finished[1:]...) + + // Handle incoming state packs: + case pack := <-d.stateCh: + // Discard any data not requested (or previsouly timed out) + req := active[pack.PeerId()] + if req == nil { + log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items()) + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.response = pack.(*statePack).states + + finished = append(finished, req) + delete(active, pack.PeerId()) + + // Handle dropped peer connections: + case p := <-peerDrop: + // Skip if no request is currently pending + req := active[p.id] + if req == nil { + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.dropped = true + + finished = append(finished, req) + delete(active, p.id) + + // Handle timed-out requests: + case req := <-timeout: + // If the peer is already requesting something else, ignore the stale timeout. + // This can happen when the timeout and the delivery happens simultaneously, + // causing both pathways to trigger. + if active[req.peer.id] != req { + continue + } + // Move the timed out data back into the download queue + finished = append(finished, req) + delete(active, req.peer.id) + + // Track outgoing state requests: + case req := <-d.trackStateReq: + // If an active request already exists for this peer, we have a problem. In + // theory the trie node schedule must never assign two requests to the same + // peer. In practive however, a peer might receive a request, disconnect and + // immediately reconnect before the previous times out. In this case the first + // request is never honored, alas we must not silently overwrite it, as that + // causes valid requests to go missing and sync to get stuck. + if old := active[req.peer.id]; old != nil { + log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) + + // Make sure the previous one doesn't get siletly lost + old.timer.Stop() + old.dropped = true + + finished = append(finished, old) + } + // Start a timer to notify the sync loop if the peer stalled. + req.timer = time.AfterFunc(req.timeout, func() { + select { + case timeout <- req: + case <-s.done: + // Prevent leaking of timer goroutines in the unlikely case where a + // timer is fired just before exiting runStateSync. + } + }) + active[req.peer.id] = req + } + } +} + +// stateSync schedules requests for downloading a particular state trie defined +// by a given state root. +type stateSync struct { + d *Downloader // Downloader instance to access and manage current peerset + + sched *trie.TrieSync // State trie sync scheduler defining the tasks + keccak hash.Hash // Keccak256 hasher to verify deliveries with + tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval + + numUncommitted int + bytesUncommitted int + + deliver chan *stateReq // Delivery channel multiplexing peer responses + cancel chan struct{} // Channel to signal a termination request + cancelOnce sync.Once // Ensures cancel only ever gets called once + done chan struct{} // Channel to signal termination completion + err error // Any error hit during sync (set before completion) +} + +// stateTask represents a single trie node download taks, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type stateTask struct { + attempts map[string]struct{} +} + +// newStateSync creates a new state trie download scheduler. This method does not +// yet start the sync. The user needs to call run to initiate. +func newStateSync(d *Downloader, root common.Hash) *stateSync { + return &stateSync{ + d: d, + sched: state.NewStateSync(root, d.stateDB), + keccak: sha3.NewKeccak256(), + tasks: make(map[common.Hash]*stateTask), + deliver: make(chan *stateReq), + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the task assignment and response processing loop, blocking until +// it finishes, and finally notifying any goroutines waiting for the loop to +// finish. +func (s *stateSync) run() { + s.err = s.loop() + close(s.done) +} + +// Wait blocks until the sync is done or canceled. +func (s *stateSync) Wait() error { + <-s.done + return s.err +} + +// Cancel cancels the sync and waits until it has shut down. +func (s *stateSync) Cancel() error { + s.cancelOnce.Do(func() { close(s.cancel) }) + return s.Wait() +} + +// loop is the main event loop of a state trie sync. It it responsible for the +// assignment of new tasks to peers (including sending it to them) as well as +// for the processing of inbound data. Note, that the loop does not directly +// receive data from peers, rather those are buffered up in the downloader and +// pushed here async. The reason is to decouple processing from data receipt +// and timeouts. +func (s *stateSync) loop() error { + // Listen for new peer events to assign tasks to them + newPeer := make(chan *peerConnection, 1024) + peerSub := s.d.peers.SubscribeNewPeers(newPeer) + defer peerSub.Unsubscribe() + + // Keep assigning new tasks until the sync completes or aborts + for s.sched.Pending() > 0 { + if err := s.commit(false); err != nil { + return err + } + s.assignTasks() + // Tasks assigned, wait for something to happen + select { + case <-newPeer: + // New peer arrived, try to assign it download tasks + + case <-s.cancel: + return errCancelStateFetch + + case req := <-s.deliver: + // Response, disconnect or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) + if len(req.items) <= 2 && !req.dropped && req.timedOut() { + // 2 items are the minimum requested, if even that times out, we've no use of + // this peer at the moment. + log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) + s.d.dropPeer(req.peer.id) + } + // Process all the received blobs and check for stale delivery + stale, err := s.process(req) + if err != nil { + log.Warn("Node data write error", "err", err) + return err + } + // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) + if !stale { + req.peer.SetNodeDataIdle(len(req.response)) + } + } + } + return s.commit(true) +} + +func (s *stateSync) commit(force bool) error { + if !force && s.bytesUncommitted < ethdb.IdealBatchSize { + return nil + } + start := time.Now() + b := s.d.stateDB.NewBatch() + s.sched.Commit(b) + if err := b.Write(); err != nil { + return fmt.Errorf("DB write error: %v", err) + } + s.updateStats(s.numUncommitted, 0, 0, time.Since(start)) + s.numUncommitted = 0 + s.bytesUncommitted = 0 + return nil +} + +// assignTasks attempts to assing new tasks to all idle peers, either from the +// batch currently being retried, or fetching new data from the trie sync itself. +func (s *stateSync) assignTasks() { + // Iterate over all idle peers and try to assign them state fetches + peers, _ := s.d.peers.NodeDataIdlePeers() + for _, p := range peers { + // Assign a batch of fetches proportional to the estimated latency/bandwidth + cap := p.NodeDataCapacity(s.d.requestRTT()) + req := &stateReq{peer: p, timeout: s.d.requestTTL()} + s.fillTasks(cap, req) + + // If the peer was assigned tasks to fetch, send the network request + if len(req.items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items)) + select { + case s.d.trackStateReq <- req: + req.peer.FetchNodeData(req.items) + case <-s.cancel: + } + } + } +} + +// fillTasks fills the given request object with a maximum of n state download +// tasks to send to the remote peer. +func (s *stateSync) fillTasks(n int, req *stateReq) { + // Refill available tasks from the scheduler. + if len(s.tasks) < n { + new := s.sched.Missing(n - len(s.tasks)) + for _, hash := range new { + s.tasks[hash] = &stateTask{make(map[string]struct{})} + } + } + // Find tasks that haven't been tried with the request's peer. + req.items = make([]common.Hash, 0, n) + req.tasks = make(map[common.Hash]*stateTask, n) + for hash, t := range s.tasks { + // Stop when we've gathered enough requests + if len(req.items) == n { + break + } + // Skip any requests we've already tried from this peer + if _, ok := t.attempts[req.peer.id]; ok { + continue + } + // Assign the request to this peer + t.attempts[req.peer.id] = struct{}{} + req.items = append(req.items, hash) + req.tasks[hash] = t + delete(s.tasks, hash) + } +} + +// process iterates over a batch of delivered state data, injecting each item +// into a running state sync, re-queuing any items that were requested but not +// delivered. +func (s *stateSync) process(req *stateReq) (bool, error) { + // Collect processing stats and update progress if valid data was received + duplicate, unexpected := 0, 0 + + defer func(start time.Time) { + if duplicate > 0 || unexpected > 0 { + s.updateStats(0, duplicate, unexpected, time.Since(start)) + } + }(time.Now()) + + // Iterate over all the delivered data and inject one-by-one into the trie + progress, stale := false, len(req.response) > 0 + + for _, blob := range req.response { + prog, hash, err := s.processNodeData(blob) + switch err { + case nil: + s.numUncommitted++ + s.bytesUncommitted += len(blob) + progress = progress || prog + case trie.ErrNotRequested: + unexpected++ + case trie.ErrAlreadyProcessed: + duplicate++ + default: + return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + } + // If the node delivered a requested item, mark the delivery non-stale + if _, ok := req.tasks[hash]; ok { + delete(req.tasks, hash) + stale = false + } + } + // If we're inside the critical section, reset fail counter since we progressed. + if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 { + log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) + atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block + } + + // Put unfulfilled tasks back into the retry queue + npeers := s.d.peers.Len() + for hash, task := range req.tasks { + // If the node did deliver something, missing items may be due to a protocol + // limit or a previous timeout + delayed delivery. Both cases should permit + // the node to retry the missing items (to avoid single-peer stalls). + if len(req.response) > 0 || req.timedOut() { + delete(task.attempts, req.peer.id) + } + // If we've requested the node too many times already, it may be a malicious + // sync where nobody has the right data. Abort. + if len(task.attempts) >= npeers { + return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + } + // Missing item, place into the retry queue. + s.tasks[hash] = task + } + return stale, nil +} + +// processNodeData tries to inject a trie node data blob delivered from a remote +// peer into the state trie, returning whether anything useful was written or any +// error occurred. +func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { + res := trie.SyncResult{Data: blob} + s.keccak.Reset() + s.keccak.Write(blob) + s.keccak.Sum(res.Hash[:0]) + committed, _, err := s.sched.Process([]trie.SyncResult{res}) + return committed, res.Hash, err +} + +// updateStats bumps the various state sync progress counters and displays a log +// message for the user to see. +func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) { + s.d.syncStatsLock.Lock() + defer s.d.syncStatsLock.Unlock() + + s.d.syncStatsState.pending = uint64(s.sched.Pending()) + s.d.syncStatsState.processed += uint64(written) + s.d.syncStatsState.duplicate += uint64(duplicate) + s.d.syncStatsState.unexpected += uint64(unexpected) + + if written > 0 || duplicate > 0 || unexpected > 0 { + log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected) + } +} diff --git a/eth/downloader/types.go b/eth/downloader/types.go index e10510486..3f30ea9dd 100644 --- a/eth/downloader/types.go +++ b/eth/downloader/types.go @@ -18,51 +18,10 @@ package downloader import ( "fmt" - "math/big" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) -// headerCheckFn is a callback type for verifying a header's presence in the local chain. -type headerCheckFn func(common.Hash) bool - -// blockAndStateCheckFn is a callback type for verifying block and associated states' presence in the local chain. -type blockAndStateCheckFn func(common.Hash) bool - -// headerRetrievalFn is a callback type for retrieving a header from the local chain. -type headerRetrievalFn func(common.Hash) *types.Header - -// blockRetrievalFn is a callback type for retrieving a block from the local chain. -type blockRetrievalFn func(common.Hash) *types.Block - -// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. -type headHeaderRetrievalFn func() *types.Header - -// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. -type headBlockRetrievalFn func() *types.Block - -// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. -type headFastBlockRetrievalFn func() *types.Block - -// headBlockCommitterFn is a callback for directly committing the head block to a certain entity. -type headBlockCommitterFn func(common.Hash) error - -// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. -type tdRetrievalFn func(common.Hash) *big.Int - -// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. -type headerChainInsertFn func([]*types.Header, int) (int, error) - -// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. -type blockChainInsertFn func(types.Blocks) (int, error) - -// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. -type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) - -// chainRollbackFn is a callback type to remove a few recently added elements from the local chain. -type chainRollbackFn func([]common.Hash) - // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 98cc1a76b..50966f5ee 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -83,6 +83,7 @@ type announce struct { // headerFilterTask represents a batch of headers needing fetcher filtering. type headerFilterTask struct { + peer string // The source peer of block headers headers []*types.Header // Collection of headers to filter time time.Time // Arrival time of the headers } @@ -90,6 +91,7 @@ type headerFilterTask struct { // headerFilterTask represents a batch of block bodies (transactions and uncles) // needing fetcher filtering. type bodyFilterTask struct { + peer string // The source peer of block bodies transactions [][]*types.Transaction // Collection of transactions per block bodies uncles [][]*types.Header // Collection of uncles per block bodies time time.Time // Arrival time of the blocks' contents @@ -218,8 +220,8 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { // FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // returning those that should be handled differently. -func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header { - log.Trace("Filtering headers", "headers", len(headers)) +func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { + log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) // Send the filter channel to the fetcher filter := make(chan *headerFilterTask) @@ -231,7 +233,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type } // Request the filtering of the header list select { - case filter <- &headerFilterTask{headers: headers, time: time}: + case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: case <-f.quit: return nil } @@ -246,8 +248,8 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type // FilterBodies extracts all the block bodies that were explicitly requested by // the fetcher, returning those that should be handled differently. -func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { - log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles)) +func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { + log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) // Send the filter channel to the fetcher filter := make(chan *bodyFilterTask) @@ -259,7 +261,7 @@ func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]* } // Request the filtering of the body list select { - case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}: + case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: case <-f.quit: return nil, nil } @@ -444,7 +446,7 @@ func (f *Fetcher) loop() { hash := header.Hash() // Filter fetcher-requested headers from other synchronisation algorithms - if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) @@ -523,7 +525,7 @@ func (f *Fetcher) loop() { txnHash := types.DeriveSha(types.Transactions(task.transactions[i])) uncleHash := types.CalcUncleHash(task.uncles[i]) - if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash { + if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer { // Mark the body matched, reassemble if still unknown matched = true diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index 85d2f8645..9889e6cc5 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -153,7 +153,7 @@ func (f *fetcherTester) dropPeer(peer string) { } // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer. -func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn { +func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn { closure := make(map[common.Hash]*types.Block) for hash, block := range blocks { closure[hash] = block @@ -166,14 +166,14 @@ func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, d headers = append(headers, block.Header()) } // Return on a new thread - go f.fetcher.FilterHeaders(headers, time.Now().Add(drift)) + go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift)) return nil } } // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer. -func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn { +func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn { closure := make(map[common.Hash]*types.Block) for hash, block := range blocks { closure[hash] = block @@ -191,7 +191,7 @@ func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, dri } } // Return on a new thread - go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift)) + go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift)) return nil } @@ -282,8 +282,8 @@ func testSequentialAnnouncements(t *testing.T, protocol int) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks until all are imported imported := make(chan *types.Block) @@ -309,22 +309,28 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) { // Assemble a tester with a built in counter for the requests tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack) + firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0) + secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack) + secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0) counter := uint32(0) - headerWrapper := func(hash common.Hash) error { + firstHeaderWrapper := func(hash common.Hash) error { + atomic.AddUint32(&counter, 1) + return firstHeaderFetcher(hash) + } + secondHeaderWrapper := func(hash common.Hash) error { atomic.AddUint32(&counter, 1) - return headerFetcher(hash) + return secondHeaderFetcher(hash) } // Iteratively announce blocks until all are imported imported := make(chan *types.Block) tester.fetcher.importedHook = func(block *types.Block) { imported <- block } for i := len(hashes) - 2; i >= 0; i-- { - tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher) - tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), headerWrapper, bodyFetcher) - tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), headerWrapper, bodyFetcher) + tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher) + tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher) + tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher) verifyImportEvent(t, imported, true) } verifyImportDone(t, imported) @@ -347,8 +353,8 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, but overlap them continuously overlap := 16 @@ -381,8 +387,8 @@ func testPendingDeduplication(t *testing.T, protocol int) { // Assemble a tester with a built in counter and delayed fetcher tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0) delay := 50 * time.Millisecond counter := uint32(0) @@ -425,8 +431,8 @@ func testRandomArrivalImport(t *testing.T, protocol int) { skip := targetBlocks / 2 tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) @@ -456,8 +462,8 @@ func testQueueGapFill(t *testing.T, protocol int) { skip := targetBlocks / 2 tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) @@ -486,8 +492,8 @@ func testImportDeduplication(t *testing.T, protocol int) { // Create the tester and wrap the importer with a counter tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) counter := uint32(0) tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) { @@ -570,8 +576,8 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} tester.lock.Unlock() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0) fetching := make(chan struct{}, 2) tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } @@ -603,14 +609,14 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { hashes, blocks := makeChain(1, 0, genesis) tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack) + badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) imported := make(chan *types.Block) tester.fetcher.importedHook = func(block *types.Block) { imported <- block } // Announce a block with a bad number, check for immediate drop - tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) verifyImportEvent(t, imported, false) tester.lock.RLock() @@ -620,8 +626,11 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { if !dropped { t.Fatalf("peer with invalid numbered announcement not dropped") } + + goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack) + goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0) // Make sure a good announcement passes without a drop - tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher) verifyImportEvent(t, imported, true) tester.lock.RLock() @@ -645,8 +654,8 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { hashes, blocks := makeChain(32, 0, genesis) tester := newTester() - headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - bodyFetcher := tester.makeBodyFetcher(blocks, 0) + headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Add a monitoring hook for all internal events fetching := make(chan []common.Hash) @@ -697,12 +706,12 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { // Create a valid chain and an infinite junk chain targetBlocks := hashLimit + 2*maxQueueDist hashes, blocks := makeChain(targetBlocks, 0, genesis) - validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) - validBodyFetcher := tester.makeBodyFetcher(blocks, 0) + validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) + validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) attack, _ := makeChain(targetBlocks, 0, unknownBlock) - attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack) - attackerBodyFetcher := tester.makeBodyFetcher(nil, 0) + attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack) + attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0) // Feed the tester a huge hashset from the attacker, and a limited from the valid peer for i := 0; i < len(attack); i++ { diff --git a/eth/filters/api.go b/eth/filters/api.go index 61647a5d0..03c1d6afc 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -52,7 +52,6 @@ type filter struct { // information related to the Ethereum protocol such als blocks, transactions and logs. type PublicFilterAPI struct { backend Backend - useMipMap bool mux *event.TypeMux quit chan struct{} chainDb ethdb.Database @@ -64,14 +63,12 @@ type PublicFilterAPI struct { // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ - backend: backend, - useMipMap: !lightMode, - mux: backend.EventMux(), - chainDb: backend.ChainDb(), - events: NewEventSystem(backend.EventMux(), backend, lightMode), - filters: make(map[rpc.ID]*filter), + backend: backend, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), } - go api.timeoutLoop() return api @@ -326,20 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { + // Convert the RPC block numbers into internal representations if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } + // Create and run the filter to get all the logs + filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics) - filter := New(api.backend, api.useMipMap) - filter.SetBeginBlock(crit.FromBlock.Int64()) - filter.SetEndBlock(crit.ToBlock.Int64()) - filter.SetAddresses(crit.Addresses) - filter.SetTopics(crit.Topics) - - logs, err := filter.Find(ctx) + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } return returnLogs(logs), err } @@ -373,21 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty return nil, fmt.Errorf("filter not found") } - filter := New(api.backend, api.useMipMap) + begin := rpc.LatestBlockNumber.Int64() if f.crit.FromBlock != nil { - filter.SetBeginBlock(f.crit.FromBlock.Int64()) - } else { - filter.SetBeginBlock(rpc.LatestBlockNumber.Int64()) + begin = f.crit.FromBlock.Int64() } + end := rpc.LatestBlockNumber.Int64() if f.crit.ToBlock != nil { - filter.SetEndBlock(f.crit.ToBlock.Int64()) - } else { - filter.SetEndBlock(rpc.LatestBlockNumber.Int64()) + end = f.crit.ToBlock.Int64() } - filter.SetAddresses(f.crit.Addresses) - filter.SetTopics(f.crit.Topics) + // Create and run the filter to get all the logs + filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) - logs, err := filter.Find(ctx) + logs, err := filter.Logs(ctx) if err != nil { return nil, err } @@ -395,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty } // GetFilterChanges returns the logs for the filter with the given id since -// last time is was called. This can be used for polling. +// last time it was called. This can be used for polling. // // For pending transaction and block filters the result is []common.Hash. // (pending)Log filters return []Log. @@ -504,7 +498,6 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { switch topic := t.(type) { case nil: // ignore topic when matching logs - args.Topics[i] = []common.Hash{{}} case string: // match specific topic @@ -513,12 +506,16 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { return err } args.Topics[i] = []common.Hash{top} + case []interface{}: // or case e.g. [null, "topic0", "topic1"] for _, rawTopic := range topic { if rawTopic == nil { - args.Topics[i] = append(args.Topics[i], common.Hash{}) - } else if topic, ok := rawTopic.(string); ok { + // null component, match all + args.Topics[i] = nil + break + } + if topic, ok := rawTopic.(string); ok { parsed, err := decodeTopic(topic) if err != nil { return err diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 068a5ea24..4ae37f977 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -34,7 +34,6 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca") topic1 = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3") topic2 = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce") - nullTopic = common.Hash{} ) // default values @@ -150,11 +149,8 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { if test6.Topics[0][0] != topic0 { t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0) } - if len(test6.Topics[1]) != 1 { - t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1])) - } - if test6.Topics[1][0] != nullTopic { - t.Fatalf("got %x, expected empty hash", test6.Topics[1][0]) + if len(test6.Topics[1]) != 0 { + t.Fatalf("expected 0 topic, got %d", len(test6.Topics[1])) } if len(test6.Topics[2]) != 1 { t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2])) @@ -180,18 +176,10 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { topic0, topic1, test7.Topics[0][0], test7.Topics[0][1], ) } - if len(test7.Topics[1]) != 1 { - t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1])) - } - if test7.Topics[1][0] != nullTopic { - t.Fatalf("expected empty hash, got %x", test7.Topics[1][0]) - } - if len(test7.Topics[2]) != 2 { - t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2])) + if len(test7.Topics[1]) != 0 { + t.Fatalf("expected 0 topic, got %d topics", len(test7.Topics[1])) } - if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic { - t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]", - topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1], - ) + if len(test7.Topics[2]) != 0 { + t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2])) } } diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go new file mode 100644 index 000000000..0a0929bc1 --- /dev/null +++ b/eth/filters/bench_test.go @@ -0,0 +1,201 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package filters + +import ( + "bytes" + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" +) + +func BenchmarkBloomBits512(b *testing.B) { + benchmarkBloomBits(b, 512) +} + +func BenchmarkBloomBits1k(b *testing.B) { + benchmarkBloomBits(b, 1024) +} + +func BenchmarkBloomBits2k(b *testing.B) { + benchmarkBloomBits(b, 2048) +} + +func BenchmarkBloomBits4k(b *testing.B) { + benchmarkBloomBits(b, 4096) +} + +func BenchmarkBloomBits8k(b *testing.B) { + benchmarkBloomBits(b, 8192) +} + +func BenchmarkBloomBits16k(b *testing.B) { + benchmarkBloomBits(b, 16384) +} + +func BenchmarkBloomBits32k(b *testing.B) { + benchmarkBloomBits(b, 32768) +} + +const benchFilterCnt = 2000 + +func benchmarkBloomBits(b *testing.B, sectionSize uint64) { + benchDataDir := node.DefaultDataDir() + "/geth/chaindata" + fmt.Println("Running bloombits benchmark section size:", sectionSize) + + db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024) + if err != nil { + b.Fatalf("error opening database at %v: %v", benchDataDir, err) + } + head := core.GetHeadBlockHash(db) + if head == (common.Hash{}) { + b.Fatalf("chain data not found at %v", benchDataDir) + } + + clearBloomBits(db) + fmt.Println("Generating bloombits data...") + headNum := core.GetBlockNumber(db, head) + if headNum < sectionSize+512 { + b.Fatalf("not enough blocks for running a benchmark") + } + + start := time.Now() + cnt := (headNum - 512) / sectionSize + var dataSize, compSize uint64 + for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ { + bc, err := bloombits.NewGenerator(uint(sectionSize)) + if err != nil { + b.Fatalf("failed to create generator: %v", err) + } + var header *types.Header + for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ { + hash := core.GetCanonicalHash(db, i) + header = core.GetHeader(db, hash, i) + if header == nil { + b.Fatalf("Error creating bloomBits data") + } + bc.AddBloom(uint(i-sectionIdx*sectionSize), header.Bloom) + } + sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1) + for i := 0; i < types.BloomBitLength; i++ { + data, err := bc.Bitset(uint(i)) + if err != nil { + b.Fatalf("failed to retrieve bitset: %v", err) + } + comp := bitutil.CompressBytes(data) + dataSize += uint64(len(data)) + compSize += uint64(len(comp)) + core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp) + } + //if sectionIdx%50 == 0 { + // fmt.Println(" section", sectionIdx, "/", cnt) + //} + } + + d := time.Since(start) + fmt.Println("Finished generating bloombits data") + fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block") + fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize)) + + fmt.Println("Running filter benchmarks...") + start = time.Now() + mux := new(event.TypeMux) + var backend *testBackend + + for i := 0; i < benchFilterCnt; i++ { + if i%20 == 0 { + db.Close() + db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024) + backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + } + var addr common.Address + addr[0] = byte(i) + addr[1] = byte(i / 256) + filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + if _, err := filter.Logs(context.Background()); err != nil { + b.Error("filter.Find error:", err) + } + } + d = time.Since(start) + fmt.Println("Finished running filter benchmarks") + fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") + db.Close() +} + +func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) { + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(startPrefix) + for it.Valid() { + key := it.Key() + cmpLen := len(key) + if len(endPrefix) < cmpLen { + cmpLen = len(endPrefix) + } + if bytes.Compare(key[:cmpLen], endPrefix) == 1 { + break + } + fn(common.CopyBytes(key)) + it.Next() + } + it.Release() +} + +var bloomBitsPrefix = []byte("bloomBits-") + +func clearBloomBits(db ethdb.Database) { + fmt.Println("Clearing bloombits data...") + forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) { + db.Delete(key) + }) +} + +func BenchmarkNoBloomBits(b *testing.B) { + benchDataDir := node.DefaultDataDir() + "/geth/chaindata" + fmt.Println("Running benchmark without bloombits") + db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024) + if err != nil { + b.Fatalf("error opening database at %v: %v", benchDataDir, err) + } + head := core.GetHeadBlockHash(db) + if head == (common.Hash{}) { + b.Fatalf("chain data not found at %v", benchDataDir) + } + headNum := core.GetBlockNumber(db, head) + + clearBloomBits(db) + + fmt.Println("Running filter benchmarks...") + start := time.Now() + mux := new(event.TypeMux) + backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + filter := New(backend, 0, int64(headNum), []common.Address{{}}, nil) + filter.Logs(context.Background()) + d := time.Since(start) + fmt.Println("Finished running filter benchmarks") + fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks") + db.Close() +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0a0b81224..e208f8f38 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,12 +18,11 @@ package filters import ( "context" - "math" "math/big" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -35,165 +34,185 @@ type Backend interface { EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } // Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend - useMipMap bool - - created time.Time + backend Backend db ethdb.Database begin, end int64 addresses []common.Address topics [][]common.Hash + + matcher *bloombits.Matcher } // New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. -// MipMaps allow past blocks to be searched much more efficiently, but are not available -// to light clients. -func New(backend Backend, useMipMap bool) *Filter { +func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. + var filters [][][]byte + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filters = append(filters, filter) + } + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filters = append(filters, filter) + } + // Assemble and return the filter + size, _ := backend.BloomStatus() + return &Filter{ backend: backend, - useMipMap: useMipMap, + begin: begin, + end: end, + addresses: addresses, + topics: topics, db: backend.ChainDb(), + matcher: bloombits.NewMatcher(size, filters), } } -// SetBeginBlock sets the earliest block for filtering. -// -1 = latest block (i.e., the current block) -// hash = particular hash from-to -func (f *Filter) SetBeginBlock(begin int64) { - f.begin = begin -} - -// SetEndBlock sets the latest block for filtering. -func (f *Filter) SetEndBlock(end int64) { - f.end = end -} - -// SetAddresses matches only logs that are generated from addresses that are included -// in the given addresses. -func (f *Filter) SetAddresses(addr []common.Address) { - f.addresses = addr -} - -// SetTopics matches only logs that have topics matching the given topics. -func (f *Filter) SetTopics(topics [][]common.Hash) { - f.topics = topics -} - -// FindOnce searches the blockchain for matching log entries, returning -// all matching entries from the first block that contains matches, -// updating the start point of the filter accordingly. If no results are -// found, a nil slice is returned. -func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) { - head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if head == nil { +// Logs searches the blockchain for matching log entries, returning all from the +// first block that contains matches, updating the start of the filter accordingly. +func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { + // Figure out the limits of the filter range + header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if header == nil { return nil, nil } - headBlockNumber := head.Number.Uint64() + head := header.Number.Uint64() - var beginBlockNo uint64 = uint64(f.begin) if f.begin == -1 { - beginBlockNo = headBlockNumber + f.begin = int64(head) } - var endBlockNo uint64 = uint64(f.end) + end := uint64(f.end) if f.end == -1 { - endBlockNo = headBlockNumber + end = head } - - // if no addresses are present we can't make use of fast search which - // uses the mipmap bloom filters to check for fast inclusion and uses - // higher range probability in order to ensure at least a false positive - if !f.useMipMap || len(f.addresses) == 0 { - logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo) - f.begin = int64(blockNumber + 1) - return logs, err - } - - logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0) - f.begin = int64(blockNumber + 1) - return logs, nil -} - -// Run filters logs with the current parameters set -func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) { - for { - newLogs, err := f.FindOnce(ctx) - if len(newLogs) == 0 || err != nil { + // Gather all indexed logs, and finish with non indexed ones + var ( + logs []*types.Log + err error + ) + size, sections := f.backend.BloomStatus() + if indexed := sections * size; indexed > uint64(f.begin) { + if indexed > end { + logs, err = f.indexedLogs(ctx, end) + } else { + logs, err = f.indexedLogs(ctx, indexed-1) + } + if err != nil { return logs, err } - logs = append(logs, newLogs...) } + rest, err := f.unindexedLogs(ctx, end) + logs = append(logs, rest...) + return logs, err } -func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) { - level := core.MIPMapLevels[depth] - // normalise numerator so we can work in level specific batches and - // work with the proper range checks - for num := start / level * level; num <= end; num += level { - // find addresses in bloom filters - bloom := core.GetMipmapBloom(f.db, num, level) - // Don't bother checking the first time through the loop - we're probably picking - // up where a previous run left off. - first := true - for _, addr := range f.addresses { - if first || bloom.TestBytes(addr[:]) { - first = false - // range check normalised values and make sure that - // we're resolving the correct range instead of the - // normalised values. - start := uint64(math.Max(float64(num), float64(start))) - end := uint64(math.Min(float64(num+level-1), float64(end))) - if depth+1 == len(core.MIPMapLevels) { - l, blockNumber, _ := f.getLogs(context.Background(), start, end) - if len(l) > 0 { - return l, blockNumber - } - } else { - l, blockNumber := f.mipFind(start, end, depth+1) - if len(l) > 0 { - return l, blockNumber - } +// indexedLogs returns the logs matching the filter criteria based on the bloom +// bits indexed available locally or via the network. +func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + // Create a matcher session and request servicing from the backend + matches := make(chan uint64, 64) + + session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) + if err != nil { + return nil, err + } + defer session.Close() + + f.backend.ServiceFilter(ctx, session) + + // Iterate over the matches until exhausted or context closed + var logs []*types.Log + + for { + select { + case number, ok := <-matches: + // Abort if all matches have been fulfilled + if !ok { + err := session.Error() + if err == nil { + f.begin = int64(end) + 1 } + return logs, err + } + f.begin = int64(number) + 1 + // Retrieve the suggested block and pull any truly matching logs + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + if header == nil || err != nil { + return logs, err } + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + + case <-ctx.Done(): + return logs, ctx.Err() } } - - return nil, end } -func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) { - for i := start; i <= end; i++ { - blockNumber := rpc.BlockNumber(i) - header, err := f.backend.HeaderByNumber(ctx, blockNumber) +// indexedLogs returns the logs matching the filter criteria based on raw block +// iteration and bloom matching. +func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + var logs []*types.Log + + for ; f.begin <= int64(end); f.begin++ { + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { - return logs, end, err + return logs, err } - - // Use bloom filtering to see if this block is interesting given the - // current parameters - if f.bloomFilter(header.Bloom) { - // Get the logs of the block - receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if bloomFilter(header.Bloom, f.addresses, f.topics) { + found, err := f.checkMatches(ctx, header) if err != nil { - return nil, end, err - } - var unfiltered []*types.Log - for _, receipt := range receipts { - unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...) - } - logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - if len(logs) > 0 { - return logs, uint64(blockNumber), nil + return logs, err } + logs = append(logs, found...) } } + return logs, nil +} - return logs, end, nil +// checkMatches checks if the receipts belonging to the given header contain any log events that +// match the filter criteria. This function is called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + // Get the logs of the block + receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil, err + } + var unfiltered []*types.Log + for _, receipt := range receipts { + unfiltered = append(unfiltered, receipt.Logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + if len(logs) > 0 { + return logs, nil + } + return nil, nil } func includes(addresses []common.Address, a common.Address) bool { @@ -221,39 +240,27 @@ Logs: if len(addresses) > 0 && !includes(addresses, log.Address) { continue } - - logTopics := make([]common.Hash, len(topics)) - copy(logTopics, log.Topics) - // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { continue Logs } - for i, topics := range topics { - var match bool + match := len(topics) == 0 // empty rule set == wildcard for _, topic := range topics { - // common.Hash{} is a match all (wildcard) - if (topic == common.Hash{}) || log.Topics[i] == topic { + if log.Topics[i] == topic { match = true break } } - if !match { continue Logs } } ret = append(ret, log) } - return ret } -func (f *Filter) bloomFilter(bloom types.Bloom) bool { - return bloomFilter(bloom, f.addresses, f.topics) -} - func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool { if len(addresses) > 0 { var included bool @@ -263,16 +270,15 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo break } } - if !included { return false } } for _, sub := range topics { - var included bool + included := len(sub) == 0 // empty rule set == wildcard for _, topic := range sub { - if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) { + if types.BloomLookup(bloom, topic) { included = true break } @@ -281,6 +287,5 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo return false } } - return true } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 7abace1e6..e08cedb27 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -54,6 +54,19 @@ const ( LastIndexSubscription ) +const ( + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + var ( ErrInvalidSubscriptionID = errors.New("invalid id") ) @@ -74,7 +87,6 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { mux *event.TypeMux - sub *event.TypeMuxSubscription backend Backend lightMode bool lastHead *types.Header @@ -200,7 +212,6 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -218,7 +229,6 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -236,7 +246,6 @@ func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*ty installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -253,7 +262,6 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -270,64 +278,56 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) { +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { if ev == nil { return } - switch e := ev.Data.(type) { + switch e := ev.(type) { case []*types.Log: if len(e) > 0 { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } } case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - case core.PendingLogsEvent: - for _, f := range filters[PendingLogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs + case *event.TypeMuxEvent: + switch muxe := e.Data.(type) { + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } } } } case core.TxPreEvent: for _, f := range filters[PendingTransactionsSubscription] { - if ev.Time.After(f.created) { - f.hashes <- e.Tx.Hash() - } + f.hashes <- e.Tx.Hash() } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { - if ev.Time.After(f.created) { - f.headers <- e.Block.Header() - } + f.headers <- e.Block.Header() } if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } }) @@ -396,9 +396,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { var ( index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{}) + sub = es.mux.Subscribe(core.PendingLogsEvent{}) + // Subscribe TxPreEvent form txpool + txCh = make(chan core.TxPreEvent, txChanSize) + txSub = es.backend.SubscribeTxPreEvent(txCh) + // Subscribe RemovedLogsEvent + rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) + rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) + // Subscribe []*types.Log + logsCh = make(chan []*types.Log, logsChanSize) + logsSub = es.backend.SubscribeLogsEvent(logsCh) + // Subscribe ChainEvent + chainEvCh = make(chan core.ChainEvent, chainEvChanSize) + chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) ) + // Unsubscribe all events + defer sub.Unsubscribe() + defer txSub.Unsubscribe() + defer rmLogsSub.Unsubscribe() + defer logsSub.Unsubscribe() + defer chainEvSub.Unsubscribe() + for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } @@ -410,6 +429,17 @@ func (es *EventSystem) eventLoop() { return } es.broadcast(index, ev) + + // Handle subscribed events + case ev := <-txCh: + es.broadcast(index, ev) + case ev := <-rmLogsCh: + es.broadcast(index, ev) + case ev := <-logsCh: + es.broadcast(index, ev) + case ev := <-chainEvCh: + es.broadcast(index, ev) + case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -428,6 +458,16 @@ func (es *EventSystem) eventLoop() { delete(index[f.typ], f.id) } close(f.err) + + // System stopped + case <-txSub.Err(): + return + case <-rmLogsSub.Err(): + return + case <-logsSub.Err(): + return + case <-chainEvSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 822580b56..7da114fda 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,13 +18,16 @@ package filters import ( "context" + "fmt" "math/big" + "math/rand" "reflect" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -33,8 +36,13 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database + mux *event.TypeMux + db ethdb.Database + sections uint64 + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -63,6 +71,53 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t return core.GetBlockReceipts(b.db, blockHash, num), nil } +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.txFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.rmLogsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.logsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.chainFeed.Subscribe(ch) +} + +func (b *testBackend) BloomStatus() (uint64, uint64) { + return params.BloomBitsBlocks, b.sections +} + +func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { + requests := make(chan chan *bloombits.Retrieval) + + go session.Multiplex(16, 0, requests) + go func() { + for { + // Wait for a service request or a shutdown + select { + case <-ctx.Done(): + return + + case request := <-requests: + task := <-request + + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + if rand.Int()%4 != 0 { // Handle occasional missing deliveries + head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1) + task.Bitsets[i], _ = core.GetBloomBits(b.db, task.Bit, section, head) + } + } + request <- task + } + } + }() +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -74,7 +129,11 @@ func TestBlockSubscription(t *testing.T) { var ( mux = new(event.TypeMux) db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} api = NewPublicFilterAPI(backend, false) genesis = new(core.Genesis).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) @@ -113,7 +172,7 @@ func TestBlockSubscription(t *testing.T) { time.Sleep(1 * time.Second) for _, e := range chainEvents { - mux.Post(e) + chainFeed.Send(e) } <-sub0.Err() @@ -125,10 +184,14 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -146,9 +209,10 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(1 * time.Second) for _, tx := range transactions { ev := core.TxPreEvent{Tx: tx} - mux.Post(ev) + txFeed.Send(ev) } + timeout := time.Now().Add(1 * time.Second) for { results, err := api.GetFilterChanges(fid0) if err != nil { @@ -160,10 +224,18 @@ func TestPendingTxFilter(t *testing.T) { if len(hashes) >= len(transactions) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + return + } for i := range hashes { if hashes[i] != transactions[i].Hash() { t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) @@ -175,10 +247,14 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -220,10 +296,14 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -241,15 +321,19 @@ func TestInvalidLogFilterCreation(t *testing.T) { } } -// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed. func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -279,7 +363,7 @@ func TestLogFilter(t *testing.T) { // match all 0: {FilterCriteria{}, allLogs, ""}, // match none due to no matching addresses - 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, []*types.Log{}, ""}, + 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, // match logs based on addresses, ignore topics 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, // match none due to no matching topics (match with address) @@ -300,6 +384,8 @@ func TestLogFilter(t *testing.T) { 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, // all "mined" and pending logs with topic firstTopic 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""}, + // match all logs due to wildcard topic + 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, } ) @@ -310,8 +396,8 @@ func TestLogFilter(t *testing.T) { // raise events time.Sleep(1 * time.Second) - if err := mux.Post(allLogs); err != nil { - t.Fatal(err) + if nsend := logsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Shoud have at least one subscription") } if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) @@ -319,6 +405,7 @@ func TestLogFilter(t *testing.T) { for i, tt := range testCases { var fetched []*types.Log + timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { @@ -329,6 +416,10 @@ func TestLogFilter(t *testing.T) { if len(fetched) >= len(tt.expected) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } @@ -349,15 +440,19 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -366,7 +461,7 @@ func TestPendingLogsSubscription(t *testing.T) { firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") - forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") + fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") allLogs = []core.PendingLogsEvent{ @@ -378,7 +473,7 @@ func TestPendingLogsSubscription(t *testing.T) { {Logs: []*types.Log{ {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{forthTopic}, BlockNumber: 5}, + {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, }}, } @@ -400,7 +495,7 @@ func TestPendingLogsSubscription(t *testing.T) { // match all {FilterCriteria{}, convertLogs(allLogs), nil, nil}, // match none due to no matching addresses - {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{{}}}, []*types.Log{}, nil, nil}, + {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil}, // match logs based on addresses, ignore topics {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // match none due to no matching topics (match with address) @@ -412,7 +507,7 @@ func TestPendingLogsSubscription(t *testing.T) { // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // multiple pending logs, should match only 2 topics from the logs in block 5 - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, forthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, } ) @@ -439,15 +534,15 @@ func TestPendingLogsSubscription(t *testing.T) { } if len(fetched) != len(tt.expected) { - t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))) } for l := range fetched { if fetched[l].Removed { - t.Errorf("expected log not to be removed for log %d in case %d", l, i) + panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i)) } if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - t.Errorf("invalid log on index %d for case %d", l, i) + panic(fmt.Sprintf("invalid log on index %d for case %d", l, i)) } } }() @@ -455,6 +550,7 @@ func TestPendingLogsSubscription(t *testing.T) { // raise events time.Sleep(1 * time.Second) + // allLogs are type of core.PendingLogsEvent for _, l := range allLogs { if err := mux.Post(l); err != nil { t.Fatal(err) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index cd5e7cafd..11235e95a 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -33,7 +33,7 @@ import ( ) func makeReceipt(addr common.Address) *types.Receipt { - receipt := types.NewReceipt(nil, new(big.Int)) + receipt := types.NewReceipt(nil, false, new(big.Int)) receipt.Logs = []*types.Log{ {Address: addr}, } @@ -41,54 +41,46 @@ func makeReceipt(addr common.Address) *types.Receipt { return receipt } -func BenchmarkMipmaps(b *testing.B) { - dir, err := ioutil.TempDir("", "mipmap") +func BenchmarkFilters(b *testing.B) { + dir, err := ioutil.TempDir("", "filtertest") if err != nil { b.Fatal(err) } defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000)) chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 100010, func(i int, gen *core.BlockGen) { - var receipts types.Receipts switch i { case 2403: receipt := makeReceipt(addr1) - receipts = types.Receipts{receipt} gen.AddUncheckedReceipt(receipt) case 1034: receipt := makeReceipt(addr2) - receipts = types.Receipts{receipt} gen.AddUncheckedReceipt(receipt) case 34: receipt := makeReceipt(addr3) - receipts = types.Receipts{receipt} gen.AddUncheckedReceipt(receipt) case 99999: receipt := makeReceipt(addr4) - receipts = types.Receipts{receipt} gen.AddUncheckedReceipt(receipt) } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - b.Fatal(err) - } - core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { core.WriteBlock(db, block) @@ -104,13 +96,10 @@ func BenchmarkMipmaps(b *testing.B) { } b.ResetTimer() - filter := New(backend, true) - filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4}) - filter.SetBeginBlock(0) - filter.SetEndBlock(-1) + filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { - logs, _ := filter.Find(context.Background()) + logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { b.Fatal("expected 4 logs, got", len(logs)) } @@ -118,18 +107,22 @@ func BenchmarkMipmaps(b *testing.B) { } func TestFilters(t *testing.T) { - dir, err := ioutil.TempDir("", "mipmap") + dir, err := ioutil.TempDir("", "filtertest") if err != nil { t.Fatal(err) } defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) @@ -140,10 +133,9 @@ func TestFilters(t *testing.T) { genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000)) chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 1000, func(i int, gen *core.BlockGen) { - var receipts types.Receipts switch i { case 1: - receipt := types.NewReceipt(nil, new(big.Int)) + receipt := types.NewReceipt(nil, false, new(big.Int)) receipt.Logs = []*types.Log{ { Address: addr, @@ -151,9 +143,8 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} case 2: - receipt := types.NewReceipt(nil, new(big.Int)) + receipt := types.NewReceipt(nil, false, new(big.Int)) receipt.Logs = []*types.Log{ { Address: addr, @@ -161,9 +152,8 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} case 998: - receipt := types.NewReceipt(nil, new(big.Int)) + receipt := types.NewReceipt(nil, false, new(big.Int)) receipt.Logs = []*types.Log{ { Address: addr, @@ -171,9 +161,8 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} case 999: - receipt := types.NewReceipt(nil, new(big.Int)) + receipt := types.NewReceipt(nil, false, new(big.Int)) receipt.Logs = []*types.Log{ { Address: addr, @@ -181,18 +170,7 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} - } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) } - // i is used as block number for the writes but since the i - // starts at 0 and block 0 (genesis) is already present increment - // by one - core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { core.WriteBlock(db, block) @@ -207,23 +185,15 @@ func TestFilters(t *testing.T) { } } - filter := New(backend, true) - filter.SetAddresses([]common.Address{addr}) - filter.SetTopics([][]common.Hash{{hash1, hash2, hash3, hash4}}) - filter.SetBeginBlock(0) - filter.SetEndBlock(-1) + filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) - logs, _ := filter.Find(context.Background()) + logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = New(backend, true) - filter.SetAddresses([]common.Address{addr}) - filter.SetTopics([][]common.Hash{{hash3}}) - filter.SetBeginBlock(900) - filter.SetEndBlock(999) - logs, _ = filter.Find(context.Background()) + filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) } @@ -231,12 +201,8 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, true) - filter.SetAddresses([]common.Address{addr}) - filter.SetTopics([][]common.Hash{{hash3}}) - filter.SetBeginBlock(990) - filter.SetEndBlock(-1) - logs, _ = filter.Find(context.Background()) + filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) } @@ -244,44 +210,32 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, true) - filter.SetTopics([][]common.Hash{{hash1, hash2}}) - filter.SetBeginBlock(1) - filter.SetEndBlock(10) + filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) - logs, _ = filter.Find(context.Background()) + logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { t.Error("expected 2 log, got", len(logs)) } failHash := common.BytesToHash([]byte("fail")) - filter = New(backend, true) - filter.SetTopics([][]common.Hash{{failHash}}) - filter.SetBeginBlock(0) - filter.SetEndBlock(-1) + filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}}) - logs, _ = filter.Find(context.Background()) + logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = New(backend, true) - filter.SetAddresses([]common.Address{failAddr}) - filter.SetBeginBlock(0) - filter.SetEndBlock(-1) + filter = New(backend, 0, -1, []common.Address{failAddr}, nil) - logs, _ = filter.Find(context.Background()) + logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = New(backend, true) - filter.SetTopics([][]common.Hash{{failHash}, {hash1}}) - filter.SetBeginBlock(0) - filter.SetEndBlock(-1) + filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) - logs, _ = filter.Find(context.Background()) + logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } diff --git a/eth/gen_config.go b/eth/gen_config.go index 477479419..4a4cd7b9c 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -47,7 +47,6 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.SyncMode = c.SyncMode enc.LightServ = c.LightServ enc.LightPeers = c.LightPeers - enc.MaxPeers = c.MaxPeers enc.SkipBcVersionCheck = c.SkipBcVersionCheck enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseCache = c.DatabaseCache @@ -119,9 +118,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.LightPeers != nil { c.LightPeers = *dec.LightPeers } - if dec.MaxPeers != nil { - c.MaxPeers = *dec.MaxPeers - } if dec.SkipBcVersionCheck != nil { c.SkipBcVersionCheck = *dec.SkipBcVersionCheck } diff --git a/eth/handler.go b/eth/handler.go index 8c2f5660d..bec5126dc 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,6 +45,10 @@ import ( const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 ) var ( @@ -78,7 +82,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub *event.TypeMuxSubscription + txCh chan core.TxPreEvent + txSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -87,8 +92,6 @@ type ProtocolManager struct { quitSync chan struct{} noMorePeers chan struct{} - lesServer LesServer - // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -96,7 +99,7 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, maxPeers int, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkId: networkId, @@ -105,7 +108,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne blockchain: blockchain, chaindb: chaindb, chainconfig: config, - maxPeers: maxPeers, peers: newPeerSet(), newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), @@ -159,10 +161,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash, - blockchain.GetBlockByHash, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, - blockchain.GetTdByHash, blockchain.InsertHeaderChain, manager.blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback, - manager.removePeer) + manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) @@ -203,10 +202,14 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) Start() { +func (pm *ProtocolManager) Start(maxPeers int) { + pm.maxPeers = maxPeers + // broadcast transactions - pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + pm.txCh = make(chan core.TxPreEvent, txChanSize) + pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() @@ -270,7 +273,7 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -447,7 +450,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return nil } // Irrelevant of the fork checks, send the header to the fetcher just in case - headers = pm.fetcher.FilterHeaders(headers, time.Now()) + headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) } if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) @@ -500,7 +503,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Filter out any explicitly requested bodies, deliver the rest to the downloader filter := len(trasactions) > 0 || len(uncles) > 0 if filter { - trasactions, uncles = pm.fetcher.FilterBodies(trasactions, uncles, time.Now()) + trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now()) } if len(trasactions) > 0 || len(uncles) > 0 || !filter { err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) @@ -606,7 +609,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Schedule all the unknown hashes for retrieval unknown := make(newBlockHashesData, 0, len(announces)) for _, block := range announces { - if !pm.blockchain.HasBlock(block.Hash) { + if !pm.blockchain.HasBlock(block.Hash, block.Number) { unknown = append(unknown, block) } } @@ -663,7 +666,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkTransaction(tx.Hash()) } - pm.txpool.AddBatch(txs) + pm.txpool.AddRemotes(txs) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -693,9 +696,10 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { peer.SendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return } // Otherwise if the block is indeed in out own chain, announce it - if pm.blockchain.HasBlock(hash) { + if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) } @@ -728,10 +732,15 @@ func (self *ProtocolManager) minedBroadcastLoop() { } func (self *ProtocolManager) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.Data.(core.TxPreEvent) - self.BroadcastTx(event.Tx.Hash(), event.Tx) + for { + select { + case event := <-self.txCh: + self.BroadcastTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.txSub.Err(): + return + } } } diff --git a/eth/handler_test.go b/eth/handler_test.go index 413ed2bff..6752cd2a8 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -374,7 +374,7 @@ func testGetNodeData(t *testing.T, protocol int) { } accounts := []common.Address{testBank, acc1Addr, acc2Addr} for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ { - trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), statedb) + trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb)) for j, acc := range accounts { state, _ := pm.blockchain.State() @@ -474,13 +474,13 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{}) ) - pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db) + pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } - pm.Start() + pm.Start(1000) defer pm.Stop() // Connect a new peer and check that we receive the DAO challenge diff --git a/eth/helper_test.go b/eth/helper_test.go index 0260b9d77..f02242b15 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -59,18 +59,18 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) ) chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { panic(err) } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, 1000, evmux, &testTxPool{added: newtx}, engine, blockchain, db) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db) if err != nil { return nil, err } - pm.Start() + pm.Start(1000) return pm, nil } @@ -88,15 +88,16 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i // testTxPool is a fake, helper transaction pool for testing purposes type testTxPool struct { - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + txFeed event.Feed + pool []*types.Transaction // Collection of all transactions + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } -// AddBatch appends a batch of transactions to the pool, and notifies any +// AddRemotes appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) AddBatch(txs []*types.Transaction) error { +func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error { p.lock.Lock() defer p.lock.Unlock() @@ -104,8 +105,7 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) error { if p.added != nil { p.added <- txs } - - return nil + return make([]error, len(txs)) } // Pending returns all the transactions known to the pool @@ -124,6 +124,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize)) diff --git a/eth/protocol.go b/eth/protocol.go index 4bc8bee72..cd7db57f2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -22,7 +22,9 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" ) @@ -94,12 +96,16 @@ var errorToString = map[int]string{ } type txPool interface { - // AddBatch should add the given transactions to the pool. - AddBatch([]*types.Transaction) error + // AddRemotes should add the given transactions to the pool. + AddRemotes([]*types.Transaction) []error // Pending should return pending transactions. // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) + + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 2056ee0a8..d3a44ae91 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) { for nonce := range alltxs { alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) } - pm.txpool.AddBatch(alltxs) + pm.txpool.AddRemotes(alltxs) // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup diff --git a/eth/sync.go b/eth/sync.go index 8784b225d..a8ae64617 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -138,7 +138,9 @@ func (pm *ProtocolManager) syncer() { defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations - forceSync := time.Tick(forceSyncCycle) + forceSync := time.NewTicker(forceSyncCycle) + defer forceSync.Stop() + for { select { case <-pm.newPeerCh: @@ -148,7 +150,7 @@ func (pm *ProtocolManager) syncer() { } go pm.synchronise(pm.peers.BestPeer()) - case <-forceSync: + case <-forceSync.C: // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) @@ -186,7 +188,17 @@ func (pm *ProtocolManager) synchronise(peer *peer) { atomic.StoreUint32(&pm.fastSync, 1) mode = downloader.FastSync } - if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { + // Run the sync cycle, and disable fast sync if we've went past the pivot block + err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode) + + if atomic.LoadUint32(&pm.fastSync) == 1 { + // Disable fast sync if we indeed have something in our chain + if pm.blockchain.CurrentBlock().NumberU64() > 0 { + log.Info("Fast sync complete, auto disabling") + atomic.StoreUint32(&pm.fastSync, 0) + } + } + if err != nil { return } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done @@ -199,12 +211,4 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // more reliably update peers or the local TD state. go pm.BroadcastBlock(head, false) } - // If fast sync was enabled, and we synced up, disable it - if atomic.LoadUint32(&pm.fastSync) == 1 { - // Disable fast sync if we indeed have something in our chain - if pm.blockchain.CurrentBlock().NumberU64() > 0 { - log.Info("Fast sync complete, auto disabling") - atomic.StoreUint32(&pm.fastSync, 0) - } - } } |