diff options
Diffstat (limited to 'eth/backend.go')
-rw-r--r-- | eth/backend.go | 113 |
1 files changed, 63 insertions, 50 deletions
diff --git a/eth/backend.go b/eth/backend.go index 982317314..fa8349116 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -3,9 +3,9 @@ package eth import ( "crypto/ecdsa" "fmt" - "math" "path" "strings" + "time" "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" @@ -30,8 +30,9 @@ var ( jsonlogger = logger.NewJsonLogger() defaultBootNodes = []*discover.Node{ - // ETH/DEV cmd/bootnode - discover.MustParseNode("enode://09fbeec0d047e9a37e63f60f8618aa9df0e49271f3fadb2c070dc09e2099b95827b63a8b837c6fd01d0802d457dd83e3bd48bd3e6509f8209ed90dabbc30e3d3@52.16.188.185:30303"), + // ETH/DEV Go Bootnodes + discover.MustParseNode("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"), + discover.MustParseNode("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"), // ETH/DEV cpp-ethereum (poc-9.ethdev.com) discover.MustParseNode("enode://487611428e6c99a11a9795a6abe7b529e81315ca6aad66e2a2fc76e3adf263faba0d35466c2f8f68d561dbefa8878d4df5f1f2ddb1fbeab7f42ffb8cd328bd4a@5.1.83.226:30303"), } @@ -124,6 +125,8 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) + // Closed when databases are flushed and closed + databasesClosed chan bool //*** SERVICES *** // State manager for processing new blocks and managing the over all states @@ -136,11 +139,10 @@ type Ethereum struct { protocolManager *ProtocolManager downloader *downloader.Downloader - net *p2p.Server - eventMux *event.TypeMux - txSub event.Subscription - minedBlockSub event.Subscription - miner *miner.Miner + net *p2p.Server + eventMux *event.TypeMux + txSub event.Subscription + miner *miner.Miner // logger logger.LogSystem @@ -199,30 +201,31 @@ func New(config *Config) (*Ethereum, error) { glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion) eth := &Ethereum{ - shutdownChan: make(chan bool), - blockDb: blockDb, - stateDb: stateDb, - extraDb: extraDb, - eventMux: &event.TypeMux{}, - accountManager: config.AccountManager, - DataDir: config.DataDir, - etherbase: common.HexToAddress(config.Etherbase), - clientVersion: config.Name, // TODO should separate from Name - ethVersionId: config.ProtocolVersion, - netVersionId: config.NetworkId, - NatSpec: config.NatSpec, + shutdownChan: make(chan bool), + databasesClosed: make(chan bool), + blockDb: blockDb, + stateDb: stateDb, + extraDb: extraDb, + eventMux: &event.TypeMux{}, + accountManager: config.AccountManager, + DataDir: config.DataDir, + etherbase: common.HexToAddress(config.Etherbase), + clientVersion: config.Name, // TODO should separate from Name + ethVersionId: config.ProtocolVersion, + netVersionId: config.NetworkId, + NatSpec: config.NatSpec, } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) - eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain, eth.chainManager.Td) + eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain) eth.pow = ethash.New(eth.chainManager) - eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State) + eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.whisper = whisper.New() eth.shhVersionId = int(eth.whisper.Version()) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.downloader) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) netprv, err := config.nodeKey() if err != nil { @@ -319,10 +322,9 @@ func (s *Ethereum) StartMining() error { err = fmt.Errorf("Cannot start mining without etherbase address: %v", err) glog.V(logger.Error).Infoln(err) return err - } - s.miner.Start(eb) + go s.miner.Start(eb) return nil } @@ -379,8 +381,12 @@ func (s *Ethereum) Start() error { } } + // periodically flush databases + go s.syncDatabases() + // Start services - s.txPool.Start() + go s.txPool.Start() + s.protocolManager.Start() if s.whisper != nil { s.whisper.Start() @@ -390,14 +396,38 @@ func (s *Ethereum) Start() error { s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) go s.txBroadcastLoop() - // broadcast mined blocks - s.minedBlockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go s.minedBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } +func (s *Ethereum) syncDatabases() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + // don't change the order of database flushes + if err := s.extraDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err) + } + if err := s.stateDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err) + } + if err := s.blockDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err) + } + case <-s.shutdownChan: + break done + } + } + + s.blockDb.Close() + s.stateDb.Close() + s.extraDb.Close() + + close(s.databasesClosed) +} + func (s *Ethereum) StartForTest() { jsonlogger.LogJson(&logger.LogStarting{ ClientString: s.net.Name, @@ -418,14 +448,9 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - // Close the database - defer s.blockDb.Close() - defer s.stateDb.Close() - defer s.extraDb.Close() - - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop + s.protocolManager.Stop() s.txPool.Stop() s.eventMux.Stop() if s.whisper != nil { @@ -438,16 +463,14 @@ func (s *Ethereum) Stop() { // This function will wait for a shutdown and resumes main thread execution func (s *Ethereum) WaitForShutdown() { + <-s.databasesClosed <-s.shutdownChan } -// now tx broadcasting is taken out of txPool -// handled here via subscription, efficiency? func (self *Ethereum) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { event := obj.(core.TxPreEvent) - self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx}) self.syncAccounts(event.Tx) } } @@ -466,16 +489,6 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) { } } -func (self *Ethereum) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.minedBlockSub.Chan() { - switch ev := obj.(type) { - case core.NewMinedBlockEvent: - self.protocolManager.BroadcastBlock(ev.Block.Hash(), ev.Block) - } - } -} - func saveProtocolVersion(db common.Database, protov int) { d, _ := db.Get([]byte("ProtocolVersion")) protocolVersion := common.NewValue(d).Uint() |