diff options
Diffstat (limited to 'eth/backend.go')
-rw-r--r-- | eth/backend.go | 72 |
1 files changed, 43 insertions, 29 deletions
diff --git a/eth/backend.go b/eth/backend.go index 639792333..1cd9e8fff 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" @@ -53,20 +54,24 @@ type LesServer interface { Start(srvr *p2p.Server) Stop() Protocols() []p2p.Protocol + SetBloomBitsIndexer(bbIndexer *core.ChainIndexer) } // Ethereum implements the Ethereum full node service. type Ethereum struct { + config *Config chainConfig *params.ChainConfig + // Channel for shutting down the service - shutdownChan chan bool // Channel for shutting down the ethereum - stopDbUpgrade func() // stop chain db sequential key upgrade + shutdownChan chan bool // Channel for shutting down the ethereum + stopDbUpgrade func() error // stop chain db sequential key upgrade + // Handlers txPool *core.TxPool - txMu sync.Mutex blockchain *core.BlockChain protocolManager *ProtocolManager lesServer LesServer + // DB interfaces chainDb ethdb.Database // Block chain database @@ -74,6 +79,9 @@ type Ethereum struct { engine consensus.Engine accountManager *accounts.Manager + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + ApiBackend *EthApiBackend miner *miner.Miner @@ -88,7 +96,7 @@ type Ethereum struct { func (s *Ethereum) AddLesServer(ls LesServer) { s.lesServer = ls - s.protocolManager.lesServer = ls + ls.SetBloomBitsIndexer(s.bloomIndexer) } // New creates a new Ethereum object (including the @@ -100,12 +108,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } - chainDb, err := CreateDB(ctx, config, "chaindata") if err != nil { return nil, err } - stopDbUpgrade := upgradeSequentialKeys(chainDb) + stopDbUpgrade := upgradeDeduplicateData(chainDb) chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr @@ -113,6 +120,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ + config: config, chainDb: chainDb, chainConfig: chainConfig, eventMux: ctx.EventMux, @@ -123,11 +131,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { networkId: config.NetworkId, gasPrice: config.GasPrice, etherbase: config.Etherbase, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks), } - if err := addMipmapBloomBins(chainDb); err != nil { - return nil, err - } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) if !config.SkipBcVersionCheck { @@ -139,7 +146,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig) + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -149,25 +156,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eth.blockchain.SetHead(compat.RewindTo) core.WriteChainConfig(chainDb, genesisHash, chainConfig) } + eth.bloomIndexer.Start(eth.blockchain) - newPool := core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) - eth.txPool = newPool - - maxPeers := config.MaxPeers - if config.LightServ > 0 { - // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections - // temporary solution until the new peer connectivity API is finished - halfPeers := maxPeers / 2 - maxPeers -= config.LightPeers - if maxPeers < halfPeers { - maxPeers = halfPeers - } + if config.TxPool.Journal != "" { + config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) @@ -201,10 +199,13 @@ func makeExtraData(extra []byte) []byte { // CreateDB creates the chain database. func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) { db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles) + if err != nil { + return nil, err + } if db, ok := db.(*ethdb.LDBDatabase); ok { db.Meter("eth/db/chaindata/") } - return db, err + return db, nil } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service @@ -328,7 +329,7 @@ func (s *Ethereum) StartMining(local bool) error { wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) if wallet == nil || err != nil { log.Error("Etherbase account unavailable locally", "err", err) - return fmt.Errorf("singer missing: %v", err) + return fmt.Errorf("signer missing: %v", err) } clique.Authorize(eb, wallet.SignHash) } @@ -363,17 +364,29 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage func (s *Ethereum) Protocols() []p2p.Protocol { if s.lesServer == nil { return s.protocolManager.SubProtocols - } else { - return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...) } + return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...) } // Start implements node.Service, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start(srvr *p2p.Server) error { + // Start the bloom bits servicing goroutines + s.startBloomHandlers() + + // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) - s.protocolManager.Start() + // Figure out a max peers count based on the server limits + maxPeers := srvr.MaxPeers + if s.config.LightServ > 0 { + maxPeers -= s.config.LightPeers + if maxPeers < srvr.MaxPeers/2 { + maxPeers = srvr.MaxPeers / 2 + } + } + // Start the networking layer and the light server if requested + s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } @@ -386,6 +399,7 @@ func (s *Ethereum) Stop() error { if s.stopDbUpgrade != nil { s.stopDbUpgrade() } + s.bloomIndexer.Close() s.blockchain.Stop() s.protocolManager.Stop() if s.lesServer != nil { |