diff options
Diffstat (limited to 'ethchain/state_manager.go')
-rw-r--r-- | ethchain/state_manager.go | 55 |
1 files changed, 20 insertions, 35 deletions
diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 589b99ac2..b71cbe8a1 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -11,11 +11,10 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) var statelogger = ethlog.NewLogger("STATE") @@ -37,7 +36,6 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool @@ -45,7 +43,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database - Eventer() *eventer.EventMachine + EventMux() *event.TypeMux } type StateManager struct { @@ -73,17 +71,15 @@ type StateManager struct { // 'Process' & canonical validation. lastAttemptedBlock *Block - // Quit chan - quit chan bool + events event.Subscription } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - eth: ethereum, - bc: ethereum.BlockChain(), - quit: make(chan bool), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager { func (self *StateManager) Start() { statelogger.Debugln("Starting state manager") - + self.events = self.eth.EventMux().Subscribe(Blocks(nil)) go self.updateThread() } func (self *StateManager) Stop() { statelogger.Debugln("Stopping state manager") - - close(self.quit) + self.events.Unsubscribe() } func (self *StateManager) updateThread() { - blockChan := self.eth.Eventer().Register("blocks") - -out: - for { - select { - case event := <-blockChan: - blocks := event.Data.(Blocks) - for _, block := range blocks { - err := self.Process(block, false) - if err != nil { - statelogger.Infoln(err) - statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - statelogger.Debugln(block) - break - } + for ev := range self.events.Chan() { + for _, block := range ev.(Blocks) { + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + break } - - case <-self.quit: - break out } } } @@ -202,7 +187,7 @@ done: } // Notify all subscribers - self.eth.Reactor().Post("newTx:post", tx) + self.eth.EventMux().Post(TxEvent{TxPost, tx}) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.eth.Reactor().Post("newBlock", block) + sm.eth.EventMux().Post(NewBlockEvent{block}) state.Manifest().Reset() } @@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.eth.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.EventMux().Post(state.Manifest().Messages) return bloomf } |