Diffstat (limited to 'eth')
-rw-r--r--  eth/api.go                          |  37
-rw-r--r--  eth/api_backend.go                  |  86
-rw-r--r--  eth/api_test.go                     |   2
-rw-r--r--  eth/backend.go                      |  72
-rw-r--r--  eth/backend_test.go                 |  83
-rw-r--r--  eth/bind.go                         |   8
-rw-r--r--  eth/bloombits.go                    | 143
-rw-r--r--  eth/config.go                       |   1
-rw-r--r--  eth/db_upgrade.go                   | 321
-rw-r--r--  eth/downloader/downloader.go        | 560
-rw-r--r--  eth/downloader/downloader_test.go   | 371
-rw-r--r--  eth/downloader/fakepeer.go          | 160
-rw-r--r--  eth/downloader/metrics.go           |   6
-rw-r--r--  eth/downloader/peer.go              | 193
-rw-r--r--  eth/downloader/queue.go             | 303
-rw-r--r--  eth/downloader/statesync.go         | 475
-rw-r--r--  eth/downloader/types.go             |  41
-rw-r--r--  eth/fetcher/fetcher.go              |  18
-rw-r--r--  eth/fetcher/fetcher_test.go         |  79
-rw-r--r--  eth/filters/api.go                  |  55
-rw-r--r--  eth/filters/api_test.go             |  24
-rw-r--r--  eth/filters/bench_test.go           | 201
-rw-r--r--  eth/filters/filter.go               | 275
-rw-r--r--  eth/filters/filter_system.go        | 104
-rw-r--r--  eth/filters/filter_system_test.go   | 170
-rw-r--r--  eth/filters/filter_test.go          | 136
-rw-r--r--  eth/gen_config.go                   |   4
-rw-r--r--  eth/handler.go                      |  51
-rw-r--r--  eth/handler_test.go                 |   8
-rw-r--r--  eth/helper_test.go                  |  22
-rw-r--r--  eth/protocol.go                     |  10
-rw-r--r--  eth/protocol_test.go                |   2
-rw-r--r--  eth/sync.go                         |  26
33 files changed, 2328 insertions(+), 1719 deletions(-)
diff --git a/eth/api.go b/eth/api.go
index 81570988c..e91f51bb9 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -51,7 +51,7 @@ type PublicEthereumAPI struct {
e *Ethereum
}
-// NewPublicEthereumAPI creates a new Etheruem protocol API for full nodes.
+// NewPublicEthereumAPI creates a new Ethereum protocol API for full nodes.
func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI {
return &PublicEthereumAPI{e}
}
@@ -205,7 +205,7 @@ func (api *PrivateMinerAPI) GetHashrate() uint64 {
return uint64(api.e.miner.HashRate())
}
-// PrivateAdminAPI is the collection of Etheruem full node-related APIs
+// PrivateAdminAPI is the collection of Ethereum full node-related APIs
// exposed over the private admin endpoint.
type PrivateAdminAPI struct {
eth *Ethereum
@@ -241,7 +241,7 @@ func (api *PrivateAdminAPI) ExportChain(file string) (bool, error) {
func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool {
for _, b := range bs {
- if !chain.HasBlock(b.Hash()) {
+ if !chain.HasBlock(b.Hash(), b.NumberU64()) {
return false
}
}
@@ -298,7 +298,7 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}
-// PublicDebugAPI is the collection of Etheruem full node APIs exposed
+// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
eth *Ethereum
@@ -335,7 +335,7 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error
return stateDb.RawDump(), nil
}
-// PrivateDebugAPI is the collection of Etheruem full node APIs exposed over
+// PrivateDebugAPI is the collection of Ethereum full node APIs exposed over
// the private debugging endpoint.
type PrivateDebugAPI struct {
config *params.ChainConfig
@@ -465,26 +465,6 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf
return true, structLogger.StructLogs(), nil
}
-// callmsg is the message type used for call transitions.
-type callmsg struct {
- addr common.Address
- to *common.Address
- gas, gasPrice *big.Int
- value *big.Int
- data []byte
-}
-
-// accessor boilerplate to implement core.Message
-func (m callmsg) From() (common.Address, error) { return m.addr, nil }
-func (m callmsg) FromFrontier() (common.Address, error) { return m.addr, nil }
-func (m callmsg) Nonce() uint64 { return 0 }
-func (m callmsg) CheckNonce() bool { return false }
-func (m callmsg) To() *common.Address { return m.to }
-func (m callmsg) GasPrice() *big.Int { return m.gasPrice }
-func (m callmsg) Gas() *big.Int { return m.gas }
-func (m callmsg) Value() *big.Int { return m.value }
-func (m callmsg) Data() []byte { return m.data }
-
// formatError formats a Go error into either an empty string or the data content
// of the error itself.
func formatError(err error) string {
@@ -543,7 +523,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common.
// Run the transaction with tracing enabled.
vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{Debug: true, Tracer: tracer})
- ret, gas, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()))
+ ret, gas, failed, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()))
if err != nil {
return nil, fmt.Errorf("tracing failed: %v", err)
}
@@ -551,6 +531,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common.
case *vm.StructLogger:
return &ethapi.ExecutionResult{
Gas: gas,
+ Failed: failed,
ReturnValue: fmt.Sprintf("%x", ret),
StructLogs: ethapi.FormatLogs(tracer.StructLogs()),
}, nil
@@ -590,7 +571,7 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int) (co
vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{})
gp := new(core.GasPool).AddGas(tx.Gas())
- _, _, err := core.ApplyMessage(vmenv, msg, gp)
+ _, _, _, err := core.ApplyMessage(vmenv, msg, gp)
if err != nil {
return nil, vm.Context{}, nil, fmt.Errorf("tx %x failed: %v", tx.Hash(), err)
}
@@ -637,7 +618,7 @@ func (api *PrivateDebugAPI) StorageRangeAt(ctx context.Context, blockHash common
return storageRangeAt(st, keyStart, maxResult), nil
}
-func storageRangeAt(st *trie.SecureTrie, start []byte, maxResult int) StorageRangeResult {
+func storageRangeAt(st state.Trie, start []byte, maxResult int) StorageRangeResult {
it := trie.NewIterator(st.NodeIterator(start))
result := StorageRangeResult{Storage: storageMap{}}
for i := 0; i < maxResult && it.Next(); i++ {
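
The tracing hunks above pivot on core.ApplyMessage growing a fourth return value, separating an in-EVM failure (revert, out of gas) from an error that invalidates the call outright. A minimal consumer sketch, assuming the four-value signature shown in the diff (the helper name is hypothetical):

    package sketch

    import (
    	"fmt"

    	"github.com/ethereum/go-ethereum/core"
    	"github.com/ethereum/go-ethereum/core/vm"
    )

    // applyAndClassify separates a consensus-level error (the transaction
    // could not be applied at all) from an in-EVM failure (reverted or out
    // of gas, but still a valid, gas-consuming transaction).
    func applyAndClassify(vmenv *vm.EVM, msg core.Message, gp *core.GasPool) (bool, error) {
    	ret, gasUsed, failed, err := core.ApplyMessage(vmenv, msg, gp)
    	if err != nil {
    		return false, fmt.Errorf("apply failed: %v", err)
    	}
    	_, _ = ret, gasUsed // hex-formatted and reported by the tracer above
    	return failed, nil  // surfaces as ExecutionResult.Failed
    }
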
diff --git a/eth/api_backend.go b/eth/api_backend.go
index fe108d272..91f392f94 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -31,7 +32,6 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -81,11 +81,11 @@ func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumb
return b.eth.blockchain.GetBlockByNumber(uint64(blockNr)), nil
}
-func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (ethapi.State, *types.Header, error) {
+func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) {
// Pending state is only known by the miner
if blockNr == rpc.PendingBlockNumber {
block, state := b.eth.miner.Pending()
- return EthApiState{state}, block.Header(), nil
+ return state, block.Header(), nil
}
// Otherwise resolve the block number and return its state
header, err := b.HeaderByNumber(ctx, blockNr)
@@ -93,7 +93,7 @@ func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.
return nil, nil, err
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
- return EthApiState{stateDb}, header, err
+ return stateDb, header, err
}
func (b *EthApiBackend) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) {
@@ -108,40 +108,43 @@ func (b *EthApiBackend) GetTd(blockHash common.Hash) *big.Int {
return b.eth.blockchain.GetTdByHash(blockHash)
}
-func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state ethapi.State, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) {
- statedb := state.(EthApiState).state
- from := statedb.GetOrNewStateObject(msg.From())
- from.SetBalance(math.MaxBig256)
+func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) {
+ state.SetBalance(msg.From(), math.MaxBig256)
vmError := func() error { return nil }
context := core.NewEVMContext(msg, header, b.eth.BlockChain(), nil)
- return vm.NewEVM(context, statedb, b.eth.chainConfig, vmCfg), vmError, nil
+ return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
}
-func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
+func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainEvent(ch)
+}
- b.eth.txPool.SetLocal(signedTx)
- return b.eth.txPool.Add(signedTx)
+func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}
-func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
+func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainSideEvent(ch)
+}
- b.eth.txPool.Remove(txHash)
+func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
-func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
+func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
+ return b.eth.txPool.AddLocal(signedTx)
+}
+func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
pending, err := b.eth.txPool.Pending()
if err != nil {
return nil, err
}
-
var txs types.Transactions
for _, batch := range pending {
txs = append(txs, batch...)
@@ -150,33 +153,25 @@ func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
}
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
-
return b.eth.txPool.Get(hash)
}
func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
-
return b.eth.txPool.State().GetNonce(addr), nil
}
func (b *EthApiBackend) Stats() (pending int, queued int) {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
-
return b.eth.txPool.Stats()
}
func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
- b.eth.txMu.Lock()
- defer b.eth.txMu.Unlock()
-
return b.eth.TxPool().Content()
}
+func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.eth.TxPool().SubscribeTxPreEvent(ch)
+}
+
func (b *EthApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}
@@ -201,22 +196,13 @@ func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}
-type EthApiState struct {
- state *state.StateDB
-}
-
-func (s EthApiState) GetBalance(ctx context.Context, addr common.Address) (*big.Int, error) {
- return s.state.GetBalance(addr), nil
+func (b *EthApiBackend) BloomStatus() (uint64, uint64) {
+ sections, _, _ := b.eth.bloomIndexer.Sections()
+ return params.BloomBitsBlocks, sections
}
-func (s EthApiState) GetCode(ctx context.Context, addr common.Address) ([]byte, error) {
- return s.state.GetCode(addr), nil
-}
-
-func (s EthApiState) GetState(ctx context.Context, a common.Address, b common.Hash) (common.Hash, error) {
- return s.state.GetState(a, b), nil
-}
-
-func (s EthApiState) GetNonce(ctx context.Context, addr common.Address) (uint64, error) {
- return s.state.GetNonce(addr), nil
+func (b *EthApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < bloomFilterThreads; i++ {
+ go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
+ }
}
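
Note how the backend's ad-hoc mutex around the transaction pool disappears: the pool now handles its own locking, and consumers observe it through event.Subscription handles instead. A minimal sketch of the consumer side, assuming only the SubscribeTxPreEvent method shown above (channel size and logging are illustrative):

    package sketch

    import (
    	"log"

    	"github.com/ethereum/go-ethereum/core"
    	"github.com/ethereum/go-ethereum/event"
    )

    // txSubscriber is the one backend method this sketch needs.
    type txSubscriber interface {
    	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
    }

    // watchPending drains pending-transaction events until the subscription ends.
    func watchPending(b txSubscriber) {
    	ch := make(chan core.TxPreEvent, 16) // buffer so a slow consumer doesn't stall the pool
    	sub := b.SubscribeTxPreEvent(ch)
    	defer sub.Unsubscribe()

    	for {
    		select {
    		case ev := <-ch:
    			log.Println("pending tx", ev.Tx.Hash().Hex())
    		case err := <-sub.Err():
    			// The pool shut down or the subscription failed internally;
    			// no further events will arrive.
    			if err != nil {
    				log.Println("subscription dropped:", err)
    			}
    			return
    		}
    	}
    }
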
diff --git a/eth/api_test.go b/eth/api_test.go
index f8d2e9c76..49ce38688 100644
--- a/eth/api_test.go
+++ b/eth/api_test.go
@@ -32,7 +32,7 @@ func TestStorageRangeAt(t *testing.T) {
// Create a state where account 0x010000... has a few storage entries.
var (
db, _ = ethdb.NewMemDatabase()
- state, _ = state.New(common.Hash{}, db)
+ state, _ = state.New(common.Hash{}, state.NewDatabase(db))
addr = common.Address{0x01}
keys = []common.Hash{ // hashes of Keys of storage
common.HexToHash("340dd630ad21bf010b4e676dbfa9ba9a02175262d1fa356232cfde6cb5b47ef2"),
diff --git a/eth/backend.go b/eth/backend.go
index 639792333..1cd9e8fff 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@@ -53,20 +54,24 @@ type LesServer interface {
Start(srvr *p2p.Server)
Stop()
Protocols() []p2p.Protocol
+ SetBloomBitsIndexer(bbIndexer *core.ChainIndexer)
}
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
+ config *Config
chainConfig *params.ChainConfig
+
// Channel for shutting down the service
- shutdownChan chan bool // Channel for shutting down the ethereum
- stopDbUpgrade func() // stop chain db sequential key upgrade
+ shutdownChan chan bool // Channel for shutting down the ethereum
+ stopDbUpgrade func() error // stop chain db sequential key upgrade
+
// Handlers
txPool *core.TxPool
- txMu sync.Mutex
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
+
// DB interfaces
chainDb ethdb.Database // Block chain database
@@ -74,6 +79,9 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
+
ApiBackend *EthApiBackend
miner *miner.Miner
@@ -88,7 +96,7 @@ type Ethereum struct {
func (s *Ethereum) AddLesServer(ls LesServer) {
s.lesServer = ls
- s.protocolManager.lesServer = ls
+ ls.SetBloomBitsIndexer(s.bloomIndexer)
}
// New creates a new Ethereum object (including the
@@ -100,12 +108,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
-
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
- stopDbUpgrade := upgradeSequentialKeys(chainDb)
+ stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
@@ -113,6 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
+ config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
@@ -123,11 +131,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
- if err := addMipmapBloomBins(chainDb); err != nil {
- return nil, err
- }
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
@@ -139,7 +146,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
- eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig)
+ eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
@@ -149,25 +156,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
+ eth.bloomIndexer.Start(eth.blockchain)
- newPool := core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
- eth.txPool = newPool
-
- maxPeers := config.MaxPeers
- if config.LightServ > 0 {
- // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections
- // temporary solution until the new peer connectivity API is finished
- halfPeers := maxPeers / 2
- maxPeers -= config.LightPeers
- if maxPeers < halfPeers {
- maxPeers = halfPeers
- }
+ if config.TxPool.Journal != "" {
+ config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
+ eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
- if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
+ if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
-
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
@@ -201,10 +199,13 @@ func makeExtraData(extra []byte) []byte {
// CreateDB creates the chain database.
func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) {
db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles)
+ if err != nil {
+ return nil, err
+ }
if db, ok := db.(*ethdb.LDBDatabase); ok {
db.Meter("eth/db/chaindata/")
}
- return db, err
+ return db, nil
}
// CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service
@@ -328,7 +329,7 @@ func (s *Ethereum) StartMining(local bool) error {
wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
if wallet == nil || err != nil {
log.Error("Etherbase account unavailable locally", "err", err)
- return fmt.Errorf("singer missing: %v", err)
+ return fmt.Errorf("signer missing: %v", err)
}
clique.Authorize(eb, wallet.SignHash)
}
@@ -363,17 +364,29 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage
func (s *Ethereum) Protocols() []p2p.Protocol {
if s.lesServer == nil {
return s.protocolManager.SubProtocols
- } else {
- return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
+ return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers()
+
+ // Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
- s.protocolManager.Start()
+ // Figure out a max peers count based on the server limits
+ maxPeers := srvr.MaxPeers
+ if s.config.LightServ > 0 {
+ maxPeers -= s.config.LightPeers
+ if maxPeers < srvr.MaxPeers/2 {
+ maxPeers = srvr.MaxPeers / 2
+ }
+ }
+ // Start the networking layer and the light server if requested
+ s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
@@ -386,6 +399,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
+ s.bloomIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
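
Start now derives the ETH peer budget from the live p2p server limit instead of the removed Config.MaxPeers field. Restated as a tiny pure function (a hypothetical helper, same arithmetic as the hunk above): a light server gives up LightPeers slots, but the ETH side never drops below half the server total.

    package sketch

    // ethPeerBudget mirrors the maxPeers computation in Ethereum.Start.
    func ethPeerBudget(serverMax, lightServ, lightPeers int) int {
    	if lightServ <= 0 {
    		return serverMax // not serving LES: all slots go to ETH peers
    	}
    	budget := serverMax - lightPeers
    	if budget < serverMax/2 {
    		budget = serverMax / 2 // floor at half the server's capacity
    	}
    	return budget
    }

For example, with srvr.MaxPeers = 25 and LightPeers = 20, the budget is 12 (the floor), not 5.
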
diff --git a/eth/backend_test.go b/eth/backend_test.go
deleted file mode 100644
index f60e3214c..000000000
--- a/eth/backend_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package eth
-
-import (
- "math/big"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/params"
-)
-
-func TestMipmapUpgrade(t *testing.T) {
- db, _ := ethdb.NewMemDatabase()
- addr := common.BytesToAddress([]byte("jeff"))
- genesis := new(core.Genesis).MustCommit(db)
-
- chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
- switch i {
- case 1:
- receipt := types.NewReceipt(nil, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
- case 2:
- receipt := types.NewReceipt(nil, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
- }
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- t.Fatal(err)
- }
- })
- for i, block := range chain {
- core.WriteBlock(db, block)
- if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
- t.Fatal("error writing block receipts:", err)
- }
- }
-
- err := addMipmapBloomBins(db)
- if err != nil {
- t.Fatal(err)
- }
-
- bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0])
- if (bloom == types.Bloom{}) {
- t.Error("got empty bloom filter")
- }
-
- data, _ := db.Get([]byte("setting-mipmap-version"))
- if len(data) == 0 {
- t.Error("setting-mipmap-version not written to database")
- }
-}
diff --git a/eth/bind.go b/eth/bind.go
index e5abd8617..d09977dbc 100644
--- a/eth/bind.go
+++ b/eth/bind.go
@@ -43,7 +43,7 @@ type ContractBackend struct {
}
// NewContractBackend creates a new native contract backend using an existing
-// Etheruem object.
+// Ethereum object.
func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend {
return &ContractBackend{
eapi: ethapi.NewPublicEthereumAPI(apiBackend),
@@ -54,14 +54,12 @@ func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend {
// CodeAt retrieves any code associated with the contract from the local API.
func (b *ContractBackend) CodeAt(ctx context.Context, contract common.Address, blockNum *big.Int) ([]byte, error) {
- out, err := b.bcapi.GetCode(ctx, contract, toBlockNumber(blockNum))
- return common.FromHex(out), err
+ return b.bcapi.GetCode(ctx, contract, toBlockNumber(blockNum))
}
// PendingCodeAt retrieves any pending code associated with the contract from the local API.
func (b *ContractBackend) PendingCodeAt(ctx context.Context, contract common.Address) ([]byte, error) {
- out, err := b.bcapi.GetCode(ctx, contract, rpc.PendingBlockNumber)
- return common.FromHex(out), err
+ return b.bcapi.GetCode(ctx, contract, rpc.PendingBlockNumber)
}
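
(The GetCode RPC now returns raw bytes directly, so the common.FromHex round-trip through a hex string is gone from both code lookups.)
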
// ContractCall implements bind.ContractCaller executing an Ethereum contract
diff --git a/eth/bloombits.go b/eth/bloombits.go
new file mode 100644
index 000000000..c5597391c
--- /dev/null
+++ b/eth/bloombits.go
@@ -0,0 +1,143 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ bloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ bloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ bloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+ // to accumulate request an entire batch (avoiding hysteresis).
+ bloomRetrievalWait = time.Duration(0)
+)
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serve the data to satisfy them.
+func (eth *Ethereum) startBloomHandlers() {
+ for i := 0; i < bloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-eth.shutdownChan:
+ return
+
+ case request := <-eth.bloomRequests:
+ task := <-request
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
+ if compVector, err := core.GetBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
+ if blob, err := bitutil.DecompressBytes(compVector, int(params.BloomBitsBlocks)/8); err == nil {
+ task.Bitsets[i] = blob
+ } else {
+ task.Error = err
+ }
+ } else {
+ task.Error = err
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+}
+
+const (
+ // bloomConfirms is the number of confirmation blocks before a bloom section is
+ // considered probably final and its rotated bits are calculated.
+ bloomConfirms = 256
+
+ // bloomThrottling is the time to wait between processing two consecutive index
+ // sections. It's useful during chain upgrades to prevent disk overload.
+ bloomThrottling = 100 * time.Millisecond
+)
+
+// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type BloomIndexer struct {
+ size uint64 // section size to generate bloombits for
+
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits creating the bloom index
+
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
+}
+
+// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
+// canonical chain for fast logs filtering.
+func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
+ backend := &BloomIndexer{
+ db: db,
+ size: size,
+ }
+ table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix))
+
+ return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
+}
+
+// Reset implements core.ChainIndexerBackend, starting a new bloombits index
+// section.
+func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error {
+ gen, err := bloombits.NewGenerator(uint(b.size))
+ b.gen, b.section, b.head = gen, section, common.Hash{}
+ return err
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *BloomIndexer) Process(header *types.Header) {
+ b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
+ b.head = header.Hash()
+}
+
+// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *BloomIndexer) Commit() error {
+ batch := b.db.NewBatch()
+
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ return err
+ }
+ core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
+ }
+ return batch.Write()
+}
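
The service loop in startBloomHandlers implements a request/response handshake over a channel of channels: a filter hands the handler a private chan *bloombits.Retrieval, sends its task on it, and receives the same task back with Bitsets (or Error) populated. A hedged sketch of the client half, as a hypothetical helper living alongside this file:

    package eth

    import "github.com/ethereum/go-ethereum/core/bloombits"

    // fetchBloomBits sketches the client side of the startBloomHandlers
    // handshake: register a private reply channel, send the work item,
    // then receive the serviced task back on the same channel.
    func fetchBloomBits(eth *Ethereum, bit uint, sections []uint64) ([][]byte, error) {
    	request := make(chan *bloombits.Retrieval)
    	eth.bloomRequests <- request // claim one of the bloomServiceThreads goroutines

    	task := &bloombits.Retrieval{Bit: bit, Sections: sections}
    	request <- task  // hand over the work item
    	task = <-request // same task, Bitsets (or Error) now filled in
    	return task.Bitsets, task.Error
    }
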
diff --git a/eth/config.go b/eth/config.go
index 4109cff8b..7bcfd403e 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -79,7 +79,6 @@ type Config struct {
// Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
- MaxPeers int `toml:"-"` // Maximum number of global peers
// Database options
SkipBcVersionCheck bool `toml:"-"`
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index 82cdd7e55..d41afa17c 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -19,278 +19,117 @@ package eth
import (
"bytes"
- "encoding/binary"
- "fmt"
- "math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
-var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys")
+var deduplicateData = []byte("dbUpgrade_20170714deduplicateData")
-// upgradeSequentialKeys checks the chain database version and
+// upgradeDeduplicateData checks the chain database version and
// starts a background process to make upgrades if necessary.
// Returns a stop function that blocks until the process has
// been safely stopped.
-func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) {
- data, _ := db.Get(useSequentialKeys)
+func upgradeDeduplicateData(db ethdb.Database) func() error {
+ // If the database is already converted or empty, bail out
+ data, _ := db.Get(deduplicateData)
if len(data) > 0 && data[0] == 42 {
- return nil // already converted
+ return nil
}
-
if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 {
- db.Put(useSequentialKeys, []byte{42})
- return nil // empty database, nothing to do
+ db.Put(deduplicateData, []byte{42})
+ return nil
}
-
- log.Warn("Upgrading chain database to use sequential keys")
-
- stopChn := make(chan struct{})
- stoppedChn := make(chan struct{})
+ // Start the deduplication upgrade on a new goroutine
+ log.Warn("Upgrading database to use lookup entries")
+ stop := make(chan chan error)
go func() {
- stopFn := func() bool {
- select {
- case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved
- case <-stopChn:
- return true
- }
- return false
- }
-
- err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn)
- if err == nil && !stopped {
- err, stopped = upgradeSequentialBlocks(db, stopFn)
- }
- if err == nil && !stopped {
- err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn)
- }
- if err == nil && !stopped {
- log.Info("Database conversion successful")
- db.Put(useSequentialKeys, []byte{42})
- }
- if err != nil {
- log.Error("Database conversion failed", "err", err)
- }
- close(stoppedChn)
- }()
-
- return func() {
- close(stopChn)
- <-stoppedChn
- }
-}
-
-// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from
-// the database, writes them in new format and deletes the old ones if successful.
-func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("block-num-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer func() {
- it.Release()
- }()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- keyPtr := it.Key()
- if len(keyPtr) < 20 {
- cnt++
- if cnt%100000 == 0 {
+ // Create an iterator to read the entire database and convert old lookup entries
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ defer func() {
+ if it != nil {
it.Release()
- it = db.(*ethdb.LDBDatabase).NewIterator()
- it.Seek(keyPtr)
- log.Info("Converting canonical numbers", "count", cnt)
}
- number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64()
- newKey := []byte("h12345678n")
- binary.BigEndian.PutUint64(newKey[1:9], number)
- if err := db.Put(newKey, it.Value()); err != nil {
- return err, false
+ }()
+
+ var (
+ converted uint64
+ failed error
+ )
+ for failed == nil && it.Next() {
+ // Skip any entries that don't look like old transaction meta entries (<hash>0x01)
+ key := it.Key()
+ if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 {
+ continue
}
- if err := db.Delete(keyPtr); err != nil {
- return err, false
+ // Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>)
+ var meta struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
}
- }
-
- if stopFn() {
- return nil, true
- }
- it.Next()
- }
- if cnt > 0 {
- log.Info("converted canonical numbers", "count", cnt)
- }
- return nil, false
-}
-
-// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block
-// receipts from the database, writes them in new format and deletes the old ones
-// if successful.
-func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("block-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer func() {
- it.Release()
- }()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- keyPtr := it.Key()
- if len(keyPtr) >= 38 {
- cnt++
- if cnt%10000 == 0 {
- it.Release()
- it = db.(*ethdb.LDBDatabase).NewIterator()
- it.Seek(keyPtr)
- log.Info("Converting blocks", "count", cnt)
+ if err := rlp.DecodeBytes(it.Value(), &meta); err != nil {
+ continue
}
- // convert header, body, td and block receipts
- var keyPrefix [38]byte
- copy(keyPrefix[:], keyPtr[0:38])
- hash := keyPrefix[6:38]
- if err := upgradeSequentialBlockData(db, hash); err != nil {
- return err, false
- }
- // delete old db entries belonging to this hash
- for bytes.HasPrefix(it.Key(), keyPrefix[:]) {
- if err := db.Delete(it.Key()); err != nil {
- return err, false
+ // Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix))
+ hash := key[:common.HashLength]
+
+ if hash[0] == byte('l') {
+ // Potential clash, the "old" `hash` must point to a live transaction.
+ if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) {
+ continue
}
- it.Next()
}
- if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil {
- return err, false
+ // Convert the old metadata to a new lookup entry, delete duplicate data
+ if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new lookup entry
+ if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data
+ if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data
+ if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata
+ break
+ }
+ }
+ }
}
- } else {
- it.Next()
- }
-
- if stopFn() {
- return nil, true
- }
- }
- if cnt > 0 {
- log.Info("Converted blocks", "count", cnt)
- }
- return nil, false
-}
-
-// upgradeSequentialOrphanedReceipts removes any old format block receipts from the
-// database that did not have a corresponding block
-func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) {
- prefix := []byte("receipts-block-")
- it := db.(*ethdb.LDBDatabase).NewIterator()
- defer it.Release()
- it.Seek(prefix)
- cnt := 0
- for bytes.HasPrefix(it.Key(), prefix) {
- // phase 2 already converted receipts belonging to existing
- // blocks, just remove if there's anything left
- cnt++
- if err := db.Delete(it.Key()); err != nil {
- return err, false
- }
-
- if stopFn() {
- return nil, true
- }
- it.Next()
- }
- if cnt > 0 {
- log.Info("Removed orphaned block receipts", "count", cnt)
- }
- return nil, false
-}
+ // Bump the conversion counter, and recreate the iterator occasionally to
+ // avoid too high memory consumption.
+ converted++
+ if converted%100000 == 0 {
+ it.Release()
+ it = db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(key)
-// upgradeSequentialBlockData upgrades the header, body, td and block receipts
-// database entries belonging to a single hash (doesn't delete old data).
-func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error {
- // get old chain data and block number
- headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...))
- if len(headerRLP) == 0 {
- return nil
- }
- header := new(types.Header)
- if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil {
- return err
- }
- number := header.Number.Uint64()
- bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...))
- tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...))
- receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...))
- // store new hash -> number association
- encNum := make([]byte, 8)
- binary.BigEndian.PutUint64(encNum, number)
- if err := db.Put(append([]byte("H"), hash...), encNum); err != nil {
- return err
- }
- // store new chain data
- if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil {
- return err
- }
- if len(tdRLP) != 0 {
- if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil {
- return err
- }
- }
- if len(bodyRLP) != 0 {
- if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil {
- return err
- }
- }
- if len(receiptsRLP) != 0 {
- if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil {
- return err
+ log.Info("Deduplicating database entries", "deduped", converted)
+ }
+ // Check for termination, or continue after a bit of a timeout
+ select {
+ case errc := <-stop:
+ errc <- nil
+ return
+ case <-time.After(time.Microsecond * 100):
+ }
}
- }
- return nil
-}
-
-func addMipmapBloomBins(db ethdb.Database) (err error) {
- const mipmapVersion uint = 2
-
- // check if the version is set. We ignore data for now since there's
- // only one version so we can easily ignore it for now
- var data []byte
- data, _ = db.Get([]byte("setting-mipmap-version"))
- if len(data) > 0 {
- var version uint
- if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion {
- return nil
+ // Upgrade finished, mark as such and terminate
+ if failed == nil {
+ log.Info("Database deduplication successful", "deduped", converted)
+ db.Put(deduplicateData, []byte{42})
+ } else {
+ log.Error("Database deduplication failed", "deduped", converted, "err", failed)
}
- }
+ it.Release()
+ it = nil
- defer func() {
- if err == nil {
- var val []byte
- val, err = rlp.EncodeToBytes(mipmapVersion)
- if err == nil {
- err = db.Put([]byte("setting-mipmap-version"), val)
- }
- return
- }
+ errc := <-stop
+ errc <- failed
}()
- latestHash := core.GetHeadBlockHash(db)
- latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash))
- if latestBlock == nil { // clean database
- return
- }
-
- tstart := time.Now()
- log.Warn("Upgrading db log bloom bins")
- for i := uint64(0); i <= latestBlock.NumberU64(); i++ {
- hash := core.GetCanonicalHash(db, i)
- if (hash == common.Hash{}) {
- return fmt.Errorf("chain db corrupted. Could not find block %d.", i)
- }
- core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i))
+ // Assemble the cancellation callback
+ return func() error {
+ errc := make(chan error)
+ stop <- errc
+ return <-errc
}
- log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart)))
- return nil
}
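
The rewritten upgrade stops its background goroutine with a chan chan error: the caller sends a fresh reply channel, and the worker answers with its terminal error before exiting, so the returned stop function doubles as a join. The pattern in isolation, as a self-contained sketch with hypothetical names:

    package sketch

    import "time"

    // startWorker launches a background loop and returns a stop function
    // that blocks until the loop acknowledges shutdown with its final error.
    func startWorker(step func() error) (stop func() error) {
    	quit := make(chan chan error)
    	go func() {
    		var failed error
    		for failed == nil {
    			select {
    			case errc := <-quit:
    				errc <- failed // clean stop requested before any failure
    				return
    			case <-time.After(100 * time.Microsecond):
    				failed = step() // one bounded slice of background work
    			}
    		}
    		errc := <-quit // work failed: wait for the stop request, then report
    		errc <- failed
    	}()
    	return func() error {
    		errc := make(chan error)
    		quit <- errc
    		return <-errc
    	}
    }
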
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..cca4fe7a9 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)
@@ -99,8 +98,9 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ stateDB ethdb.Database
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
@@ -109,26 +109,16 @@ type Downloader struct {
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
- syncStatsChainOrigin uint64 // Origin block number where syncing started at
- syncStatsChainHeight uint64 // Highest block number known when syncing started
- syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+ lightchain LightChain
+ blockchain BlockChain
+
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
- commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- rollback chainRollbackFn // Removes a batch of recently added chain links
- dropPeer peerDropFn // Drops a peer for misbehaving
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -136,16 +126,18 @@ type Downloader struct {
notified int32
// Channels
- newPeerCh chan *peer
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ // for stateFetcher
+ stateSyncStart chan *stateSync
+ trackStateReq chan *stateReq
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
@@ -161,45 +153,83 @@ type Downloader struct {
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
+// LightChain encapsulates functions required to synchronise a light chain.
+type LightChain interface {
+ // HasHeader verifies a header's presence in the local chain.
+ HasHeader(h common.Hash, number uint64) bool
+
+ // GetHeaderByHash retrieves a header from the local chain.
+ GetHeaderByHash(common.Hash) *types.Header
+
+ // CurrentHeader retrieves the head header from the local chain.
+ CurrentHeader() *types.Header
+
+ // GetTdByHash returns the total difficulty of a local block.
+ GetTdByHash(common.Hash) *big.Int
+
+ // InsertHeaderChain inserts a batch of headers into the local chain.
+ InsertHeaderChain([]*types.Header, int) (int, error)
+
+ // Rollback removes a few recently added elements from the local chain.
+ Rollback([]common.Hash)
+}
+
+// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
+type BlockChain interface {
+ LightChain
+
+ // HasBlockAndState verifies block and associated states' presence in the local chain.
+ HasBlockAndState(common.Hash) bool
+
+ // GetBlockByHash retrieves a block from the local chain.
+ GetBlockByHash(common.Hash) *types.Block
+
+ // CurrentBlock retrieves the head block from the local chain.
+ CurrentBlock() *types.Block
+
+ // CurrentFastBlock retrieves the head fast block from the local chain.
+ CurrentFastBlock() *types.Block
+
+ // FastSyncCommitHead directly commits the head block to a certain entity.
+ FastSyncCommitHead(common.Hash) error
+
+ // InsertChain inserts a batch of blocks into the local chain.
+ InsertChain(types.Blocks) (int, error)
+
+ // InsertReceiptChain inserts a batch of receipts into the local chain.
+ InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
+}
+
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
- getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
- headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
- insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+ if lightchain == nil {
+ lightchain = chain
+ }
dl := &Downloader{
- mode: mode,
- mux: mux,
- queue: newQueue(stateDb),
- peers: newPeerSet(),
- rttEstimate: uint64(rttMaxEstimate),
- rttConfidence: uint64(1000000),
- hasHeader: hasHeader,
- hasBlockAndState: hasBlockAndState,
- getHeader: getHeader,
- getBlock: getBlock,
- headHeader: headHeader,
- headBlock: headBlock,
- headFastBlock: headFastBlock,
- commitHeadBlock: commitHeadBlock,
- getTd: getTd,
- insertHeaders: insertHeaders,
- insertBlocks: insertBlocks,
- insertReceipts: insertReceipts,
- rollback: rollback,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- headerCh: make(chan dataPack, 1),
- bodyCh: make(chan dataPack, 1),
- receiptCh: make(chan dataPack, 1),
- stateCh: make(chan dataPack, 1),
- bodyWakeCh: make(chan bool, 1),
- receiptWakeCh: make(chan bool, 1),
- stateWakeCh: make(chan bool, 1),
- headerProcCh: make(chan []*types.Header, 1),
- quitCh: make(chan struct{}),
+ mode: mode,
+ stateDB: stateDb,
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
+ blockchain: chain,
+ lightchain: lightchain,
+ dropPeer: dropPeer,
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
+ stateCh: make(chan dataPack),
+ stateSyncStart: make(chan *stateSync),
+ trackStateReq: make(chan *stateReq),
}
go dl.qosTuner()
+ go dl.stateFetcher()
return dl
}
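
The dozen callback fields of the old constructor collapse into the two interfaces above, and because BlockChain embeds LightChain, a full chain value can serve both roles; passing a nil lightchain simply aliases it to the block chain. A toy illustration of that embedding fallback (types are illustrative, not the geth ones):

    package sketch

    // BlockChain embeds LightChain, so any full chain satisfies both roles.
    type lightChain interface {
    	CurrentHeaderNumber() uint64
    }

    type blockChain interface {
    	lightChain
    	CurrentBlockNumber() uint64
    }

    // pick mirrors the nil-fallback in downloader.New: a full chain can
    // stand in for the light chain thanks to the embedding above.
    func pick(chain blockChain, light lightChain) lightChain {
    	if light == nil {
    		light = chain
    	}
    	return light
    }
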
@@ -211,9 +241,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
- // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
- pendingStates := uint64(d.queue.PendingNodeData())
-
// Lock the current stats and return the progress
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
@@ -221,18 +248,18 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
current := uint64(0)
switch d.mode {
case FullSync:
- current = d.headBlock().NumberU64()
+ current = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
- current = d.headFastBlock().NumberU64()
+ current = d.blockchain.CurrentFastBlock().NumberU64()
case LightSync:
- current = d.headHeader().Number.Uint64()
+ current = d.lightchain.CurrentHeader().Number.Uint64()
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
- PulledStates: d.syncStatsStateDone,
- KnownStates: d.syncStatsStateDone + pendingStates,
+ PulledStates: d.syncStatsState.processed,
+ KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}
@@ -243,13 +270,11 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
+func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
logger := log.New("peer", id)
logger.Trace("Registering sync peer")
- if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
+ if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
logger.Error("Failed to register sync peer", "err", err)
return err
}
@@ -258,6 +283,11 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
return nil
}
+// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
+func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
+ return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
+}
+
// UnregisterPeer remove a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
@@ -324,13 +354,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case <-ch:
default:
}
}
- for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for empty := false; !empty; {
select {
case <-ch:
@@ -369,7 +399,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
-func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
@@ -439,30 +469,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
- return d.spawnSync(origin+1,
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
- )
+
+ fetchers := []func() error{
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, td) },
+ }
+ if d.mode == FastSync {
+ fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+ } else if d.mode == FullSync {
+ fetchers = append(fetchers, d.processFullSyncContent)
+ }
+ err = d.spawnSync(fetchers)
+ if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+ // If sync failed in the critical section, bump the fail counter.
+ atomic.AddUint32(&d.fsPivotFails, 1)
+ }
+ return err
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
- errc := make(chan error, len(fetchers)+1)
- wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.processContent() }()
+ errc := make(chan error, len(fetchers))
+ wg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
- for i := 0; i < len(fetchers)+1; i++ {
- if i == len(fetchers) {
+ for i := 0; i < len(fetchers); i++ {
+ if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
@@ -475,11 +515,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.Cancel()
wg.Wait()
-
- // If sync failed in the critical section, bump the fail counter
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
return err
}
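
spawnSync is a hand-rolled first-error collector: every fetcher gets its own goroutine, the first failure wins, and the queue is closed and the sync cancelled so the rest unwind before the final wait. The skeleton of the pattern, self-contained, with geth's queue and cancel specifics reduced to a plain callback:

    package sketch

    import "sync"

    // runAll mirrors the shape of spawnSync: launch every fetcher, keep the
    // first error, cancel the stragglers, and wait for all goroutines.
    func runAll(fetchers []func() error, cancel func()) error {
    	var wg sync.WaitGroup
    	errc := make(chan error, len(fetchers))
    	wg.Add(len(fetchers))
    	for _, fn := range fetchers {
    		fn := fn // capture the loop variable for the goroutine below
    		go func() { defer wg.Done(); errc <- fn() }()
    	}
    	var first error
    	for i := 0; i < len(fetchers); i++ {
    		if err := <-errc; err != nil {
    			first = err
    			break // first failure wins; the rest are cancelled below
    		}
    	}
    	cancel()  // unblock any fetcher still waiting on I/O
    	wg.Wait() // the buffered errc lets the remaining goroutines exit
    	return first
    }
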
@@ -517,12 +552,12 @@ func (d *Downloader) Terminate() {
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
+func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Retrieving remote chain height")
// Request the advertised remote head block and wait for the response
- head, _ := p.currentHead()
- go p.getRelHeaders(head, 1, 0, false)
+ head, _ := p.peer.Head()
+ go p.peer.RequestHeadersByHash(head, 1, 0, false)
ttl := d.requestTTL()
timeout := time.After(ttl)
@@ -552,7 +587,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
return nil, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -564,15 +598,15 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
-func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
+func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
- floor, ceil := int64(-1), d.headHeader().Number.Uint64()
+ floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
if d.mode == FullSync {
- ceil = d.headBlock().NumberU64()
+ ceil = d.blockchain.CurrentBlock().NumberU64()
} else if d.mode == FastSync {
- ceil = d.headFastBlock().NumberU64()
+ ceil = d.blockchain.CurrentFastBlock().NumberU64()
}
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
@@ -592,7 +626,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
if count > limit {
count = limit
}
- go p.getAbsHeaders(uint64(from), count, 15, false)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -632,7 +666,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue
}
// Otherwise check if we already know the header or not
- if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
@@ -649,7 +683,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -675,7 +708,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
ttl := d.requestTTL()
timeout := time.After(ttl)
- go p.getAbsHeaders(uint64(check), 1, 0, false)
+ go p.peer.RequestHeadersByNumber(check, 1, 0, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
@@ -698,11 +731,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
arrived = true
// Modify the search interval based on the response
- if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
+ if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
end = check
break
}
- header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
+ header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
@@ -714,7 +747,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
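
The loop above is a textbook binary search over block numbers: a header the local chain knows raises the floor, an unknown one lowers the ceiling. A compact sketch of the same idea, assuming a hypothetical known(n) predicate in place of the mode-dependent chain lookups:

// commonAncestor expects known(start) to hold and known(end) to fail, and
// returns the highest block number the local chain still knows about.
func commonAncestor(start, end uint64, known func(uint64) bool) uint64 {
	for start+1 < end {
		check := (start + end) / 2
		if known(check) {
			start = check // the ancestor is at or above check
		} else {
			end = check // the ancestor is strictly below check
		}
	}
	return start
}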
@@ -737,7 +769,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")
@@ -757,10 +789,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
- go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
- go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+ go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
@@ -827,7 +859,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -862,12 +894,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
}
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
- capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+ capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
@@ -891,9 +923,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
- capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
+ capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -915,9 +947,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
- capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
+ fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
+ capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
+ setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -927,68 +959,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
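
The capacity and setIdle callbacks above are the traffic-shaping half of the design: each request batch is sized by the throughput the peer has recently sustained over one target round trip. A rough sketch of that estimate, assuming the standard time package; the function name and rounding are illustrative, while the real accounting lives in peer.go:

// capacityFor estimates how many items a peer can deliver within one target
// RTT, clamped between a single item and a protocol-defined maximum.
func capacityFor(throughput float64, rtt time.Duration, max int) int {
	items := int(throughput*rtt.Seconds() + 0.5)
	if items < 1 {
		items = 1
	}
	if items > max {
		items = max
	}
	return items
}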
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
- log.Debug("Downloading node state data")
-
- var (
- deliver = func(packet dataPack) (int, error) {
- start := time.Now()
- return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
- // If the peer returned old-requested data, forgive
- if err == trie.ErrNotRequested {
- log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
- return
- }
- if err != nil {
- // If the node data processing failed, the root hash is very wrong, abort
- log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
- d.Cancel()
- return
- }
- // Processing succeeded, notify state fetcher of continuation
- pending := d.queue.PendingNodeData()
- if pending > 0 {
- select {
- case d.stateWakeCh <- true:
- default:
- }
- }
- d.syncStatsLock.Lock()
- d.syncStatsStateDone += uint64(delivered)
- syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
- d.syncStatsLock.Unlock()
-
- // If real database progress was made, reset any fast-sync pivot failure
- if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
- log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
- atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
- // Log a message to the user and return
- if delivered > 0 {
- log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
- }
- })
- }
- expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
- throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
- return d.queue.ReserveNodeData(p, count), false, nil
- }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
- )
- err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
- d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
- d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
- log.Debug("Node state data download terminated", "err", err)
- return err
-}
-
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@@ -1015,9 +985,9 @@ func (d *Downloader) fetchNodeData() error {
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
- expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
- fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+ fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
+ idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1123,6 +1093,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
throttled = true
break
}
+ // Short circuit if there are no more tasks available.
+ if pending() == 0 {
+ break
+ }
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
@@ -1182,29 +1156,25 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
for i, header := range rollback {
hashes[i] = header.Hash()
}
- lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
- if d.headFastBlock != nil {
- lastFastBlock = d.headFastBlock().Number()
- }
- if d.headBlock != nil {
- lastBlock = d.headBlock().Number()
+ lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+ if d.mode != LightSync {
+ lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+ lastBlock = d.blockchain.CurrentBlock().Number()
}
- d.rollback(hashes)
+ d.lightchain.Rollback(hashes)
curFastBlock, curBlock := common.Big0, common.Big0
- if d.headFastBlock != nil {
- curFastBlock = d.headFastBlock().Number()
- }
- if d.headBlock != nil {
- curBlock = d.headBlock().Number()
+ if d.mode != LightSync {
+ curFastBlock = d.blockchain.CurrentFastBlock().Number()
+ curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back headers", "count", len(hashes),
- "header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
+ "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
// If we're already past the pivot point, this could be an attack, tread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
- // If we didn't ever fail, lock in te pivot header (must! not! change!)
+ // If we didn't ever fail, lock in the pivot header (must! not! change!)
if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
@@ -1229,7 +1199,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1248,7 +1218,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if d.mode != LightSync {
- if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+ if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 {
return errStallingPeer
}
}
@@ -1260,7 +1230,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
- if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+ if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
return errStallingPeer
}
}
@@ -1290,7 +1260,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
- if !d.hasHeader(header.Hash()) {
+ if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
}
}
@@ -1299,7 +1269,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
- if n, err := d.insertHeaders(chunk, frequency); err != nil {
+ if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, chunk[:n]...)
@@ -1341,7 +1311,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
origin += uint64(limit)
}
// Signal the content downloaders of the availability of new tasks
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
@@ -1351,71 +1321,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
- pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.WaitResults()
if len(results) == 0 {
- return nil // queue empty
+ return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
- // Actually import the blocks
- first, last := results[0].Header, results[len(results)-1].Header
+ if err := d.importBlockResults(results); err != nil {
+ return err
+ }
+ }
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+ for len(results) != 0 {
+ // Check for any termination requests. This makes clean shutdown faster.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the next batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnum", last.Number, "lasthash", last.Hash(),
)
- for len(results) != 0 {
- // Check for any termination requests
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- var (
- blocks = make([]*types.Block, 0, maxResultsProcess)
- receipts = make([]types.Receipts, 0, maxResultsProcess)
- )
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- for _, result := range results[:items] {
- switch {
- case d.mode == FullSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- case d.mode == FastSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- if result.Header.Number.Uint64() <= pivot {
- receipts = append(receipts, result.Receipts)
- }
- }
- }
- // Try to process the results, aborting if there's an error
- var (
- err error
- index int
- )
- switch {
- case len(receipts) > 0:
- index, err = d.insertReceipts(blocks, receipts)
- if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
- log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
- index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
- }
- default:
- index, err = d.insertBlocks(blocks)
+ blocks := make([]*types.Block, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+ // Start syncing state of the reported head block.
+ // This should get us most of the state of the pivot block.
+ stateSync := d.syncState(latest.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+
+ pivot := d.queue.FastSyncPivot()
+ for {
+ results := d.queue.WaitResults()
+ if len(results) == 0 {
+ return stateSync.Cancel()
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ P, beforeP, afterP := splitAroundPivot(pivot, results)
+ if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+ return err
+ }
+ if P != nil {
+ stateSync.Cancel()
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
}
- if err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ }
+ if err := d.importBlockResults(afterP); err != nil {
+ return err
+ }
+ }
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+ for _, result := range results {
+ num := result.Header.Number.Uint64()
+ switch {
+ case num < pivot:
+ before = append(before, result)
+ case num == pivot:
+ p = result
+ default:
+ after = append(after, result)
+ }
+ }
+ return p, before, after
+}
+
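A small usage sketch of the split, using package-internal types and arbitrary numbers: everything below the pivot is committed as fast-sync data, the pivot itself gates the final state sync, and everything above is imported as full blocks.

// Illustrative only: three results straddling pivot 5.
results := []*fetchResult{
	{Header: &types.Header{Number: big.NewInt(4)}},
	{Header: &types.Header{Number: big.NewInt(5)}},
	{Header: &types.Header{Number: big.NewInt(6)}},
}
P, beforeP, afterP := splitAroundPivot(5, results)
// P.Header.Number is 5; beforeP holds block 4, afterP holds block 6.
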
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+ for len(results) != 0 {
+ // Check for any termination requests.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the next batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, items)
+ receipts := make([]types.Receipts, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+ b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ // Sync the pivot block state. This should complete reasonably quickly because
+ // we've already synced up to the reported head block state earlier.
+ if err := d.syncState(b.Root()).Wait(); err != nil {
+ return err
+ }
+ log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ return err
}
+ return d.blockchain.FastSyncCommitHead(b.Hash())
}
// DeliverHeaders injects a new batch of block headers received from a remote
@@ -1468,7 +1518,7 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
func (d *Downloader) qosTuner() {
for {
// Retrieve the current median RTT and integrate into the previous target RTT
- rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
// A new RTT cycle passed, increase our confidence in the estimated RTT
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 267a0def9..58f6e9a62 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -96,9 +96,7 @@ func newTester() *downloadTester {
tester.stateDb, _ = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
- tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
- tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
- tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer)
+ tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
return tester
}
@@ -218,14 +216,14 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
return err
}
-// hasHeader checks if a header is present in the testers canonical chain.
-func (dl *downloadTester) hasHeader(hash common.Hash) bool {
- return dl.getHeader(hash) != nil
+// HasHeader checks if a header is present in the tester's canonical chain.
+func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
+ return dl.GetHeaderByHash(hash) != nil
}
-// hasBlock checks if a block and associated state is present in the testers canonical chain.
-func (dl *downloadTester) hasBlock(hash common.Hash) bool {
- block := dl.getBlock(hash)
+// HasBlockAndState checks if a block and its associated state are present in the tester's canonical chain.
+func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool {
+ block := dl.GetBlockByHash(hash)
if block == nil {
return false
}
@@ -233,24 +231,24 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool {
return err == nil
}
-// getHeader retrieves a header from the testers canonical chain.
-func (dl *downloadTester) getHeader(hash common.Hash) *types.Header {
+// GetHeaderByHash retrieves a header from the tester's canonical chain.
+func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.ownHeaders[hash]
}
-// getBlock retrieves a block from the testers canonical chain.
-func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
+// GetBlockByHash retrieves a block from the tester's canonical chain.
+func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.ownBlocks[hash]
}
-// headHeader retrieves the current head header from the canonical chain.
-func (dl *downloadTester) headHeader() *types.Header {
+// CurrentHeader retrieves the current head header from the canonical chain.
+func (dl *downloadTester) CurrentHeader() *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
@@ -262,8 +260,8 @@ func (dl *downloadTester) headHeader() *types.Header {
return dl.genesis.Header()
}
-// headBlock retrieves the current head block from the canonical chain.
-func (dl *downloadTester) headBlock() *types.Block {
+// CurrentBlock retrieves the current head block from the canonical chain.
+func (dl *downloadTester) CurrentBlock() *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
@@ -277,8 +275,8 @@ func (dl *downloadTester) headBlock() *types.Block {
return dl.genesis
}
-// headFastBlock retrieves the current head fast-sync block from the canonical chain.
-func (dl *downloadTester) headFastBlock() *types.Block {
+// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
+func (dl *downloadTester) CurrentFastBlock() *types.Block {
dl.lock.RLock()
defer dl.lock.RUnlock()
@@ -290,26 +288,26 @@ func (dl *downloadTester) headFastBlock() *types.Block {
return dl.genesis
}
-// commitHeadBlock manually sets the head block to a given hash.
-func (dl *downloadTester) commitHeadBlock(hash common.Hash) error {
+// FastSyncCommitHead manually sets the head block to a given hash.
+func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
// For now only check that the state trie is correct
- if block := dl.getBlock(hash); block != nil {
+ if block := dl.GetBlockByHash(hash); block != nil {
_, err := trie.NewSecure(block.Root(), dl.stateDb, 0)
return err
}
return fmt.Errorf("non existent block: %x", hash[:4])
}
-// getTd retrieves the block's total difficulty from the canonical chain.
-func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
+// GetTdByHash retrieves the block's total difficulty from the canonical chain.
+func (dl *downloadTester) GetTdByHash(hash common.Hash) *big.Int {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.ownChainTd[hash]
}
-// insertHeaders injects a new batch of headers into the simulated chain.
-func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int) (int, error) {
+// InsertHeaderChain injects a new batch of headers into the simulated chain.
+func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -337,8 +335,8 @@ func (dl *downloadTester) insertHeaders(headers []*types.Header, checkFreq int)
return len(headers), nil
}
-// insertBlocks injects a new batch of blocks into the simulated chain.
-func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
+// InsertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -359,8 +357,8 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
return len(blocks), nil
}
-// insertReceipts injects a new batch of receipts into the simulated chain.
-func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) {
+// InsertReceiptChain injects a new batch of receipts into the simulated chain.
+func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -377,8 +375,8 @@ func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.R
return len(blocks), nil
}
-// rollback removes some recently added elements from the chain.
-func (dl *downloadTester) rollback(hashes []common.Hash) {
+// Rollback removes some recently added elements from the chain.
+func (dl *downloadTester) Rollback(hashes []common.Hash) {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -405,15 +403,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.lock.Lock()
defer dl.lock.Unlock()
- var err error
- switch version {
- case 62:
- err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
- case 63:
- err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
- case 64:
- err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
- }
+ var err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl: dl, id: id, delay: delay})
if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes))
@@ -471,139 +461,151 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id)
}
-// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash
+type downloadTesterPeer struct {
+ dl *downloadTester
+ id string
+ delay time.Duration
+ lock sync.RWMutex
+}
+
+// setDelay is a thread-safe setter for the network delay value.
+func (dlp *downloadTesterPeer) setDelay(delay time.Duration) {
+ dlp.lock.Lock()
+ defer dlp.lock.Unlock()
+
+ dlp.delay = delay
+}
+
+// waitDelay is a thread-safe way to sleep for the configured time.
+func (dlp *downloadTesterPeer) waitDelay() {
+ dlp.lock.RLock()
+ delay := dlp.delay
+ dlp.lock.RUnlock()
+
+ time.Sleep(delay)
+}
+
+// Head retrieves the peer's current head hash
// and total difficulty.
-func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) {
- return func() (common.Hash, *big.Int) {
- dl.lock.RLock()
- defer dl.lock.RUnlock()
+func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
- return dl.peerHashes[id][0], nil
- }
+ return dlp.dl.peerHashes[dlp.id][0], nil
}
-// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed
+// RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
// origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer.
-func (dl *downloadTester) peerGetRelHeadersFn(id string, delay time.Duration) func(common.Hash, int, int, bool) error {
- return func(origin common.Hash, amount int, skip int, reverse bool) error {
- // Find the canonical number of the hash
- dl.lock.RLock()
- number := uint64(0)
- for num, hash := range dl.peerHashes[id] {
- if hash == origin {
- number = uint64(len(dl.peerHashes[id]) - num - 1)
- break
- }
+func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
+ // Find the canonical number of the hash
+ dlp.dl.lock.RLock()
+ number := uint64(0)
+ for num, hash := range dlp.dl.peerHashes[dlp.id] {
+ if hash == origin {
+ number = uint64(len(dlp.dl.peerHashes[dlp.id]) - num - 1)
+ break
}
- dl.lock.RUnlock()
-
- // Use the absolute header fetcher to satisfy the query
- return dl.peerGetAbsHeadersFn(id, delay)(number, amount, skip, reverse)
}
+ dlp.dl.lock.RUnlock()
+
+ // Use the absolute header fetcher to satisfy the query
+ return dlp.RequestHeadersByNumber(number, amount, skip, reverse)
}
-// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered
+// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
// origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer.
-func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error {
- return func(origin uint64, amount int, skip int, reverse bool) error {
- time.Sleep(delay)
-
- dl.lock.RLock()
- defer dl.lock.RUnlock()
-
- // Gather the next batch of headers
- hashes := dl.peerHashes[id]
- headers := dl.peerHeaders[id]
- result := make([]*types.Header, 0, amount)
- for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
- if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
- result = append(result, header)
- }
+func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
+ dlp.waitDelay()
+
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
+
+ // Gather the next batch of headers
+ hashes := dlp.dl.peerHashes[dlp.id]
+ headers := dlp.dl.peerHeaders[dlp.id]
+ result := make([]*types.Header, 0, amount)
+ for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
+ if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
+ result = append(result, header)
}
- // Delay delivery a bit to allow attacks to unfold
- go func() {
- time.Sleep(time.Millisecond)
- dl.downloader.DeliverHeaders(id, result)
- }()
- return nil
}
+ // Delay delivery a bit to allow attacks to unfold
+ go func() {
+ time.Sleep(time.Millisecond)
+ dlp.dl.downloader.DeliverHeaders(dlp.id, result)
+ }()
+ return nil
}
-// peerGetBodiesFn constructs a getBlockBodies method associated with a particular
+// RequestBodies constructs a getBlockBodies method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block bodies from the particularly requested peer.
-func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error {
- return func(hashes []common.Hash) error {
- time.Sleep(delay)
+func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
+ dlp.waitDelay()
- dl.lock.RLock()
- defer dl.lock.RUnlock()
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
- blocks := dl.peerBlocks[id]
+ blocks := dlp.dl.peerBlocks[dlp.id]
- transactions := make([][]*types.Transaction, 0, len(hashes))
- uncles := make([][]*types.Header, 0, len(hashes))
+ transactions := make([][]*types.Transaction, 0, len(hashes))
+ uncles := make([][]*types.Header, 0, len(hashes))
- for _, hash := range hashes {
- if block, ok := blocks[hash]; ok {
- transactions = append(transactions, block.Transactions())
- uncles = append(uncles, block.Uncles())
- }
+ for _, hash := range hashes {
+ if block, ok := blocks[hash]; ok {
+ transactions = append(transactions, block.Transactions())
+ uncles = append(uncles, block.Uncles())
}
- go dl.downloader.DeliverBodies(id, transactions, uncles)
-
- return nil
}
+ go dlp.dl.downloader.DeliverBodies(dlp.id, transactions, uncles)
+
+ return nil
}
-// peerGetReceiptsFn constructs a getReceipts method associated with a particular
+// RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
-func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error {
- return func(hashes []common.Hash) error {
- time.Sleep(delay)
+func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
+ dlp.waitDelay()
- dl.lock.RLock()
- defer dl.lock.RUnlock()
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
- receipts := dl.peerReceipts[id]
+ receipts := dlp.dl.peerReceipts[dlp.id]
- results := make([][]*types.Receipt, 0, len(hashes))
- for _, hash := range hashes {
- if receipt, ok := receipts[hash]; ok {
- results = append(results, receipt)
- }
+ results := make([][]*types.Receipt, 0, len(hashes))
+ for _, hash := range hashes {
+ if receipt, ok := receipts[hash]; ok {
+ results = append(results, receipt)
}
- go dl.downloader.DeliverReceipts(id, results)
-
- return nil
}
+ go dlp.dl.downloader.DeliverReceipts(dlp.id, results)
+
+ return nil
}
-// peerGetNodeDataFn constructs a getNodeData method associated with a particular
+// RequestNodeData constructs a getNodeData method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of node state data from the particularly requested peer.
-func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error {
- return func(hashes []common.Hash) error {
- time.Sleep(delay)
-
- dl.lock.RLock()
- defer dl.lock.RUnlock()
-
- results := make([][]byte, 0, len(hashes))
- for _, hash := range hashes {
- if data, err := dl.peerDb.Get(hash.Bytes()); err == nil {
- if !dl.peerMissingStates[id][hash] {
- results = append(results, data)
- }
+func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
+ dlp.waitDelay()
+
+ dlp.dl.lock.RLock()
+ defer dlp.dl.lock.RUnlock()
+
+ results := make([][]byte, 0, len(hashes))
+ for _, hash := range hashes {
+ if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
+ if !dlp.dl.peerMissingStates[dlp.id][hash] {
+ results = append(results, data)
}
}
- go dl.downloader.DeliverNodeData(id, results)
-
- return nil
}
+ go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
+
+ return nil
}
// assertOwnChain checks if the local chain contains the correct number of items
@@ -657,7 +659,7 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot)
}
if index > 0 {
- if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil || err != nil {
+ if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(tester.stateDb)); statedb == nil || err != nil {
t.Fatalf("state reconstruction failed: %v", err)
}
}
@@ -1212,7 +1214,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("fast-attack", nil, mode); err == nil {
t.Fatalf("succeeded fast attacker synchronisation")
}
- if head := tester.headHeader().Number.Int64(); int(head) > MaxHeaderFetch {
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
}
// Attempt to sync with an attacker that feeds junk during the block import phase.
@@ -1226,11 +1228,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("block-attack", nil, mode); err == nil {
t.Fatalf("succeeded block attacker synchronisation")
}
- if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == FastSync {
- if head := tester.headBlock().NumberU64(); head != 0 {
+ if head := tester.CurrentBlock().NumberU64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
@@ -1251,11 +1253,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("withhold-attack", nil, mode); err == nil {
t.Fatalf("succeeded withholding attacker synchronisation")
}
- if head := tester.headHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
+ if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == FastSync {
- if head := tester.headBlock().NumberU64(); head != 0 {
+ if head := tester.CurrentBlock().NumberU64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
@@ -1396,7 +1398,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("peer-half", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1413,7 +1415,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("peer-full", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1469,7 +1471,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("fork A", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1489,7 +1491,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("fork B", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1550,7 +1552,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("faulty", nil, mode); err == nil {
- t.Fatalf("succeeded faulty synchronisation")
+ panic("succeeded faulty synchronisation")
}
}()
<-starting
@@ -1567,7 +1569,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("valid", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1628,7 +1630,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("attack", nil, mode); err == nil {
- t.Fatalf("succeeded attacker synchronisation")
+ panic("succeeded attacker synchronisation")
}
}()
<-starting
@@ -1645,7 +1647,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("valid", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1670,6 +1672,48 @@ func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64,
func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+type floodingTestPeer struct {
+ peer Peer
+ tester *downloadTester
+}
+
+func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
+func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
+ return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
+}
+func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
+ return ftp.peer.RequestBodies(hashes)
+}
+func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
+ return ftp.peer.RequestReceipts(hashes)
+}
+func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
+ return ftp.peer.RequestNodeData(hashes)
+}
+
+func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
+ deliveriesDone <- struct{}{}
+ }()
+ }
+ // Deliver the actual requested headers.
+ go ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
+ // None of the extra deliveries should block.
+ timeout := time.After(15 * time.Second)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+}
+
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
@@ -1677,7 +1721,6 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
defer master.terminate()
hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false)
- fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ {
tester := newTester()
tester.peerDb = master.peerDb
@@ -1685,29 +1728,11 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries.
- tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
- deliveriesDone := make(chan struct{}, 500)
- for i := 0; i < cap(deliveriesDone); i++ {
- peer := fmt.Sprintf("fake-peer%d", i)
- go func() {
- tester.downloader.DeliverHeaders(peer, fakeHeads)
- deliveriesDone <- struct{}{}
- }()
- }
- // Deliver the actual requested headers.
- impl := tester.peerGetAbsHeadersFn("peer", 0)
- go impl(from, count, skip, reverse)
- // None of the extra deliveries should block.
- timeout := time.After(15 * time.Second)
- for i := 0; i < cap(deliveriesDone); i++ {
- select {
- case <-deliveriesDone:
- case <-timeout:
- panic("blocked")
- }
- }
- return nil
+ tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
+ tester.downloader.peers.peers["peer"].peer,
+ tester,
}
+
if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err)
}
@@ -1739,7 +1764,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
}
- tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section
+ (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(500 * time.Millisecond) // Enough to reach the critical section
// Synchronise with the peer a few times and make sure they fail until the retry limit
for i := 0; i < int(fsCriticalTrials)-1; i++ {
@@ -1758,7 +1783,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
tester.lock.Lock()
tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]]
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
- tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0)
+ (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(0)
tester.lock.Unlock()
}
}
diff --git a/eth/downloader/fakepeer.go b/eth/downloader/fakepeer.go
new file mode 100644
index 000000000..b45acff7d
--- /dev/null
+++ b/eth/downloader/fakepeer.go
@@ -0,0 +1,160 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+// FakePeer is a mock downloader peer that operates on a local database instance
+// instead of being an actual live node. It's useful for testing and for
+// implementing sync commands from an existing local database.
+type FakePeer struct {
+ id string
+ db ethdb.Database
+ hc *core.HeaderChain
+ dl *Downloader
+}
+
+// NewFakePeer creates a new mock downloader peer with the given data sources.
+func NewFakePeer(id string, db ethdb.Database, hc *core.HeaderChain, dl *Downloader) *FakePeer {
+ return &FakePeer{id: id, db: db, hc: hc, dl: dl}
+}
+
+// Head implements downloader.Peer, returning the current head hash and number
+// of the best known header.
+func (p *FakePeer) Head() (common.Hash, *big.Int) {
+ header := p.hc.CurrentHeader()
+ return header.Hash(), header.Number
+}
+
+// RequestHeadersByHash implements downloader.Peer, returning a batch of headers
+// defined by the origin hash and the associated query parameters.
+func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, reverse bool) error {
+ var (
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < amount {
+ origin := p.hc.GetHeaderByHash(hash)
+ if origin == nil {
+ break
+ }
+ number := origin.Number.Uint64()
+ headers = append(headers, origin)
+ if reverse {
+ for i := 0; i <= skip; i++ {
+ if header := p.hc.GetHeader(hash, number); header != nil {
+ hash = header.ParentHash
+ number--
+ } else {
+ unknown = true
+ break
+ }
+ }
+ } else {
+ var (
+ current = origin.Number.Uint64()
+ next = current + uint64(skip) + 1
+ )
+ if header := p.hc.GetHeaderByNumber(next); header != nil {
+ if p.hc.GetBlockHashesFromHash(header.Hash(), uint64(skip+1))[skip] == hash {
+ hash = header.Hash()
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ }
+ }
+ p.dl.DeliverHeaders(p.id, headers)
+ return nil
+}
+
+// RequestHeadersByNumber implements downloader.Peer, returning a batch of headers
+// defined by the origin number and the associated query parameters.
+func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, reverse bool) error {
+ var (
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < amount {
+ origin := p.hc.GetHeaderByNumber(number)
+ if origin == nil {
+ break
+ }
+ if reverse {
+ if number >= uint64(skip+1) {
+ number -= uint64(skip + 1)
+ } else {
+ unknown = true
+ }
+ } else {
+ number += uint64(skip + 1)
+ }
+ headers = append(headers, origin)
+ }
+ p.dl.DeliverHeaders(p.id, headers)
+ return nil
+}
+
+// RequestBodies implements downloader.Peer, returning a batch of block bodies
+// corresponding to the specified block hashes.
+func (p *FakePeer) RequestBodies(hashes []common.Hash) error {
+ var (
+ txs [][]*types.Transaction
+ uncles [][]*types.Header
+ )
+ for _, hash := range hashes {
+ block := core.GetBlock(p.db, hash, p.hc.GetBlockNumber(hash))
+
+ txs = append(txs, block.Transactions())
+ uncles = append(uncles, block.Uncles())
+ }
+ p.dl.DeliverBodies(p.id, txs, uncles)
+ return nil
+}
+
+// RequestReceipts implements downloader.Peer, returning a batch of transaction
+// receipts corresponding to the specified block hashes.
+func (p *FakePeer) RequestReceipts(hashes []common.Hash) error {
+ var receipts [][]*types.Receipt
+ for _, hash := range hashes {
+ receipts = append(receipts, core.GetBlockReceipts(p.db, hash, p.hc.GetBlockNumber(hash)))
+ }
+ p.dl.DeliverReceipts(p.id, receipts)
+ return nil
+}
+
+// RequestNodeData implements downloader.Peer, returning a batch of state trie
+// nodes corresponding to the specified trie hashes.
+func (p *FakePeer) RequestNodeData(hashes []common.Hash) error {
+ var data [][]byte
+ for _, hash := range hashes {
+ if entry, err := p.db.Get(hash.Bytes()); err == nil {
+ data = append(data, entry)
+ }
+ }
+ p.dl.DeliverNodeData(p.id, data)
+ return nil
+}
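
A sketch of how such a peer might be wired up to sync one database into another. The setup of db, hc and dl is assumed to exist already, the peer id is arbitrary, and error handling is abbreviated:

// Register the database-backed peer like a live one, then sync from it.
peer := downloader.NewFakePeer("local", db, hc, dl)
if err := dl.RegisterPeer("local", 63, peer); err != nil {
	log.Error("Failed to register fake peer", "err", err)
}
head, td := peer.Head()
if err := dl.Synchronise("local", head, td, downloader.FullSync); err != nil {
	log.Error("Sync from local database failed", "err", err)
}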
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index 0d76c7dfd..58764ccf0 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -38,8 +38,6 @@ var (
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
- stateInMeter = metrics.NewMeter("eth/downloader/states/in")
- stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
- stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
- stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
+ stateInMeter = metrics.NewMeter("eth/downloader/states/in")
+ stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 15a912f1f..e638744ea 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@@ -38,24 +39,14 @@ const (
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
)
-// Head hash and total difficulty retriever for
-type currentHeadRetrievalFn func() (common.Hash, *big.Int)
-
-// Block header and body fetchers belonging to eth/62 and above
-type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
-type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
-type blockBodyFetcherFn func([]common.Hash) error
-type receiptFetcherFn func([]common.Hash) error
-type stateFetcherFn func([]common.Hash) error
-
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
)
-// peer represents an active peer from which hashes and blocks are retrieved.
-type peer struct {
+// peerConnection represents an active peer from which hashes and blocks are retrieved.
+type peerConnection struct {
id string // Unique identifier of the peer
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
@@ -77,37 +68,57 @@ type peer struct {
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
- currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer
-
- getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
- getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
- getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
-
- getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
- getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
+ peer Peer
version int // Eth protocol version number to switch strategies
log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex
}
-// newPeer create a new downloader peer, with specific hash and block retrieval
-// mechanisms.
-func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn, getNodeData stateFetcherFn, logger log.Logger) *peer {
+// LightPeer encapsulates the methods required to synchronise with a remote light peer.
+type LightPeer interface {
+ Head() (common.Hash, *big.Int)
+ RequestHeadersByHash(common.Hash, int, int, bool) error
+ RequestHeadersByNumber(uint64, int, int, bool) error
+}
+
+// Peer encapsulates the methods required to synchronise with a remote full peer.
+type Peer interface {
+ LightPeer
+ RequestBodies([]common.Hash) error
+ RequestReceipts([]common.Hash) error
+ RequestNodeData([]common.Hash) error
+}
- return &peer{
+// lightPeerWrapper wraps a LightPeer struct, stubbing out the Peer-only methods.
+type lightPeerWrapper struct {
+ peer LightPeer
+}
+
+func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
+func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error {
+ return w.peer.RequestHeadersByHash(h, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error {
+ return w.peer.RequestHeadersByNumber(i, amount, skip, reverse)
+}
+func (w *lightPeerWrapper) RequestBodies([]common.Hash) error {
+ panic("RequestBodies not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestReceipts([]common.Hash) error {
+ panic("RequestReceipts not supported in light client mode sync")
+}
+func (w *lightPeerWrapper) RequestNodeData([]common.Hash) error {
+ panic("RequestNodeData not supported in light client mode sync")
+}
+
+// newPeerConnection creates a new downloader peer.
+func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection {
+ return &peerConnection{
id: id,
lacking: make(map[common.Hash]struct{}),
- currentHead: currentHead,
- getRelHeaders: getRelHeaders,
- getAbsHeaders: getAbsHeaders,
- getBlockBodies: getBlockBodies,
-
- getReceipts: getReceipts,
- getNodeData: getNodeData,
+ peer: peer,
version: version,
log: logger,
@@ -115,7 +126,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
}
// Reset clears the internal state of a peer entity.
-func (p *peer) Reset() {
+func (p *peerConnection) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
@@ -133,7 +144,7 @@ func (p *peer) Reset() {
}
// FetchHeaders sends a header retrieval request to the remote peer.
-func (p *peer) FetchHeaders(from uint64, count int) error {
+func (p *peerConnection) FetchHeaders(from uint64, count int) error {
// Sanity check the protocol version
if p.version < 62 {
panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
@@ -145,13 +156,13 @@ func (p *peer) FetchHeaders(from uint64, count int) error {
p.headerStarted = time.Now()
	// Issue the header retrieval request (absolute upwards without gaps)
- go p.getAbsHeaders(from, count, 0, false)
+ go p.peer.RequestHeadersByNumber(from, count, 0, false)
return nil
}
// FetchBodies sends a block body retrieval request to the remote peer.
-func (p *peer) FetchBodies(request *fetchRequest) error {
+func (p *peerConnection) FetchBodies(request *fetchRequest) error {
// Sanity check the protocol version
if p.version < 62 {
panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
@@ -167,13 +178,13 @@ func (p *peer) FetchBodies(request *fetchRequest) error {
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
- go p.getBlockBodies(hashes)
+ go p.peer.RequestBodies(hashes)
return nil
}
// FetchReceipts sends a receipt retrieval request to the remote peer.
-func (p *peer) FetchReceipts(request *fetchRequest) error {
+func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
// Sanity check the protocol version
if p.version < 63 {
panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version))
@@ -189,13 +200,13 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
- go p.getReceipts(hashes)
+ go p.peer.RequestReceipts(hashes)
return nil
}
// FetchNodeData sends a node state data retrieval request to the remote peer.
-func (p *peer) FetchNodeData(request *fetchRequest) error {
+func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
// Sanity check the protocol version
if p.version < 63 {
panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
@@ -206,12 +217,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
}
p.stateStarted = time.Now()
- // Convert the hash set to a retrievable slice
- hashes := make([]common.Hash, 0, len(request.Hashes))
- for hash := range request.Hashes {
- hashes = append(hashes, hash)
- }
- go p.getNodeData(hashes)
+ go p.peer.RequestNodeData(hashes)
return nil
}
@@ -219,41 +225,41 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
// requests. Its estimated header retrieval throughput is updated with that measured
// just now.
-func (p *peer) SetHeadersIdle(delivered int) {
+func (p *peerConnection) SetHeadersIdle(delivered int) {
p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
}
// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// requests. Its estimated block retrieval throughput is updated with that measured
// just now.
-func (p *peer) SetBlocksIdle(delivered int) {
+func (p *peerConnection) SetBlocksIdle(delivered int) {
p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
// requests. Its estimated body retrieval throughput is updated with that measured
// just now.
-func (p *peer) SetBodiesIdle(delivered int) {
+func (p *peerConnection) SetBodiesIdle(delivered int) {
p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
// retrieval requests. Its estimated receipt retrieval throughput is updated
// with that measured just now.
-func (p *peer) SetReceiptsIdle(delivered int) {
+func (p *peerConnection) SetReceiptsIdle(delivered int) {
p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
}
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
// data retrieval requests. Its estimated state retrieval throughput is updated
// with that measured just now.
-func (p *peer) SetNodeDataIdle(delivered int) {
+func (p *peerConnection) SetNodeDataIdle(delivered int) {
p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its estimated retrieval throughput is updated with that measured just now.
-func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
	// Regardless of the scaling, make sure the peer ends up idle
defer atomic.StoreInt32(idle, 0)
@@ -280,7 +286,7 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
// HeaderCapacity retrieves the peer's header download allowance based on its
// previously discovered throughput.
-func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
+func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -289,7 +295,7 @@ func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
// BlockCapacity retrieves the peer's block download allowance based on its
// previously discovered throughput.
-func (p *peer) BlockCapacity(targetRTT time.Duration) int {
+func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -298,7 +304,7 @@ func (p *peer) BlockCapacity(targetRTT time.Duration) int {
// ReceiptCapacity retrieves the peer's receipt download allowance based on its
// previously discovered throughput.
-func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
+func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -307,7 +313,7 @@ func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
// NodeDataCapacity retrieves the peer's state download allowance based on its
// previously discovered throughput.
-func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
+func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -317,7 +323,7 @@ func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.
-func (p *peer) MarkLacking(hash common.Hash) {
+func (p *peerConnection) MarkLacking(hash common.Hash) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -332,7 +338,7 @@ func (p *peer) MarkLacking(hash common.Hash) {
// Lacks retrieves whether the hash of a blockchain item is on the peer's lacking
// list (i.e. whether we know that the peer does not have it).
-func (p *peer) Lacks(hash common.Hash) bool {
+func (p *peerConnection) Lacks(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -343,17 +349,29 @@ func (p *peer) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peers participating in the chain
// download procedure.
type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
+ peers map[string]*peerConnection
+ newPeerFeed event.Feed
+ peerDropFeed event.Feed
+ lock sync.RWMutex
}
// newPeerSet creates a new peer set to track the active download sources.
func newPeerSet() *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
+ peers: make(map[string]*peerConnection),
}
}
+// SubscribeNewPeers subscribes to peer arrival events.
+func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
+ return ps.newPeerFeed.Subscribe(ch)
+}
+
+// SubscribePeerDrops subscribes to peer departure events.
+func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
+ return ps.peerDropFeed.Subscribe(ch)
+}
+
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
@@ -371,15 +389,14 @@ func (ps *peerSet) Reset() {
// The method also sets the starting throughput values of the new peer to the
// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
-func (ps *peerSet) Register(p *peer) error {
+func (ps *peerSet) Register(p *peerConnection) error {
// Retrieve the current median RTT as a sane default
p.rtt = ps.medianRTT()
// Register the new peer with some meaningful defaults
ps.lock.Lock()
- defer ps.lock.Unlock()
-
if _, ok := ps.peers[p.id]; ok {
+ ps.lock.Unlock()
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
@@ -399,6 +416,9 @@ func (ps *peerSet) Register(p *peer) error {
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p
+ ps.lock.Unlock()
+
+ ps.newPeerFeed.Send(p)
return nil
}
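
Note that Register (and Unregister below) now release ps.lock before publishing on the feed: event.Feed.Send blocks until every subscriber has received the value, so sending under the lock could deadlock with a subscriber that calls back into the peer set. On the consuming side, the state sync uses the feeds roughly as in this sketch (within the downloader package; buffer sizes are illustrative):

// watchPeers subscribes buffered channels to peer arrivals and drops and
// reacts to both until told to quit.
func watchPeers(ps *peerSet, quit chan struct{}) {
	arrived := make(chan *peerConnection, 64)
	dropped := make(chan *peerConnection, 64)

	arriveSub := ps.SubscribeNewPeers(arrived)
	dropSub := ps.SubscribePeerDrops(dropped)
	defer arriveSub.Unsubscribe()
	defer dropSub.Unsubscribe()

	for {
		select {
		case p := <-arrived:
			p.log.Trace("Peer usable for downloads")
		case p := <-dropped:
			p.log.Trace("Peer lost from download set")
		case <-quit:
			return
		}
	}
}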
@@ -406,17 +426,20 @@ func (ps *peerSet) Register(p *peer) error {
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[id]; !ok {
+ p, ok := ps.peers[id]
+ if !ok {
+ defer ps.lock.Unlock()
return errNotRegistered
}
delete(ps.peers, id)
+ ps.lock.Unlock()
+
+ ps.peerDropFeed.Send(p)
return nil
}
// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) *peerConnection {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -432,11 +455,11 @@ func (ps *peerSet) Len() int {
}
// AllPeers retrieves a flat list of all the peers within the set.
-func (ps *peerSet) AllPeers() []*peer {
+func (ps *peerSet) AllPeers() []*peerConnection {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
+ list := make([]*peerConnection, 0, len(ps.peers))
for _, p := range ps.peers {
list = append(list, p)
}
@@ -445,11 +468,11 @@ func (ps *peerSet) AllPeers() []*peer {
// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
// within the active peer set, ordered by their reputation.
-func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
- idle := func(p *peer) bool {
+func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.headerIdle) == 0
}
- throughput := func(p *peer) float64 {
+ throughput := func(p *peerConnection) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.headerThroughput
@@ -459,11 +482,11 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation.
-func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
- idle := func(p *peer) bool {
+func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- throughput := func(p *peer) float64 {
+ throughput := func(p *peerConnection) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.blockThroughput
@@ -473,11 +496,11 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
// within the active peer set, ordered by their reputation.
-func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
- idle := func(p *peer) bool {
+func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
- throughput := func(p *peer) float64 {
+ throughput := func(p *peerConnection) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.receiptThroughput
@@ -487,11 +510,11 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
// peers within the active peer set, ordered by their reputation.
-func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
- idle := func(p *peer) bool {
+func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
+ idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
- throughput := func(p *peer) float64 {
+ throughput := func(p *peerConnection) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.stateThroughput
@@ -502,11 +525,11 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
// The resulting set of peers is sorted by their measured throughput.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
- idle, total := make([]*peer, 0, len(ps.peers)), 0
+ idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) {
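
The tail of idlePeers is cut off by this hunk, but its effect is that the collected idle peers are ordered fastest-first before being handed out, so new requests go to the best-performing sources. Equivalent in effect (though not necessarily in implementation) to:

import "sort"

// orderByThroughput sorts idle peers so the highest measured throughput comes
// first; a sketch of the ordering step that closes out idlePeers.
func orderByThroughput(peers []*peerConnection, throughput func(*peerConnection) float64) {
	sort.Slice(peers, func(i, j int) bool {
		return throughput(peers[i]) > throughput(peers[j])
	})
}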
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 855097c45..6926f1d8c 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,20 +26,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
-var (
- blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
- maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
-)
+var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
var (
errNoFetchesPending = errors.New("no fetches pending")
@@ -48,7 +41,7 @@ var (
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
- Peer *peer // Peer to which the request was sent
+ Peer *peerConnection // Peer to which the request was sent
From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
@@ -94,15 +87,6 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
- stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order
- stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority
- stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
- statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
-
- stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
- stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
- stateWriters int // [eth/63] Number of running state DB writer goroutines
-
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
@@ -112,7 +96,7 @@ type queue struct {
}
// newQueue creates a new download queue for scheduling block retrieval.
-func newQueue(stateDb ethdb.Database) *queue {
+func newQueue() *queue {
lock := new(sync.Mutex)
return &queue{
headerPendPool: make(map[string]*fetchRequest),
@@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
- stateTaskPool: make(map[common.Hash]int),
- stateTaskQueue: prque.New(),
- statePendPool: make(map[string]*fetchRequest),
- stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
active: sync.NewCond(lock),
lock: lock,
@@ -158,12 +138,6 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
- q.stateTaskIndex = 0
- q.stateTaskPool = make(map[common.Hash]int)
- q.stateTaskQueue.Reset()
- q.statePendPool = make(map[string]*fetchRequest)
- q.stateScheduler = nil
-
q.resultCache = make([]*fetchResult, blockCacheLimit)
q.resultOffset = 0
}
@@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
-// PendingNodeData retrieves the number of node data entries pending for retrieval.
-func (q *queue) PendingNodeData() int {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.pendingNodeDataLocked()
-}
-
-// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
-// The caller must hold q.lock.
-func (q *queue) pendingNodeDataLocked() int {
- var n int
- if q.stateScheduler != nil {
- n = q.stateScheduler.Pending()
- }
- // Ensure that PendingNodeData doesn't return 0 until all state is written.
- if q.stateWriters > 0 {
- n++
- }
- return n
-}
-
// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
@@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool {
return len(q.receiptPendPool) > 0
}
-// InFlightNodeData retrieves whether there are node data entry fetch requests
-// currently in flight.
-func (q *queue) InFlightNodeData() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return len(q.statePendPool)+q.stateWriters > 0
-}
-
-// Idle returns if the queue is fully idle or has some data still inside. This
-// method is used by the tester to detect termination events.
+// Idle returns whether the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
- queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
- pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
+ queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
- if q.stateScheduler != nil {
- queued += q.stateScheduler.Pending()
- }
return (queued + pending + cached) == 0
}
@@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
- if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
- // Pivoting point of the fast sync, switch the state retrieval to this
- log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash)
-
- q.stateTaskIndex = 0
- q.stateTaskPool = make(map[common.Hash]int)
- q.stateTaskQueue.Reset()
- for _, req := range q.statePendPool {
- req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
- }
-
- q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
- }
inserts = append(inserts, header)
q.headerHead = hash
from++
@@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int {
if result == nil || result.Pending > 0 {
return i
}
- // Special handling for the fast-sync pivot block:
- if q.mode == FastSync {
- bnum := result.Header.Number.Uint64()
- if bnum == q.fastSyncPivot {
- // If the state of the pivot block is not
- // available yet, we cannot proceed and return 0.
- //
- // Stop before processing the pivot block to ensure that
- // resultCache has space for fsHeaderForceVerify items. Not
- // doing this could leave us unable to download the required
- // amount of headers.
- if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- return i
- }
- }
- }
- // If we're just the fast sync pivot, stop as well
- // because the following batch needs different insertion.
- // This simplifies handling the switchover in d.process.
- if bnum == q.fastSyncPivot+1 && i > 0 {
- return i
}
}
}
@@ -481,7 +391,7 @@ func (q *queue) countProcessableItems() int {
// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
-func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
+func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
@@ -519,85 +429,10 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
return request
}
-// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
-// any previously failed download.
-func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
- // Create a task generator to fetch status-fetch tasks if all schedules ones are done
- generator := func(max int) {
- if q.stateScheduler != nil {
- for _, hash := range q.stateScheduler.Missing(max) {
- q.stateTaskPool[hash] = q.stateTaskIndex
- q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
- q.stateTaskIndex++
- }
- }
- }
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates)
-}
-
-// reserveHashes reserves a set of hashes for the given peer, skipping previously
-// failed ones.
-//
-// Note, this method expects the queue lock to be already held for writing. The
-// reason the lock is not obtained in here is because the parameters already need
-// to access the queue, so they already need a lock anyway.
-func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
- // Short circuit if the peer's already downloading something (sanity check to
- // not corrupt state)
- if _, ok := pendPool[p.id]; ok {
- return nil
- }
- // Calculate an upper limit on the hashes we might fetch (i.e. throttling)
- allowance := maxPending
- if allowance > 0 {
- for _, request := range pendPool {
- allowance -= len(request.Hashes)
- }
- }
- // If there's a task generator, ask it to fill our task queue
- if taskGen != nil && taskQueue.Size() < allowance {
- taskGen(allowance - taskQueue.Size())
- }
- if taskQueue.Empty() {
- return nil
- }
- // Retrieve a batch of hashes, skipping previously failed ones
- send := make(map[common.Hash]int)
- skip := make(map[common.Hash]int)
-
- for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
- hash, priority := taskQueue.Pop()
- if p.Lacks(hash.(common.Hash)) {
- skip[hash.(common.Hash)] = int(priority)
- } else {
- send[hash.(common.Hash)] = int(priority)
- }
- }
- // Merge all the skipped hashes back
- for hash, index := range skip {
- taskQueue.Push(hash, float32(index))
- }
- // Assemble and return the block download request
- if len(send) == 0 {
- return nil
- }
- request := &fetchRequest{
- Peer: p,
- Hashes: send,
- Time: time.Now(),
- }
- pendPool[p.id] = request
-
- return request
-}
-
// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Besides the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
-func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
+func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
}
@@ -610,7 +445,7 @@ func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Besides the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing.
-func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) {
+func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash
}
@@ -627,7 +462,7 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error)
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
@@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
-// CancelNodeData aborts a node state data fetch request, returning all pending
-// hashes to the task queue.
-func (q *queue) CancelNodeData(request *fetchRequest) {
- q.cancel(request, q.stateTaskQueue, q.statePendPool)
-}
-
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
@@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) {
}
delete(q.receiptPendPool, peerId)
}
- if request, ok := q.statePendPool[peerId]; ok {
- for hash, index := range request.Hashes {
- q.stateTaskQueue.Push(hash, float32(index))
- }
- delete(q.statePendPool, peerId)
- }
}
// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
@@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}
-// ExpireNodeData checks for in flight node data requests that exceeded a timeout
-// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
-}
-
// expire is the generic check that moves expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
@@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
-// DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Short circuit if the data was never requested
- request := q.statePendPool[id]
- if request == nil {
- return 0, errNoFetchesPending
- }
- stateReqTimer.UpdateSince(request.Time)
- delete(q.statePendPool, id)
-
- // If no data was retrieved, mark their hashes as unavailable for the origin peer
- if len(data) == 0 {
- for hash := range request.Hashes {
- request.Peer.MarkLacking(hash)
- }
- }
- // Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
- process := []trie.SyncResult{}
- for _, blob := range data {
- // Skip any state trie entries that were not requested
- hash := common.BytesToHash(crypto.Keccak256(blob))
- if _, ok := request.Hashes[hash]; !ok {
- errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
- continue
- }
- // Inject the next state trie item into the processing queue
- process = append(process, trie.SyncResult{Hash: hash, Data: blob})
- delete(request.Hashes, hash)
- delete(q.stateTaskPool, hash)
- }
- // Return all failed or missing fetches to the queue
- for hash, index := range request.Hashes {
- q.stateTaskQueue.Push(hash, float32(index))
- }
- if q.stateScheduler == nil {
- return 0, errNoFetchesPending
- }
-
- // Run valid nodes through the trie download scheduler. It writes completed nodes to a
- // batch, which is committed asynchronously. This may lead to over-fetches because the
- // scheduler treats everything as written after Process has returned, but it's
- // unlikely to be an issue in practice.
- batch := q.stateDatabase.NewBatch()
- progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
- q.stateWriters += 1
- go func() {
- if procerr == nil {
- nproc = len(process)
- procerr = batch.Write()
- }
- // Return processing errors through the callback so the sync gets canceled. The
- // number of writers is decremented prior to the call so PendingNodeData will
- // return zero when the callback runs.
- q.lock.Lock()
- q.stateWriters -= 1
- q.lock.Unlock()
- callback(nproc, progressed, procerr)
- // Wake up WaitResults after the state has been written because it might be
- // waiting for completion of the pivot block's state download.
- q.active.Signal()
- }()
-
- // If none of the data items were good, it's a stale delivery
- switch {
- case len(errs) == 0:
- return len(process), nil
- case len(errs) == len(request.Hashes):
- return len(process), errStaleDelivery
- default:
- return len(process), fmt.Errorf("multiple failures: %v", errs)
- }
-}
-
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
@@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
}
q.fastSyncPivot = pivot
q.mode = mode
-
- // If long running fast sync, also start up a head stateretrieval immediately
- if mode == FastSync && pivot > 0 {
- q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
- }
}
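
To restate the reworked pivot handling in countProcessableItems: with state downloads moved out of the queue, the only remaining special case is that the fast-sync pivot may not be handed out until the fsHeaderForceVerify results following it are already cached, since those headers are needed for the post-pivot verification switch. Condensed into a hypothetical helper:

// pivotReady reports whether the fast-sync pivot at resultCache index i can be
// processed, i.e. whether the next forceVerify results are all present.
// Illustrative restatement of the guard above, not code from this patch.
func pivotReady(resultCache []*fetchResult, i, forceVerify int) bool {
	for j := 0; j < forceVerify; j++ {
		if i+j+1 >= len(resultCache) || resultCache[i+j+1] == nil {
			return false
		}
	}
	return true
}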
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
new file mode 100644
index 000000000..a0b05c9be
--- /dev/null
+++ b/eth/downloader/statesync.go
@@ -0,0 +1,475 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "hash"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// stateReq represents a batch of state fetch requests grouped together into
+// a single data retrieval network packet.
+type stateReq struct {
+ items []common.Hash // Hashes of the state items to download
+ tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
+ timeout time.Duration // Maximum round trip time for this to complete
+ timer *time.Timer // Timer to fire when the RTT timeout expires
+ peer *peerConnection // Peer that we're requesting from
+ response [][]byte // Response data of the peer (nil for timeouts)
+ dropped bool // Flag whether the peer dropped off early
+}
+
+// timedOut returns whether this request timed out.
+func (req *stateReq) timedOut() bool {
+ return req.response == nil
+}
+
+// stateSyncStats is a collection of progress stats to report during a state trie
+// sync to RPC requests as well as to display in user logs.
+type stateSyncStats struct {
+ processed uint64 // Number of state entries processed
+ duplicate uint64 // Number of state entries downloaded twice
+ unexpected uint64 // Number of non-requested state entries received
+ pending uint64 // Number of still pending state entries
+}
+
+// syncState starts downloading state with the given root hash.
+func (d *Downloader) syncState(root common.Hash) *stateSync {
+ s := newStateSync(d, root)
+ select {
+ case d.stateSyncStart <- s:
+ case <-d.quitCh:
+ s.err = errCancelStateFetch
+ close(s.done)
+ }
+ return s
+}
+
+// stateFetcher manages the active state sync and accepts requests
+// on its behalf.
+func (d *Downloader) stateFetcher() {
+ for {
+ select {
+ case s := <-d.stateSyncStart:
+ for next := s; next != nil; {
+ next = d.runStateSync(next)
+ }
+ case <-d.stateCh:
+ // Ignore state responses while no sync is running.
+ case <-d.quitCh:
+ return
+ }
+ }
+}
+
+// runStateSync runs a state synchronisation until it completes or another root
+// hash is requested to be switched over to.
+func (d *Downloader) runStateSync(s *stateSync) *stateSync {
+ var (
+ active = make(map[string]*stateReq) // Currently in-flight requests
+ finished []*stateReq // Completed or failed requests
+ timeout = make(chan *stateReq) // Timed out active requests
+ )
+ defer func() {
+ // Cancel active request timers on exit. Also set peers to idle so they're
+ // available for the next sync.
+ for _, req := range active {
+ req.timer.Stop()
+ req.peer.SetNodeDataIdle(len(req.items))
+ }
+ }()
+ // Run the state sync.
+ go s.run()
+ defer s.Cancel()
+
+ // Listen for peer departure events to cancel assigned tasks
+ peerDrop := make(chan *peerConnection, 1024)
+ peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
+ defer peerSub.Unsubscribe()
+
+ for {
+ // Enable sending of the first buffered element if there is one.
+ var (
+ deliverReq *stateReq
+ deliverReqCh chan *stateReq
+ )
+ if len(finished) > 0 {
+ deliverReq = finished[0]
+ deliverReqCh = s.deliver
+ }
+
+ select {
+ // The stateSync lifecycle:
+ case next := <-d.stateSyncStart:
+ return next
+
+ case <-s.done:
+ return nil
+
+ // Send the next finished request to the current sync:
+ case deliverReqCh <- deliverReq:
+ finished = append(finished[:0], finished[1:]...)
+
+ // Handle incoming state packs:
+ case pack := <-d.stateCh:
+ // Discard any data not requested (or previously timed out)
+ req := active[pack.PeerId()]
+ if req == nil {
+ log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.response = pack.(*statePack).states
+
+ finished = append(finished, req)
+ delete(active, pack.PeerId())
+
+ // Handle dropped peer connections:
+ case p := <-peerDrop:
+ // Skip if no request is currently pending
+ req := active[p.id]
+ if req == nil {
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.dropped = true
+
+ finished = append(finished, req)
+ delete(active, p.id)
+
+ // Handle timed-out requests:
+ case req := <-timeout:
+ // If the peer is already requesting something else, ignore the stale timeout.
+ // This can happen when the timeout and the delivery happen simultaneously,
+ // causing both pathways to trigger.
+ if active[req.peer.id] != req {
+ continue
+ }
+ // Move the timed out data back into the download queue
+ finished = append(finished, req)
+ delete(active, req.peer.id)
+
+ // Track outgoing state requests:
+ case req := <-d.trackStateReq:
+ // If an active request already exists for this peer, we have a problem. In
+ // theory the trie node schedule must never assign two requests to the same
+ // peer. In practice however, a peer might receive a request, disconnect and
+ // immediately reconnect before the previous request times out. In this case
+ // the first request is never honored, but we must not silently overwrite it
+ // either, as that causes valid requests to go missing and the sync to get stuck.
+ if old := active[req.peer.id]; old != nil {
+ log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
+
+ // Make sure the previous one doesn't get silently lost
+ old.timer.Stop()
+ old.dropped = true
+
+ finished = append(finished, old)
+ }
+ // Start a timer to notify the sync loop if the peer stalled.
+ req.timer = time.AfterFunc(req.timeout, func() {
+ select {
+ case timeout <- req:
+ case <-s.done:
+ // Prevent leaking of timer goroutines in the unlikely case where a
+ // timer is fired just before exiting runStateSync.
+ }
+ })
+ active[req.peer.id] = req
+ }
+ }
+}
+
+// stateSync schedules requests for downloading a particular state trie defined
+// by a given state root.
+type stateSync struct {
+ d *Downloader // Downloader instance to access and manage current peerset
+
+ sched *trie.TrieSync // State trie sync scheduler defining the tasks
+ keccak hash.Hash // Keccak256 hasher to verify deliveries with
+ tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+
+ numUncommitted int
+ bytesUncommitted int
+
+ deliver chan *stateReq // Delivery channel multiplexing peer responses
+ cancel chan struct{} // Channel to signal a termination request
+ cancelOnce sync.Once // Ensures cancel only ever gets called once
+ done chan struct{} // Channel to signal termination completion
+ err error // Any error hit during sync (set before completion)
+}
+
+// stateTask represents a single trie node download task, containing the set of
+// peers retrieval was already attempted from, to detect stalled syncs and abort.
+type stateTask struct {
+ attempts map[string]struct{}
+}
+
+// newStateSync creates a new state trie download scheduler. This method does not
+// yet start the sync; the caller needs to invoke run to do that.
+func newStateSync(d *Downloader, root common.Hash) *stateSync {
+ return &stateSync{
+ d: d,
+ sched: state.NewStateSync(root, d.stateDB),
+ keccak: sha3.NewKeccak256(),
+ tasks: make(map[common.Hash]*stateTask),
+ deliver: make(chan *stateReq),
+ cancel: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+}
+
+// run starts the task assignment and response processing loop, blocking until
+// it finishes, and finally notifying any goroutines waiting for the loop to
+// finish.
+func (s *stateSync) run() {
+ s.err = s.loop()
+ close(s.done)
+}
+
+// Wait blocks until the sync is done or canceled.
+func (s *stateSync) Wait() error {
+ <-s.done
+ return s.err
+}
+
+// Cancel cancels the sync and waits until it has shut down.
+func (s *stateSync) Cancel() error {
+ s.cancelOnce.Do(func() { close(s.cancel) })
+ return s.Wait()
+}
+
+// loop is the main event loop of a state trie sync. It is responsible for the
+// assignment of new tasks to peers (including sending them out) as well as for
+// the processing of inbound data. Note that the loop does not directly receive
+// data from peers; rather, those are buffered up in the downloader and pushed
+// here asynchronously. The reason is to decouple processing from data receipt
+// and timeouts.
+func (s *stateSync) loop() error {
+ // Listen for new peer events to assign tasks to them
+ newPeer := make(chan *peerConnection, 1024)
+ peerSub := s.d.peers.SubscribeNewPeers(newPeer)
+ defer peerSub.Unsubscribe()
+
+ // Keep assigning new tasks until the sync completes or aborts
+ for s.sched.Pending() > 0 {
+ if err := s.commit(false); err != nil {
+ return err
+ }
+ s.assignTasks()
+ // Tasks assigned, wait for something to happen
+ select {
+ case <-newPeer:
+ // New peer arrived, try to assign it download tasks
+
+ case <-s.cancel:
+ return errCancelStateFetch
+
+ case req := <-s.deliver:
+ // Response, disconnect or timeout triggered, drop the peer if stalling
+ log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
+ if len(req.items) <= 2 && !req.dropped && req.timedOut() {
+ // 2 items are the minimum requested; if even that times out, we have no
+ // use for this peer at the moment.
+ log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
+ s.d.dropPeer(req.peer.id)
+ }
+ // Process all the received blobs and check for stale delivery
+ stale, err := s.process(req)
+ if err != nil {
+ log.Warn("Node data write error", "err", err)
+ return err
+ }
+ // If the delivery contains requested data, mark the peer idle (otherwise it's a timed-out delivery)
+ if !stale {
+ req.peer.SetNodeDataIdle(len(req.response))
+ }
+ }
+ }
+ return s.commit(true)
+}
+
+func (s *stateSync) commit(force bool) error {
+ if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
+ return nil
+ }
+ start := time.Now()
+ b := s.d.stateDB.NewBatch()
+ s.sched.Commit(b)
+ if err := b.Write(); err != nil {
+ return fmt.Errorf("DB write error: %v", err)
+ }
+ s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
+ s.numUncommitted = 0
+ s.bytesUncommitted = 0
+ return nil
+}
+
+// assignTasks attempts to assign new tasks to all idle peers, either from the
+// batch currently being retried, or by fetching new data from the trie sync itself.
+func (s *stateSync) assignTasks() {
+ // Iterate over all idle peers and try to assign them state fetches
+ peers, _ := s.d.peers.NodeDataIdlePeers()
+ for _, p := range peers {
+ // Assign a batch of fetches proportional to the estimated latency/bandwidth
+ cap := p.NodeDataCapacity(s.d.requestRTT())
+ req := &stateReq{peer: p, timeout: s.d.requestTTL()}
+ s.fillTasks(cap, req)
+
+ // If the peer was assigned tasks to fetch, send the network request
+ if len(req.items) > 0 {
+ req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+ select {
+ case s.d.trackStateReq <- req:
+ req.peer.FetchNodeData(req.items)
+ case <-s.cancel:
+ }
+ }
+ }
+}
+
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer.
+func (s *stateSync) fillTasks(n int, req *stateReq) {
+ // Refill available tasks from the scheduler.
+ if len(s.tasks) < n {
+ new := s.sched.Missing(n - len(s.tasks))
+ for _, hash := range new {
+ s.tasks[hash] = &stateTask{make(map[string]struct{})}
+ }
+ }
+ // Find tasks that haven't been tried with the request's peer.
+ req.items = make([]common.Hash, 0, n)
+ req.tasks = make(map[common.Hash]*stateTask, n)
+ for hash, t := range s.tasks {
+ // Stop when we've gathered enough requests
+ if len(req.items) == n {
+ break
+ }
+ // Skip any requests we've already tried from this peer
+ if _, ok := t.attempts[req.peer.id]; ok {
+ continue
+ }
+ // Assign the request to this peer
+ t.attempts[req.peer.id] = struct{}{}
+ req.items = append(req.items, hash)
+ req.tasks[hash] = t
+ delete(s.tasks, hash)
+ }
+}
+
+// process iterates over a batch of delivered state data, injecting each item
+// into a running state sync, re-queuing any items that were requested but not
+// delivered.
+func (s *stateSync) process(req *stateReq) (bool, error) {
+ // Collect processing stats and update progress if valid data was received
+ duplicate, unexpected := 0, 0
+
+ defer func(start time.Time) {
+ if duplicate > 0 || unexpected > 0 {
+ s.updateStats(0, duplicate, unexpected, time.Since(start))
+ }
+ }(time.Now())
+
+ // Iterate over all the delivered data and inject one-by-one into the trie
+ progress, stale := false, len(req.response) > 0
+
+ for _, blob := range req.response {
+ prog, hash, err := s.processNodeData(blob)
+ switch err {
+ case nil:
+ s.numUncommitted++
+ s.bytesUncommitted += len(blob)
+ progress = progress || prog
+ case trie.ErrNotRequested:
+ unexpected++
+ case trie.ErrAlreadyProcessed:
+ duplicate++
+ default:
+ return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+ }
+ // If the node delivered a requested item, mark the delivery non-stale
+ if _, ok := req.tasks[hash]; ok {
+ delete(req.tasks, hash)
+ stale = false
+ }
+ }
+ // If we're inside the critical section, reset fail counter since we progressed.
+ if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
+ log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
+ atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
+ }
+
+ // Put unfulfilled tasks back into the retry queue
+ npeers := s.d.peers.Len()
+ for hash, task := range req.tasks {
+ // If the node did deliver something, missing items may be due to a protocol
+ // limit or a previous timeout + delayed delivery. Both cases should permit
+ // the node to retry the missing items (to avoid single-peer stalls).
+ if len(req.response) > 0 || req.timedOut() {
+ delete(task.attempts, req.peer.id)
+ }
+ // If we've requested the node too many times already, it may be a malicious
+ // sync where nobody has the right data. Abort.
+ if len(task.attempts) >= npeers {
+ return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+ }
+ // Missing item, place into the retry queue.
+ s.tasks[hash] = task
+ }
+ return stale, nil
+}
+
+// processNodeData tries to inject a trie node data blob delivered from a remote
+// peer into the state trie, returning whether anything useful was written or any
+// error occurred.
+func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
+ res := trie.SyncResult{Data: blob}
+ s.keccak.Reset()
+ s.keccak.Write(blob)
+ s.keccak.Sum(res.Hash[:0])
+ committed, _, err := s.sched.Process([]trie.SyncResult{res})
+ return committed, res.Hash, err
+}
+
+// updateStats bumps the various state sync progress counters and displays a log
+// message for the user to see.
+func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
+ s.d.syncStatsLock.Lock()
+ defer s.d.syncStatsLock.Unlock()
+
+ s.d.syncStatsState.pending = uint64(s.sched.Pending())
+ s.d.syncStatsState.processed += uint64(written)
+ s.d.syncStatsState.duplicate += uint64(duplicate)
+ s.d.syncStatsState.unexpected += uint64(unexpected)
+
+ if written > 0 || duplicate > 0 || unexpected > 0 {
+ log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+ }
+}
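
Pulling the pieces together, the downloader drives the new state sync object roughly as below (a condensed sketch within the downloader package; the real call sites interleave this with pivot selection and header processing):

// syncPivotState kicks off state scheduling for the pivot's root, then blocks
// until the trie completes or the sync is torn down.
func syncPivotState(d *Downloader, root common.Hash) error {
	s := d.syncState(root) // handed to the stateFetcher loop
	defer s.Cancel()       // safe to call repeatedly; waits for shutdown
	return s.Wait()        // returns the first error hit, if any
}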
diff --git a/eth/downloader/types.go b/eth/downloader/types.go
index e10510486..3f30ea9dd 100644
--- a/eth/downloader/types.go
+++ b/eth/downloader/types.go
@@ -18,51 +18,10 @@ package downloader
import (
"fmt"
- "math/big"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
-// headerCheckFn is a callback type for verifying a header's presence in the local chain.
-type headerCheckFn func(common.Hash) bool
-
-// blockAndStateCheckFn is a callback type for verifying block and associated states' presence in the local chain.
-type blockAndStateCheckFn func(common.Hash) bool
-
-// headerRetrievalFn is a callback type for retrieving a header from the local chain.
-type headerRetrievalFn func(common.Hash) *types.Header
-
-// blockRetrievalFn is a callback type for retrieving a block from the local chain.
-type blockRetrievalFn func(common.Hash) *types.Block
-
-// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
-type headHeaderRetrievalFn func() *types.Header
-
-// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
-type headBlockRetrievalFn func() *types.Block
-
-// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
-type headFastBlockRetrievalFn func() *types.Block
-
-// headBlockCommitterFn is a callback for directly committing the head block to a certain entity.
-type headBlockCommitterFn func(common.Hash) error
-
-// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
-type tdRetrievalFn func(common.Hash) *big.Int
-
-// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
-type headerChainInsertFn func([]*types.Header, int) (int, error)
-
-// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
-type blockChainInsertFn func(types.Blocks) (int, error)
-
-// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
-type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
-
-// chainRollbackFn is a callback type to remove a few recently added elements from the local chain.
-type chainRollbackFn func([]common.Hash)
-
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index 98cc1a76b..50966f5ee 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -83,6 +83,7 @@ type announce struct {
// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
+ peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
}
@@ -90,6 +91,7 @@ type headerFilterTask struct {
// headerFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
+ peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
@@ -218,8 +220,8 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
-func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
- log.Trace("Filtering headers", "headers", len(headers))
+func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
+ log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
// Send the filter channel to the fetcher
filter := make(chan *headerFilterTask)
@@ -231,7 +233,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
}
// Request the filtering of the header list
select {
- case filter <- &headerFilterTask{headers: headers, time: time}:
+ case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case <-f.quit:
return nil
}
@@ -246,8 +248,8 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
-func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
- log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles))
+func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+ log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
// Send the filter channel to the fetcher
filter := make(chan *bodyFilterTask)
@@ -259,7 +261,7 @@ func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*
}
// Request the filtering of the body list
select {
- case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
+ case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case <-f.quit:
return nil, nil
}
@@ -444,7 +446,7 @@ func (f *Fetcher) loop() {
hash := header.Hash()
// Filter fetcher-requested headers from other synchronisation algorithms
- if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
+ if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
// If the delivered header does not match the promised number, drop the announcer
if header.Number.Uint64() != announce.number {
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
@@ -523,7 +525,7 @@ func (f *Fetcher) loop() {
txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
uncleHash := types.CalcUncleHash(task.uncles[i])
- if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
+ if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
// Mark the body matched, reassemble if still unknown
matched = true
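
With headerFilterTask and bodyFilterTask now tagged with their source peer, a delivery can only satisfy announcements made by that same peer (announce.origin == task.peer), so one peer can no longer complete or poison another peer's pending fetch. A minimal sketch of the caller side under the new signature; the function and parameter names are illustrative, assuming imports of time, core/types and eth/fetcher:

	func deliverHeaders(f *fetcher.Fetcher, peerID string, headers []*types.Header) []*types.Header {
		// The fetcher consumes whatever it explicitly requested from peerID and
		// returns the remainder, which the caller would pass on to the downloader.
		return f.FilterHeaders(peerID, headers, time.Now())
	}

FilterBodies gains the same leading peer argument, with transactions and uncles filtered as one batch.
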
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go
index 85d2f8645..9889e6cc5 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/fetcher_test.go
@@ -153,7 +153,7 @@ func (f *fetcherTester) dropPeer(peer string) {
}
// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
-func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
+func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
@@ -166,14 +166,14 @@ func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, d
headers = append(headers, block.Header())
}
// Return on a new thread
- go f.fetcher.FilterHeaders(headers, time.Now().Add(drift))
+ go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
return nil
}
}
// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
-func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
+func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
@@ -191,7 +191,7 @@ func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, dri
}
}
// Return on a new thread
- go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift))
+ go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
return nil
}
@@ -282,8 +282,8 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
@@ -309,22 +309,28 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
// Assemble a tester with a built in counter for the requests
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
+ firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
+ secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
+ secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
counter := uint32(0)
- headerWrapper := func(hash common.Hash) error {
+ firstHeaderWrapper := func(hash common.Hash) error {
+ atomic.AddUint32(&counter, 1)
+ return firstHeaderFetcher(hash)
+ }
+ secondHeaderWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1)
- return headerFetcher(hash)
+ return secondHeaderFetcher(hash)
}
// Iteratively announce blocks until all are imported
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- {
- tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
- tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), headerWrapper, bodyFetcher)
- tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), headerWrapper, bodyFetcher)
+ tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
+ tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
@@ -347,8 +353,8 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, but overlap them continuously
overlap := 16
@@ -381,8 +387,8 @@ func testPendingDeduplication(t *testing.T, protocol int) {
// Assemble a tester with a built in counter and delayed fetcher
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
delay := 50 * time.Millisecond
counter := uint32(0)
@@ -425,8 +431,8 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
skip := targetBlocks / 2
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
@@ -456,8 +462,8 @@ func testQueueGapFill(t *testing.T, protocol int) {
skip := targetBlocks / 2
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan *types.Block, len(hashes)-1)
@@ -486,8 +492,8 @@ func testImportDeduplication(t *testing.T, protocol int) {
// Create the tester and wrap the importer with a counter
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
counter := uint32(0)
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
@@ -570,8 +576,8 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
tester.lock.Unlock()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
fetching := make(chan struct{}, 2)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
@@ -603,14 +609,14 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
hashes, blocks := makeChain(1, 0, genesis)
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
+ badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
imported := make(chan *types.Block)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
// Announce a block with a bad number, check for immediate drop
- tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
verifyImportEvent(t, imported, false)
tester.lock.RLock()
@@ -620,8 +626,11 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
if !dropped {
t.Fatalf("peer with invalid numbered announcement not dropped")
}
+
+ goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
+ goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
// Make sure a good announcement passes without a drop
- tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
+ tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
verifyImportEvent(t, imported, true)
tester.lock.RLock()
@@ -645,8 +654,8 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
hashes, blocks := makeChain(32, 0, genesis)
tester := newTester()
- headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- bodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Add a monitoring hook for all internal events
fetching := make(chan []common.Hash)
@@ -697,12 +706,12 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// Create a valid chain and an infinite junk chain
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
- validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
- validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
+ validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
+ validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
attack, _ := makeChain(targetBlocks, 0, unknownBlock)
- attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
- attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
+ attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
+ attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
for i := 0; i < len(attack); i++ {
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 61647a5d0..03c1d6afc 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -52,7 +52,6 @@ type filter struct {
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
- useMipMap bool
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
@@ -64,14 +63,12 @@ type PublicFilterAPI struct {
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- useMipMap: !lightMode,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
-
go api.timeoutLoop()
return api
@@ -326,20 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ // Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
- filter := New(api.backend, api.useMipMap)
- filter.SetBeginBlock(crit.FromBlock.Int64())
- filter.SetEndBlock(crit.ToBlock.Int64())
- filter.SetAddresses(crit.Addresses)
- filter.SetTopics(crit.Topics)
-
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
return returnLogs(logs), err
}
@@ -373,21 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- filter := New(api.backend, api.useMipMap)
+ begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
- filter.SetBeginBlock(f.crit.FromBlock.Int64())
- } else {
- filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
+ begin = f.crit.FromBlock.Int64()
}
+ end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
- filter.SetEndBlock(f.crit.ToBlock.Int64())
- } else {
- filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
+ end = f.crit.ToBlock.Int64()
}
- filter.SetAddresses(f.crit.Addresses)
- filter.SetTopics(f.crit.Topics)
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
@@ -395,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
}
// GetFilterChanges returns the logs for the filter with the given id since
-// last time is was called. This can be used for polling.
+// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
@@ -504,7 +498,6 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
switch topic := t.(type) {
case nil:
// ignore topic when matching logs
- args.Topics[i] = []common.Hash{{}}
case string:
// match specific topic
@@ -513,12 +506,16 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
return err
}
args.Topics[i] = []common.Hash{top}
+
case []interface{}:
// or case e.g. [null, "topic0", "topic1"]
for _, rawTopic := range topic {
if rawTopic == nil {
- args.Topics[i] = append(args.Topics[i], common.Hash{})
- } else if topic, ok := rawTopic.(string); ok {
+ // null component, match all
+ args.Topics[i] = nil
+ break
+ }
+ if topic, ok := rawTopic.(string); ok {
parsed, err := decodeTopic(topic)
if err != nil {
return err
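
After this change a JSON null in a topic position no longer decodes to the empty-hash sentinel; it leaves a nil rule set, which the matching code further down treats as a wildcard. A small sketch of the decoded shapes from inside the filters package, reusing the topic hashes that appear in the tests below (error handling collapsed to log.Fatal for brevity):

	blob := []byte(`{"topics": [
		"0x3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca",
		null,
		["0x9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3", null]
	]}`)
	var crit FilterCriteria
	if err := json.Unmarshal(blob, &crit); err != nil {
		log.Fatal(err)
	}
	// len(crit.Topics[0]) == 1 -> position 0 must match topic0 exactly
	// crit.Topics[1] == nil    -> position 1 is a wildcard
	// crit.Topics[2] == nil    -> the embedded null turns the whole clause into a wildcard
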
diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go
index 068a5ea24..4ae37f977 100644
--- a/eth/filters/api_test.go
+++ b/eth/filters/api_test.go
@@ -34,7 +34,6 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca")
topic1 = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3")
topic2 = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce")
- nullTopic = common.Hash{}
)
// default values
@@ -150,11 +149,8 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
if test6.Topics[0][0] != topic0 {
t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0)
}
- if len(test6.Topics[1]) != 1 {
- t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1]))
- }
- if test6.Topics[1][0] != nullTopic {
- t.Fatalf("got %x, expected empty hash", test6.Topics[1][0])
+ if len(test6.Topics[1]) != 0 {
+ t.Fatalf("expected 0 topic, got %d", len(test6.Topics[1]))
}
if len(test6.Topics[2]) != 1 {
t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2]))
@@ -180,18 +176,10 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
topic0, topic1, test7.Topics[0][0], test7.Topics[0][1],
)
}
- if len(test7.Topics[1]) != 1 {
- t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1]))
- }
- if test7.Topics[1][0] != nullTopic {
- t.Fatalf("expected empty hash, got %x", test7.Topics[1][0])
- }
- if len(test7.Topics[2]) != 2 {
- t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2]))
+ if len(test7.Topics[1]) != 0 {
+ t.Fatalf("expected 0 topic, got %d topics", len(test7.Topics[1]))
}
- if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic {
- t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]",
- topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1],
- )
+ if len(test7.Topics[2]) != 0 {
+ t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
}
}
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
new file mode 100644
index 000000000..0a0929bc1
--- /dev/null
+++ b/eth/filters/bench_test.go
@@ -0,0 +1,201 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+)
+
+func BenchmarkBloomBits512(b *testing.B) {
+ benchmarkBloomBits(b, 512)
+}
+
+func BenchmarkBloomBits1k(b *testing.B) {
+ benchmarkBloomBits(b, 1024)
+}
+
+func BenchmarkBloomBits2k(b *testing.B) {
+ benchmarkBloomBits(b, 2048)
+}
+
+func BenchmarkBloomBits4k(b *testing.B) {
+ benchmarkBloomBits(b, 4096)
+}
+
+func BenchmarkBloomBits8k(b *testing.B) {
+ benchmarkBloomBits(b, 8192)
+}
+
+func BenchmarkBloomBits16k(b *testing.B) {
+ benchmarkBloomBits(b, 16384)
+}
+
+func BenchmarkBloomBits32k(b *testing.B) {
+ benchmarkBloomBits(b, 32768)
+}
+
+const benchFilterCnt = 2000
+
+func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running bloombits benchmark section size:", sectionSize)
+
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+
+ clearBloomBits(db)
+ fmt.Println("Generating bloombits data...")
+ headNum := core.GetBlockNumber(db, head)
+ if headNum < sectionSize+512 {
+ b.Fatalf("not enough blocks for running a benchmark")
+ }
+
+ start := time.Now()
+ cnt := (headNum - 512) / sectionSize
+ var dataSize, compSize uint64
+ for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
+ bc, err := bloombits.NewGenerator(uint(sectionSize))
+ if err != nil {
+ b.Fatalf("failed to create generator: %v", err)
+ }
+ var header *types.Header
+ for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
+ hash := core.GetCanonicalHash(db, i)
+ header = core.GetHeader(db, hash, i)
+ if header == nil {
+ b.Fatalf("Error creating bloomBits data")
+ }
+ bc.AddBloom(uint(i-sectionIdx*sectionSize), header.Bloom)
+ }
+ sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
+ for i := 0; i < types.BloomBitLength; i++ {
+ data, err := bc.Bitset(uint(i))
+ if err != nil {
+ b.Fatalf("failed to retrieve bitset: %v", err)
+ }
+ comp := bitutil.CompressBytes(data)
+ dataSize += uint64(len(data))
+ compSize += uint64(len(comp))
+ core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
+ }
+ //if sectionIdx%50 == 0 {
+ // fmt.Println(" section", sectionIdx, "/", cnt)
+ //}
+ }
+
+ d := time.Since(start)
+ fmt.Println("Finished generating bloombits data")
+ fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
+ fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
+
+ fmt.Println("Running filter benchmarks...")
+ start = time.Now()
+ mux := new(event.TypeMux)
+ var backend *testBackend
+
+ for i := 0; i < benchFilterCnt; i++ {
+ if i%20 == 0 {
+ db.Close()
+ db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ }
+ var addr common.Address
+ addr[0] = byte(i)
+ addr[1] = byte(i / 256)
+ filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
+ if _, err := filter.Logs(context.Background()); err != nil {
+ b.Error("filter.Find error:", err)
+ }
+ }
+ d = time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
+ db.Close()
+}
+
+func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) {
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(startPrefix)
+ for it.Valid() {
+ key := it.Key()
+ cmpLen := len(key)
+ if len(endPrefix) < cmpLen {
+ cmpLen = len(endPrefix)
+ }
+ if bytes.Compare(key[:cmpLen], endPrefix) == 1 {
+ break
+ }
+ fn(common.CopyBytes(key))
+ it.Next()
+ }
+ it.Release()
+}
+
+var bloomBitsPrefix = []byte("bloomBits-")
+
+func clearBloomBits(db ethdb.Database) {
+ fmt.Println("Clearing bloombits data...")
+ forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) {
+ db.Delete(key)
+ })
+}
+
+func BenchmarkNoBloomBits(b *testing.B) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running benchmark without bloombits")
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+ headNum := core.GetBlockNumber(db, head)
+
+ clearBloomBits(db)
+
+ fmt.Println("Running filter benchmarks...")
+ start := time.Now()
+ mux := new(event.TypeMux)
+ backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ filter := New(backend, 0, int64(headNum), []common.Address{{}}, nil)
+ filter.Logs(context.Background())
+ d := time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
+ db.Close()
+}
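
A note on running the new benchmarks: they operate on a real, pre-synced chain database (node.DefaultDataDir() + "/geth/chaindata") rather than synthetic fixtures, and fail fast via b.Fatalf when no chain data is present. Something like go test -run=NONE -bench=BloomBits ./eth/filters exercises them; clearBloomBits wipes any previously generated index first, so each run regenerates the bloombits for the requested section size.
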
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 0a0b81224..e208f8f38 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,12 +18,11 @@ package filters
import (
"context"
- "math"
"math/big"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -35,165 +34,185 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
-
- created time.Time
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single bloombits filter
+ // system. Since the bloombits are not positional, nil topics are permitted,
+ // which get flattened into a nil byte slice.
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ // Assemble and return the filter
+ size, _ := backend.BloomStatus()
+
return &Filter{
backend: backend,
- useMipMap: useMipMap,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, filters),
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
+ end = head
}
-
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
- }
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
- f.begin = int64(blockNumber + 1)
- return logs, nil
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non-indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
- }
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
- }
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits index available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close()
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ err := session.Error()
+ if err == nil {
+ f.begin = int64(end) + 1
}
+ return logs, err
+ }
+ f.begin = int64(number) + 1
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
}
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
}
}
-
- return nil, end
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+// unindexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
- return logs, end, err
+ return logs, err
}
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
if err != nil {
- return nil, end, err
- }
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ return logs, err
}
+ logs = append(logs, found...)
}
}
+ return logs, nil
+}
- return logs, end, nil
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, receipt.Logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func includes(addresses []common.Address, a common.Address) bool {
@@ -221,39 +240,27 @@ Logs:
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
-
- logTopics := make([]common.Hash, len(topics))
- copy(logTopics, log.Topics)
-
// If the number of filter topics is greater than the number of topics in the log, skip.
if len(topics) > len(log.Topics) {
continue Logs
}
-
for i, topics := range topics {
- var match bool
+ match := len(topics) == 0 // empty rule set == wildcard
for _, topic := range topics {
- // common.Hash{} is a match all (wildcard)
- if (topic == common.Hash{}) || log.Topics[i] == topic {
+ if log.Topics[i] == topic {
match = true
break
}
}
-
if !match {
continue Logs
}
}
ret = append(ret, log)
}
-
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
@@ -263,16 +270,15 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
break
}
}
-
if !included {
return false
}
}
for _, sub := range topics {
- var included bool
+ included := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
- if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) {
+ if types.BloomLookup(bloom, topic) {
included = true
break
}
@@ -281,6 +287,5 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
return false
}
}
-
return true
}
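
Taken together, the filter now exposes a single constructor carrying the block range and criteria instead of the old New-plus-setters sequence, and Logs replaces Find/FindOnce: it first drains the bloombits-indexed section range through the matcher, then falls back to per-block header bloom checks for the unindexed tail. A minimal usage sketch, assuming backend is any filters.Backend implementation and addr/topic are an address and topic of interest:

	filter := filters.New(backend,
		0,  // begin at the genesis block
		-1, // -1 resolves to the current chain head inside Logs
		[]common.Address{addr},        // only logs emitted by addr
		[][]common.Hash{nil, {topic}}, // wildcard at position 0, exact match at position 1
	)
	logs, err := filter.Logs(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	// logs holds matches from the indexed range (via the bloombits matcher)
	// followed by matches from the unindexed tail (per-block bloom checks).
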
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 7abace1e6..e08cedb27 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
LastIndexSubscription
)
+const (
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
@@ -74,7 +87,6 @@ type subscription struct {
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
- sub *event.TypeMuxSubscription
backend Backend
lightMode bool
lastHead *types.Header
@@ -200,7 +212,6 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan
installed: make(chan struct{}),
err: make(chan error),
}
-
return es.subscribe(sub)
}
@@ -218,7 +229,6 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log
installed: make(chan struct{}),
err: make(chan error),
}
-
return es.subscribe(sub)
}
@@ -236,7 +246,6 @@ func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*ty
installed: make(chan struct{}),
err: make(chan error),
}
-
return es.subscribe(sub)
}
@@ -253,7 +262,6 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
installed: make(chan struct{}),
err: make(chan error),
}
-
return es.subscribe(sub)
}
@@ -270,64 +278,56 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
installed: make(chan struct{}),
err: make(chan error),
}
-
return es.subscribe(sub)
}
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
return
}
- switch e := ev.Data.(type) {
+ switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- case core.PendingLogsEvent:
- for _, f := range filters[PendingLogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
+ case *event.TypeMuxEvent:
+ switch muxe := e.Data.(type) {
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
- if ev.Time.After(f.created) {
- f.hashes <- e.Tx.Hash()
- }
+ f.hashes <- e.Tx.Hash()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
- if ev.Time.After(f.created) {
- f.headers <- e.Block.Header()
- }
+ f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
})
@@ -396,9 +396,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
- sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+ sub = es.mux.Subscribe(core.PendingLogsEvent{})
+ // Subscribe TxPreEvent from txpool
+ txCh = make(chan core.TxPreEvent, txChanSize)
+ txSub = es.backend.SubscribeTxPreEvent(txCh)
+ // Subscribe RemovedLogsEvent
+ rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+ rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+ // Subscribe []*types.Log
+ logsCh = make(chan []*types.Log, logsChanSize)
+ logsSub = es.backend.SubscribeLogsEvent(logsCh)
+ // Subscribe ChainEvent
+ chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
+ chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)
+ // Unsubscribe all events
+ defer sub.Unsubscribe()
+ defer txSub.Unsubscribe()
+ defer rmLogsSub.Unsubscribe()
+ defer logsSub.Unsubscribe()
+ defer chainEvSub.Unsubscribe()
+
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
@@ -410,6 +429,17 @@ func (es *EventSystem) eventLoop() {
return
}
es.broadcast(index, ev)
+
+ // Handle subscribed events
+ case ev := <-txCh:
+ es.broadcast(index, ev)
+ case ev := <-rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-logsCh:
+ es.broadcast(index, ev)
+ case ev := <-chainEvCh:
+ es.broadcast(index, ev)
+
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
@@ -428,6 +458,16 @@ func (es *EventSystem) eventLoop() {
delete(index[f.typ], f.id)
}
close(f.err)
+
+ // System stopped
+ case <-txSub.Err():
+ return
+ case <-rmLogsSub.Err():
+ return
+ case <-logsSub.Err():
+ return
+ case <-chainEvSub.Err():
+ return
}
}
}
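
The event loop above swaps most TypeMux subscriptions for typed event.Feed channels: the backend owns one feed per event kind, the event system subscribes a buffered channel sized by the new constants, and a closed Err channel doubles as the shutdown signal. A self-contained sketch of that producer/consumer handshake, simplified and outside the real backend:

	func feedSketch() {
		var chainFeed event.Feed

		ch := make(chan core.ChainEvent, chainEvChanSize)
		sub := chainFeed.Subscribe(ch)
		defer sub.Unsubscribe()

		chainFeed.Send(core.ChainEvent{}) // fans out to every subscribed channel

		select {
		case ev := <-ch:
			_ = ev // eventLoop would broadcast this to the matching filters
		case <-sub.Err():
			// feed torn down: eventLoop treats this as "system stopped" and returns
		}
	}
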
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 822580b56..7da114fda 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -18,13 +18,16 @@ package filters
import (
"context"
+ "fmt"
"math/big"
+ "math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -33,8 +36,13 @@ import (
)
type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
+ mux *event.TypeMux
+ db ethdb.Database
+ sections uint64
+ txFeed *event.Feed
+ rmLogsFeed *event.Feed
+ logsFeed *event.Feed
+ chainFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@@ -63,6 +71,53 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
return core.GetBlockReceipts(b.db, blockHash, num), nil
}
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.chainFeed.Subscribe(ch)
+}
+
+func (b *testBackend) BloomStatus() (uint64, uint64) {
+ return params.BloomBitsBlocks, b.sections
+}
+
+func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ requests := make(chan chan *bloombits.Retrieval)
+
+ go session.Multiplex(16, 0, requests)
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-ctx.Done():
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
+ task.Bitsets[i], _ = core.GetBloomBits(b.db, task.Bit, section, head)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+}
+
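
Worth noting about the ServiceFilter pattern above: each element received from requests is itself a channel. The servicer pulls one *bloombits.Retrieval task from it, fills task.Bitsets with one (possibly missing) compressed bit vector per requested section for task.Bit, and sends the task back on the same channel; session.Multiplex handles the batching and delivers the results to the matcher, retrying sections that came back empty.
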
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -74,7 +129,11 @@ func TestBlockSubscription(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -113,7 +172,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
- mux.Post(e)
+ chainFeed.Send(e)
}
<-sub0.Err()
@@ -125,10 +184,14 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -146,9 +209,10 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
- mux.Post(ev)
+ txFeed.Send(ev)
}
+ timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
@@ -160,10 +224,18 @@ func TestPendingTxFilter(t *testing.T) {
if len(hashes) >= len(transactions) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
+ if len(hashes) != len(transactions) {
+ t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+ return
+ }
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -175,10 +247,14 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -220,10 +296,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -241,15 +321,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
}
}
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -279,7 +363,7 @@ func TestLogFilter(t *testing.T) {
// match all
0: {FilterCriteria{}, allLogs, ""},
// match none due to no matching addresses
- 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, []*types.Log{}, ""},
+ 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
// match logs based on addresses, ignore topics
2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
// match none due to no matching topics (match with address)
@@ -300,6 +384,8 @@ func TestLogFilter(t *testing.T) {
10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
// all "mined" and pending logs with topic firstTopic
11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
+ // match all logs due to wildcard topic
+ 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
}
)
@@ -310,8 +396,8 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
- if err := mux.Post(allLogs); err != nil {
- t.Fatal(err)
+ if nsend := logsFeed.Send(allLogs); nsend == 0 {
+ t.Fatal("Shoud have at least one subscription")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
@@ -319,6 +405,7 @@ func TestLogFilter(t *testing.T) {
for i, tt := range testCases {
var fetched []*types.Log
+ timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
@@ -329,6 +416,10 @@ func TestLogFilter(t *testing.T) {
if len(fetched) >= len(tt.expected) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
@@ -349,15 +440,19 @@ func TestLogFilter(t *testing.T) {
}
}
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -366,7 +461,7 @@ func TestPendingLogsSubscription(t *testing.T) {
firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
- forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
+ fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
allLogs = []core.PendingLogsEvent{
@@ -378,7 +473,7 @@ func TestPendingLogsSubscription(t *testing.T) {
{Logs: []*types.Log{
{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
- {Address: thirdAddress, Topics: []common.Hash{forthTopic}, BlockNumber: 5},
+ {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
}},
}
@@ -400,7 +495,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// match all
{FilterCriteria{}, convertLogs(allLogs), nil, nil},
// match none due to no matching addresses
- {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{{}}}, []*types.Log{}, nil, nil},
+ {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
// match logs based on addresses, ignore topics
{FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
// match none due to no matching topics (match with address)
@@ -412,7 +507,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
// multiple pending logs, should match only 2 topics from the logs in block 5
- {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, forthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
+ {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
}
)
@@ -439,15 +534,15 @@ func TestPendingLogsSubscription(t *testing.T) {
}
if len(fetched) != len(tt.expected) {
- t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+ panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)))
}
for l := range fetched {
if fetched[l].Removed {
- t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+ panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i))
}
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
- t.Errorf("invalid log on index %d for case %d", l, i)
+ panic(fmt.Sprintf("invalid log on index %d for case %d", l, i))
}
}
}()
@@ -455,6 +550,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
+ // allLogs are of type core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index cd5e7cafd..11235e95a 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -33,7 +33,7 @@ import (
)
func makeReceipt(addr common.Address) *types.Receipt {
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{Address: addr},
}
@@ -41,54 +41,46 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
-func BenchmarkMipmaps(b *testing.B) {
- dir, err := ioutil.TempDir("", "mipmap")
+func BenchmarkFilters(b *testing.B) {
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- addr2 = common.BytesToAddress([]byte("jeff"))
- addr3 = common.BytesToAddress([]byte("ethereum"))
- addr4 = common.BytesToAddress([]byte("random addresses please"))
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ addr2 = common.BytesToAddress([]byte("jeff"))
+ addr3 = common.BytesToAddress([]byte("ethereum"))
+ addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 100010, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 2403:
receipt := makeReceipt(addr1)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 1034:
receipt := makeReceipt(addr2)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 34:
receipt := makeReceipt(addr3)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 99999:
receipt := makeReceipt(addr4)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
}
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- b.Fatal(err)
- }
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -104,13 +96,10 @@ func BenchmarkMipmaps(b *testing.B) {
}
b.ResetTimer()
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
@@ -118,18 +107,22 @@ func BenchmarkMipmaps(b *testing.B) {
}
func TestFilters(t *testing.T) {
- dir, err := ioutil.TempDir("", "mipmap")
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr = crypto.PubkeyToAddress(key1.PublicKey)
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))
@@ -140,10 +133,9 @@ func TestFilters(t *testing.T) {
genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 1000, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 1:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -151,9 +143,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 2:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -161,9 +152,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 998:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -171,9 +161,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 999:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -181,18 +170,7 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
- }
-
- // store the receipts
- err := core.WriteReceipts(db, receipts)
- if err != nil {
- t.Fatal(err)
}
- // i is used as block number for the writes but since the i
- // starts at 0 and block 0 (genesis) is already present increment
- // by one
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -207,23 +185,15 @@ func TestFilters(t *testing.T) {
}
}
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash1, hash2, hash3, hash4}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(900)
- filter.SetEndBlock(999)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -231,12 +201,8 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(990)
- filter.SetEndBlock(-1)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -244,44 +210,32 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{hash1, hash2}})
- filter.SetBeginBlock(1)
- filter.SetEndBlock(10)
+ filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}
failHash := common.BytesToHash([]byte("fail"))
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
failAddr := common.BytesToAddress([]byte("failmenow"))
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{failAddr})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}, {hash1}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
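
The filter API change exercised throughout this file is easiest to see side by side: the mutable setter style is gone, and the constructor now takes the whole criteria up front. A sketch of the call shape, reusing the test's own names (backend, addr, hash1), so it is a fragment rather than a compilable unit:

// Old construction (removed by this change): mutate after New.
//
//	filter := New(backend, true)
//	filter.SetAddresses([]common.Address{addr})
//	filter.SetTopics([][]common.Hash{{hash1}})
//	filter.SetBeginBlock(0)
//	filter.SetEndBlock(-1)
//	logs, _ := filter.Find(context.Background())

// New construction: the whole criteria in one call, Find renamed Logs,
// with an end block of -1 still meaning "up to the latest block".
filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1}})
logs, _ := filter.Logs(context.Background())
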
diff --git a/eth/gen_config.go b/eth/gen_config.go
index 477479419..4a4cd7b9c 100644
--- a/eth/gen_config.go
+++ b/eth/gen_config.go
@@ -47,7 +47,6 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.SyncMode = c.SyncMode
enc.LightServ = c.LightServ
enc.LightPeers = c.LightPeers
- enc.MaxPeers = c.MaxPeers
enc.SkipBcVersionCheck = c.SkipBcVersionCheck
enc.DatabaseHandles = c.DatabaseHandles
enc.DatabaseCache = c.DatabaseCache
@@ -119,9 +118,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.LightPeers != nil {
c.LightPeers = *dec.LightPeers
}
- if dec.MaxPeers != nil {
- c.MaxPeers = *dec.MaxPeers
- }
if dec.SkipBcVersionCheck != nil {
c.SkipBcVersionCheck = *dec.SkipBcVersionCheck
}
diff --git a/eth/handler.go b/eth/handler.go
index 8c2f5660d..bec5126dc 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ import (
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of the channel listening for TxPreEvent.
+ // The number is chosen to mirror the size of the tx pool.
+ txChanSize = 4096
)
var (
@@ -78,7 +82,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txSub *event.TypeMuxSubscription
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@@ -87,8 +92,6 @@ type ProtocolManager struct {
quitSync chan struct{}
noMorePeers chan struct{}
- lesServer LesServer
-
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -96,7 +99,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub-protocol manager. The Ethereum sub-protocol manages peers capable
of communicating on the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, maxPeers int, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
@@ -105,7 +108,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
blockchain: blockchain,
chaindb: chaindb,
chainconfig: config,
- maxPeers: maxPeers,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -159,10 +161,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
- manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash,
- blockchain.GetBlockByHash, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
- blockchain.GetTdByHash, blockchain.InsertHeaderChain, manager.blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback,
- manager.removePeer)
+ manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
@@ -203,10 +202,14 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) Start() {
+func (pm *ProtocolManager) Start(maxPeers int) {
+ pm.maxPeers = maxPeers
+
// broadcast transactions
- pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ pm.txCh = make(chan core.TxPreEvent, txChanSize)
+ pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
@@ -270,7 +273,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
- if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. New transactions appearing
@@ -447,7 +450,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return nil
}
// Regardless of the fork checks, send the header to the fetcher just in case
- headers = pm.fetcher.FilterHeaders(headers, time.Now())
+ headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
@@ -500,7 +503,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(trasactions) > 0 || len(uncles) > 0
if filter {
- trasactions, uncles = pm.fetcher.FilterBodies(trasactions, uncles, time.Now())
+ trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
}
if len(trasactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
@@ -606,7 +609,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
- if !pm.blockchain.HasBlock(block.Hash) {
+ if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
@@ -663,7 +666,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkTransaction(tx.Hash())
}
- pm.txpool.AddBatch(txs)
+ pm.txpool.AddRemotes(txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@@ -693,9 +696,10 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
peer.SendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ return
}
// Otherwise, if the block is indeed in our own chain, announce it
- if pm.blockchain.HasBlock(hash) {
+ if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
@@ -728,10 +732,15 @@ func (self *ProtocolManager) minedBroadcastLoop() {
}
func (self *ProtocolManager) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.Data.(core.TxPreEvent)
- self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ for {
+ select {
+ case event := <-self.txCh:
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-self.txSub.Err():
+ return
+ }
}
}
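
The TypeMux-to-Feed migration in txBroadcastLoop follows the standard event.Feed consumption pattern: subscribe with a buffered channel (txChanSize above), drain it in a select loop, and return when the subscription's Err() channel is closed on unsubscribe. A minimal, runnable sketch of the same loop against go-ethereum's event package (event type simplified for illustration):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

type TxPreEvent struct{ Nonce uint64 }

func main() {
	var feed event.Feed
	ch := make(chan TxPreEvent, 4096) // buffered, so senders don't block on a slow consumer
	sub := feed.Subscribe(ch)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case ev := <-ch:
				fmt.Println("broadcasting tx", ev.Nonce)
				return
			case <-sub.Err(): // closed by Unsubscribe
				return
			}
		}
	}()

	feed.Send(TxPreEvent{Nonce: 1})
	<-done
	sub.Unsubscribe()
}
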
diff --git a/eth/handler_test.go b/eth/handler_test.go
index 413ed2bff..6752cd2a8 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -374,7 +374,7 @@ func testGetNodeData(t *testing.T, protocol int) {
}
accounts := []common.Address{testBank, acc1Addr, acc2Addr}
for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
- trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), statedb)
+ trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb))
for j, acc := range accounts {
state, _ := pm.blockchain.State()
@@ -474,13 +474,13 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
gspec = &core.Genesis{Config: config}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{})
)
- pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db)
+ pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
- pm.Start()
+ pm.Start(1000)
defer pm.Stop()
// Connect a new peer and check that we receive the DAO challenge
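
Note the constructor/Start split this test adapts to: the peer cap used to be baked into NewProtocolManager and is now handed to Start, so a manager can be constructed before the p2p layer knows its budget. The call-site change in isolation, reusing the test's arguments (a fragment, not a compilable unit):

// Before: cap fixed at construction time.
//
//	pm, err := NewProtocolManager(config, mode, networkId, 1000, evmux, pool, pow, blockchain, db)
//	pm.Start()

// After: construction is cap-free; Start receives the peer budget.
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
	t.Fatalf("failed to start test protocol manager: %v", err)
}
pm.Start(1000)
defer pm.Stop()
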
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 0260b9d77..f02242b15 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -59,18 +59,18 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
)
chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, 1000, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
if err != nil {
return nil, err
}
- pm.Start()
+ pm.Start(1000)
return pm, nil
}
@@ -88,15 +88,16 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
- pool []*types.Transaction // Collection of all transactions
- added chan<- []*types.Transaction // Notification channel for new transactions
+ txFeed event.Feed
+ pool []*types.Transaction // Collection of all transactions
+ added chan<- []*types.Transaction // Notification channel for new transactions
lock sync.RWMutex // Protects the transaction pool
}
-// AddBatch appends a batch of transactions to the pool, and notifies any
+// AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non-nil
-func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
+func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()
@@ -104,8 +105,7 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
if p.added != nil {
p.added <- txs
}
-
- return nil
+ return make([]error, len(txs))
}
// Pending returns all the transactions known to the pool
@@ -124,6 +124,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return batches, nil
}
+func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return p.txFeed.Subscribe(ch)
+}
+
// newTestTransaction creates a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize))
diff --git a/eth/protocol.go b/eth/protocol.go
index 4bc8bee72..cd7db57f2 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -22,7 +22,9 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -94,12 +96,16 @@ var errorToString = map[int]string{
}
type txPool interface {
- // AddBatch should add the given transactions to the pool.
- AddBatch([]*types.Transaction) error
+ // AddRemotes should add the given transactions to the pool.
+ AddRemotes([]*types.Transaction) []error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending() (map[common.Address]types.Transactions, error)
+
+ // SubscribeTxPreEvent should return an event subscription of
+ // TxPreEvent and send events to the given channel.
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}
// statusData is the network packet for the status message.
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index 2056ee0a8..d3a44ae91 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) {
for nonce := range alltxs {
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
}
- pm.txpool.AddBatch(alltxs)
+ pm.txpool.AddRemotes(alltxs)
// Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup
diff --git a/eth/sync.go b/eth/sync.go
index 8784b225d..a8ae64617 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -138,7 +138,9 @@ func (pm *ProtocolManager) syncer() {
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
- forceSync := time.Tick(forceSyncCycle)
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
for {
select {
case <-pm.newPeerCh:
@@ -148,7 +150,7 @@ func (pm *ProtocolManager) syncer() {
}
go pm.synchronise(pm.peers.BestPeer())
- case <-forceSync:
+ case <-forceSync.C:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
@@ -186,7 +188,17 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
atomic.StoreUint32(&pm.fastSync, 1)
mode = downloader.FastSync
}
- if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
+ // Run the sync cycle, and disable fast sync if we've gone past the pivot block
+ err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode)
+
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Disable fast sync if we indeed have something in our chain
+ if pm.blockchain.CurrentBlock().NumberU64() > 0 {
+ log.Info("Fast sync complete, auto disabling")
+ atomic.StoreUint32(&pm.fastSync, 0)
+ }
+ }
+ if err != nil {
return
}
atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
@@ -199,12 +211,4 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
// more reliably update peers or the local TD state.
go pm.BroadcastBlock(head, false)
}
- // If fast sync was enabled, and we synced up, disable it
- if atomic.LoadUint32(&pm.fastSync) == 1 {
- // Disable fast sync if we indeed have something in our chain
- if pm.blockchain.CurrentBlock().NumberU64() > 0 {
- log.Info("Fast sync complete, auto disabling")
- atomic.StoreUint32(&pm.fastSync, 0)
- }
- }
}
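
Two separate fixes land in this last hunk. First, time.Tick exposes no Stop, so the ticker it creates can never be reclaimed; a long-lived loop should own a NewTicker and release it on exit, as syncer now does. Second, the fast-sync downgrade check moves ahead of the error return, so a sync cycle that imported blocks before failing still switches the node to full sync. The ticker idiom in isolation, as a minimal runnable sketch (interval and output invented):

package main

import (
	"fmt"
	"time"
)

func syncer(quit <-chan struct{}) {
	// time.Tick would leak its ticker; owning it lets the loop stop it.
	forceSync := time.NewTicker(10 * time.Millisecond)
	defer forceSync.Stop()

	for {
		select {
		case <-forceSync.C:
			fmt.Println("forcing a sync cycle")
		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	go syncer(quit)
	time.Sleep(35 * time.Millisecond)
	close(quit)
}
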