From 36cdab206849c7e363e0b9911553098c3e8ca644 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:58:31 +0200 Subject: all: use (blocking) event package instead of ethreact --- ethchain/dagger.go | 7 +-- ethchain/events.go | 10 ++++ ethchain/state_manager.go | 55 +++++++---------- ethchain/transaction_pool.go | 3 +- ethereum.go | 74 +++++++++-------------- ethminer/miner.go | 137 ++++++++++++++++++++++--------------------- events.go | 11 ++++ peer.go | 2 +- 8 files changed, 144 insertions(+), 155 deletions(-) create mode 100644 ethchain/events.go create mode 100644 events.go diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 916d7e9c8..2d2b5720f 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" ) @@ -16,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethreact.Event) []byte + Search(block *Block, stop <-chan struct{}) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool GetHashrate() int64 Turbo(bool) @@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) { pow.turbo = on } -func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { +func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty @@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { for { select { - case <-reactChan: + case <-stop: powlogger.Infoln("Breaking from mining") return nil default: diff --git a/ethchain/events.go b/ethchain/events.go new file mode 100644 index 000000000..05c21edfe --- /dev/null +++ b/ethchain/events.go @@ -0,0 +1,10 @@ +package ethchain + +type TxEvent struct { + Type int // TxPre || TxPost + Tx *Transaction +} + +type NewBlockEvent struct { + Block *Block +} diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 589b99ac2..b71cbe8a1 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -11,11 +11,10 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) var statelogger = ethlog.NewLogger("STATE") @@ -37,7 +36,6 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool @@ -45,7 +43,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database - Eventer() *eventer.EventMachine + EventMux() *event.TypeMux } type StateManager struct { @@ -73,17 +71,15 @@ type StateManager struct { // 'Process' & canonical validation. lastAttemptedBlock *Block - // Quit chan - quit chan bool + events event.Subscription } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - eth: ethereum, - bc: ethereum.BlockChain(), - quit: make(chan bool), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager { func (self *StateManager) Start() { statelogger.Debugln("Starting state manager") - + self.events = self.eth.EventMux().Subscribe(Blocks(nil)) go self.updateThread() } func (self *StateManager) Stop() { statelogger.Debugln("Stopping state manager") - - close(self.quit) + self.events.Unsubscribe() } func (self *StateManager) updateThread() { - blockChan := self.eth.Eventer().Register("blocks") - -out: - for { - select { - case event := <-blockChan: - blocks := event.Data.(Blocks) - for _, block := range blocks { - err := self.Process(block, false) - if err != nil { - statelogger.Infoln(err) - statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - statelogger.Debugln(block) - break - } + for ev := range self.events.Chan() { + for _, block := range ev.(Blocks) { + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + break } - - case <-self.quit: - break out } } } @@ -202,7 +187,7 @@ done: } // Notify all subscribers - self.eth.Reactor().Post("newTx:post", tx) + self.eth.EventMux().Post(TxEvent{TxPost, tx}) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.eth.Reactor().Post("newBlock", block) + sm.eth.EventMux().Post(NewBlockEvent{block}) state.Manifest().Reset() } @@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.eth.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.EventMux().Post(state.Manifest().Messages) return bloomf } diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index da6c3d6ba..0676af3a3 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -24,6 +24,7 @@ type TxMsgTy byte const ( TxPre = iota TxPost + minGasPrice = 1000000 ) @@ -160,7 +161,7 @@ out: txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash()) // Notify the subscribers - pool.Ethereum.Reactor().Post("newTx:pre", tx) + pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx}) } case <-pool.quit: break out diff --git a/ethereum.go b/ethereum.go index 204f30bec..750ca8f03 100644 --- a/ethereum.go +++ b/ethereum.go @@ -17,12 +17,11 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) const ( @@ -60,7 +59,7 @@ type Ethereum struct { // The block pool blockPool *BlockPool // Eventer - eventer *eventer.EventMachine + eventMux *event.TypeMux // Peers peers *list.List // Nonce @@ -85,8 +84,6 @@ type Ethereum struct { listening bool - reactor *ethreact.ReactorEngine - RpcServer *ethrpc.JsonRpcServer keyManager *ethcrypto.KeyManager @@ -129,8 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager isUpToDate: true, filters: make(map[int]*ethchain.Filter), } - ethereum.reactor = ethreact.New() - ethereum.eventer = eventer.New() + ethereum.eventMux = event.NewTypeMux() ethereum.blockPool = NewBlockPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum) @@ -143,10 +139,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethreact.ReactorEngine { - return s.reactor -} - func (s *Ethereum) KeyManager() *ethcrypto.KeyManager { return s.keyManager } @@ -169,8 +161,8 @@ func (s *Ethereum) TxPool() *ethchain.TxPool { func (s *Ethereum) BlockPool() *BlockPool { return s.blockPool } -func (s *Ethereum) Eventer() *eventer.EventMachine { - return s.eventer +func (s *Ethereum) EventMux() *event.TypeMux { + return s.eventMux } func (self *Ethereum) Db() ethutil.Database { return self.db @@ -376,7 +368,7 @@ func (s *Ethereum) removePeerElement(e *list.Element) { s.peers.Remove(e) - s.reactor.Post("peerList", s.peers) + s.eventMux.Post(PeerListEvent{s.peers}) } func (s *Ethereum) RemovePeer(p *Peer) { @@ -400,7 +392,6 @@ func (s *Ethereum) reapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { - s.reactor.Start() s.blockPool.Start() s.stateManager.Start() @@ -524,8 +515,7 @@ func (s *Ethereum) Stop() { } s.txPool.Stop() s.stateManager.Stop() - s.reactor.Flush() - s.reactor.Stop() + s.eventMux.Stop() s.blockPool.Stop() ethlogger.Infoln("Server stopped") @@ -584,10 +574,10 @@ out: select { case <-upToDateTimer.C: if self.IsUpToDate() && !self.isUpToDate { - self.reactor.Post("chainSync", false) + self.eventMux.Post(ChainSyncEvent{false}) self.isUpToDate = true } else if !self.IsUpToDate() && self.isUpToDate { - self.reactor.Post("chainSync", true) + self.eventMux.Post(ChainSyncEvent{true}) self.isUpToDate = false } case <-self.quit: @@ -623,40 +613,30 @@ func (self *Ethereum) GetFilter(id int) *ethchain.Filter { } func (self *Ethereum) filterLoop() { - blockChan := make(chan ethreact.Event, 5) - messageChan := make(chan ethreact.Event, 5) // Subscribe to events - reactor := self.Reactor() - reactor.Subscribe("newBlock", blockChan) - reactor.Subscribe("messages", messageChan) -out: - for { - select { - case <-self.quit: - break out - case block := <-blockChan: - if block, ok := block.Resource.(*ethchain.Block); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(block) - } + events := self.eventMux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil)) + for event := range events.Chan() { + switch event := event.(type) { + case ethchain.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) } - self.filterMu.RUnlock() } - case msg := <-messageChan: - if messages, ok := msg.Resource.(ethstate.Messages); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(messages) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } + self.filterMu.RUnlock() + + case ethstate.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) } } - self.filterMu.RUnlock() } + self.filterMu.RUnlock() } } } diff --git a/ethminer/miner.go b/ethminer/miner.go index 299a5204a..ffc49f096 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -6,27 +6,37 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/event" ) var logger = ethlog.NewLogger("MINER") type Miner struct { - pow ethchain.PoW - ethereum ethchain.EthManager - coinbase []byte - reactChan chan ethreact.Event - txs ethchain.Transactions - uncles []*ethchain.Block - block *ethchain.Block - powChan chan []byte - powQuitChan chan ethreact.Event - quitChan chan chan error + pow ethchain.PoW + ethereum ethchain.EthManager + coinbase []byte + txs ethchain.Transactions + uncles []*ethchain.Block + block *ethchain.Block + + events event.Subscription + powQuitChan chan struct{} + powDone chan struct{} turbo bool } +const ( + Started = iota + Stopped +) + +type Event struct { + Type int // Started || Stopped + Miner *Miner +} + func (self *Miner) GetPow() ethchain.PoW { return self.pow } @@ -48,46 +58,42 @@ func (self *Miner) ToggleTurbo() { } func (miner *Miner) Start() { - miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block - miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread - miner.quitChan = make(chan chan error, 1) // Insert initial TXs in our little miner 'pool' miner.txs = miner.ethereum.TxPool().Flush() miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase) + mux := miner.ethereum.EventMux() + miner.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethchain.TxEvent{}) + // Prepare inital block //miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State()) go miner.listener() - reactor := miner.ethereum.Reactor() - reactor.Subscribe("newBlock", miner.reactChan) - reactor.Subscribe("newTx:pre", miner.reactChan) - - // We need the quit chan to be a Reactor event. - // The POW search method is actually blocking and if we don't - // listen to the reactor events inside of the pow itself - // The miner overseer will never get the reactor events themselves - // Only after the miner will find the sha - reactor.Subscribe("newBlock", miner.powQuitChan) - reactor.Subscribe("newTx:pre", miner.powQuitChan) - logger.Infoln("Started") + mux.Post(Event{Started, miner}) +} - reactor.Post("miner:start", miner) +func (miner *Miner) Stop() { + logger.Infoln("Stopping...") + miner.events.Unsubscribe() + miner.ethereum.EventMux().Post(Event{Stopped, miner}) } func (miner *Miner) listener() { for { + miner.startMining() + select { - case status := <-miner.quitChan: - logger.Infoln("Stopped") - status <- nil - return - case chanMessage := <-miner.reactChan: + case event, isopen := <-miner.events.Chan(): + miner.stopMining() + if !isopen { + return + } - if block, ok := chanMessage.Resource.(*ethchain.Block); ok { + switch event := event.(type) { + case ethchain.NewBlockEvent: + block := event.Block //logger.Infoln("Got new block via Reactor") if bytes.Compare(miner.ethereum.BlockChain().CurrentBlock.Hash(), block.Hash()) == 0 { // TODO: Perhaps continue mining to get some uncle rewards @@ -117,49 +123,44 @@ func (miner *Miner) listener() { miner.uncles = append(miner.uncles, block) } } - } - if tx, ok := chanMessage.Resource.(*ethchain.Transaction); ok { - found := false - for _, ctx := range miner.txs { - if found = bytes.Compare(ctx.Hash(), tx.Hash()) == 0; found { - break + case ethchain.TxEvent: + if event.Type == ethchain.TxPre { + found := false + for _, ctx := range miner.txs { + if found = bytes.Compare(ctx.Hash(), event.Tx.Hash()) == 0; found { + break + } + } + if found == false { + // Undo all previous commits + miner.block.Undo() + // Apply new transactions + miner.txs = append(miner.txs, event.Tx) } - - } - if found == false { - // Undo all previous commits - miner.block.Undo() - // Apply new transactions - miner.txs = append(miner.txs, tx) } } - default: - miner.mineNewBlock() + + case <-miner.powDone: + // next iteration will start mining again } } } -func (miner *Miner) Stop() { - logger.Infoln("Stopping...") - - miner.powQuitChan <- ethreact.Event{} - - status := make(chan error) - miner.quitChan <- status - <-status - - reactor := miner.ethereum.Reactor() - reactor.Unsubscribe("newBlock", miner.powQuitChan) - reactor.Unsubscribe("newTx:pre", miner.powQuitChan) - reactor.Unsubscribe("newBlock", miner.reactChan) - reactor.Unsubscribe("newTx:pre", miner.reactChan) +func (miner *Miner) startMining() { + if miner.powDone == nil { + miner.powDone = make(chan struct{}) + } + miner.powQuitChan = make(chan struct{}) + go miner.mineNewBlock() +} - reactor.Post("miner:stop", miner) +func (miner *Miner) stopMining() { + close(miner.powQuitChan) + <-miner.powDone } func (self *Miner) mineNewBlock() { - stateManager := self.ethereum.StateManager() self.block = self.ethereum.BlockChain().NewBlock(self.coinbase) @@ -195,8 +196,9 @@ func (self *Miner) mineNewBlock() { logger.Infof("Mining on block. Includes %v transactions", len(self.txs)) // Find a valid nonce - self.block.Nonce = self.pow.Search(self.block, self.powQuitChan) - if self.block.Nonce != nil { + nonce := self.pow.Search(self.block, self.powQuitChan) + if nonce != nil { + self.block.Nonce = nonce err := self.ethereum.StateManager().Process(self.block, false) if err != nil { logger.Infoln(err) @@ -208,4 +210,5 @@ func (self *Miner) mineNewBlock() { self.txs = self.ethereum.TxPool().CurrentTransactions() } } + self.powDone <- struct{}{} } diff --git a/events.go b/events.go new file mode 100644 index 000000000..5fff1d831 --- /dev/null +++ b/events.go @@ -0,0 +1,11 @@ +package eth + +import "container/list" + +type PeerListEvent struct { + Peers *list.List +} + +type ChainSyncEvent struct { + InSync bool +} diff --git a/peer.go b/peer.go index e9551e066..d66d082bb 100644 --- a/peer.go +++ b/peer.go @@ -802,7 +802,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.versionKnown = true p.ethereum.PushPeer(p) - p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) + p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()}) p.protocolCaps = caps capsIt := caps.NewIterator() -- cgit v1.2.3