aboutsummaryrefslogtreecommitdiffstats
path: root/eth/backend.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/backend.go')
-rw-r--r--eth/backend.go72
1 files changed, 43 insertions, 29 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 639792333..1cd9e8fff 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@@ -53,20 +54,24 @@ type LesServer interface {
Start(srvr *p2p.Server)
Stop()
Protocols() []p2p.Protocol
+ SetBloomBitsIndexer(bbIndexer *core.ChainIndexer)
}
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
+ config *Config
chainConfig *params.ChainConfig
+
// Channel for shutting down the service
- shutdownChan chan bool // Channel for shutting down the ethereum
- stopDbUpgrade func() // stop chain db sequential key upgrade
+ shutdownChan chan bool // Channel for shutting down the ethereum
+ stopDbUpgrade func() error // stop chain db sequential key upgrade
+
// Handlers
txPool *core.TxPool
- txMu sync.Mutex
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
+
// DB interfaces
chainDb ethdb.Database // Block chain database
@@ -74,6 +79,9 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
+
ApiBackend *EthApiBackend
miner *miner.Miner
@@ -88,7 +96,7 @@ type Ethereum struct {
func (s *Ethereum) AddLesServer(ls LesServer) {
s.lesServer = ls
- s.protocolManager.lesServer = ls
+ ls.SetBloomBitsIndexer(s.bloomIndexer)
}
// New creates a new Ethereum object (including the
@@ -100,12 +108,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
-
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
- stopDbUpgrade := upgradeSequentialKeys(chainDb)
+ stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
@@ -113,6 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
+ config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
@@ -123,11 +131,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
- if err := addMipmapBloomBins(chainDb); err != nil {
- return nil, err
- }
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
@@ -139,7 +146,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
- eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig)
+ eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
@@ -149,25 +156,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
+ eth.bloomIndexer.Start(eth.blockchain)
- newPool := core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
- eth.txPool = newPool
-
- maxPeers := config.MaxPeers
- if config.LightServ > 0 {
- // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections
- // temporary solution until the new peer connectivity API is finished
- halfPeers := maxPeers / 2
- maxPeers -= config.LightPeers
- if maxPeers < halfPeers {
- maxPeers = halfPeers
- }
+ if config.TxPool.Journal != "" {
+ config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
+ eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
- if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
+ if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
-
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
@@ -201,10 +199,13 @@ func makeExtraData(extra []byte) []byte {
// CreateDB creates the chain database.
func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) {
db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles)
+ if err != nil {
+ return nil, err
+ }
if db, ok := db.(*ethdb.LDBDatabase); ok {
db.Meter("eth/db/chaindata/")
}
- return db, err
+ return db, nil
}
// CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service
@@ -328,7 +329,7 @@ func (s *Ethereum) StartMining(local bool) error {
wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
if wallet == nil || err != nil {
log.Error("Etherbase account unavailable locally", "err", err)
- return fmt.Errorf("singer missing: %v", err)
+ return fmt.Errorf("signer missing: %v", err)
}
clique.Authorize(eb, wallet.SignHash)
}
@@ -363,17 +364,29 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage
func (s *Ethereum) Protocols() []p2p.Protocol {
if s.lesServer == nil {
return s.protocolManager.SubProtocols
- } else {
- return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
+ return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers()
+
+ // Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
- s.protocolManager.Start()
+ // Figure out a max peers count based on the server limits
+ maxPeers := srvr.MaxPeers
+ if s.config.LightServ > 0 {
+ maxPeers -= s.config.LightPeers
+ if maxPeers < srvr.MaxPeers/2 {
+ maxPeers = srvr.MaxPeers / 2
+ }
+ }
+ // Start the networking layer and the light server if requested
+ s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
@@ -386,6 +399,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
+ s.bloomIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {