diff options
author | obscuren <geffobscura@gmail.com> | 2014-09-29 18:57:51 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-09-29 18:57:51 +0800 |
commit | ab6ede51d7fedb9270cab08ee732a834be34dab2 (patch) | |
tree | d8252f27d51c456e637140a312cadfe2ced71528 /ethchain/state_manager.go | |
parent | ea0357bf02b61db94bd0ad8806ba7337a55a4f79 (diff) | |
download | go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.gz go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.bz2 go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.lz go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.xz go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.zst go-tangerine-ab6ede51d7fedb9270cab08ee732a834be34dab2.zip |
Working on new (blocking) event machine.
The new event machine will be used for loose coupling and handle the
communications between the services:
1) Block pool finds blocks which "links" with our current canonical
chain
2) Posts the blocks on to the event machine
3) State manager receives blocks & processes them
4) Broadcasts new post block event
Diffstat (limited to 'ethchain/state_manager.go')
-rw-r--r-- | ethchain/state_manager.go | 71 |
1 files changed, 51 insertions, 20 deletions
diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index cd2d57af9..f38666572 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -15,14 +15,11 @@ import ( "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/eventer" ) var statelogger = ethlog.NewLogger("STATE") -type BlockProcessor interface { - ProcessBlock(block *Block) -} - type Peer interface { Inbound() bool LastSend() time.Time @@ -48,6 +45,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database + Eventer() *eventer.EventMachine } type StateManager struct { @@ -60,7 +58,7 @@ type StateManager struct { // Proof of work used for validating Pow PoW // The ethereum manager interface - Ethereum EthManager + eth EthManager // The managed states // Transiently state. The trans state isn't ever saved, validated and // it could be used for setting account nonces without effecting @@ -74,14 +72,18 @@ type StateManager struct { // This does not have to be a valid block and will be set during // 'Process' & canonical validation. lastAttemptedBlock *Block + + // Quit chan + quit chan bool } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - Ethereum: ethereum, - bc: ethereum.BlockChain(), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), + quit: make(chan bool), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -89,8 +91,41 @@ func NewStateManager(ethereum EthManager) *StateManager { return sm } +func (self *StateManager) Start() { + statelogger.Debugln("Starting state manager") + + go self.updateThread() +} + +func (self *StateManager) Stop() { + statelogger.Debugln("Stopping state manager") + + close(self.quit) +} + +func (self *StateManager) updateThread() { + blockChan := self.eth.Eventer().Register("block") + +out: + for { + select { + case event := <-blockChan: + block := event.Data.(*Block) + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + } + + case <-self.quit: + break out + } + } +} + func (sm *StateManager) CurrentState() *ethstate.State { - return sm.Ethereum.BlockChain().CurrentBlock.State() + return sm.eth.BlockChain().CurrentBlock.State() } func (sm *StateManager) TransState() *ethstate.State { @@ -102,7 +137,7 @@ func (sm *StateManager) MiningState() *ethstate.State { } func (sm *StateManager) NewMiningState() *ethstate.State { - sm.miningState = sm.Ethereum.BlockChain().CurrentBlock.State().Copy() + sm.miningState = sm.eth.BlockChain().CurrentBlock.State().Copy() return sm.miningState } @@ -164,7 +199,7 @@ done: } // Notify all subscribers - self.Ethereum.Reactor().Post("newTx:post", tx) + self.eth.Reactor().Post("newTx:post", tx) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -251,16 +286,16 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { filter := sm.createBloomFilter(state) // Persist the data fk := append([]byte("bloom"), block.Hash()...) - sm.Ethereum.Db().Put(fk, filter.Bin()) + sm.eth.Db().Put(fk, filter.Bin()) statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.Ethereum.Reactor().Post("newBlock", block) + sm.eth.Reactor().Post("newBlock", block) state.Manifest().Reset() } - sm.Ethereum.TxPool().RemoveInvalid(state) + sm.eth.TxPool().RemoveInvalid(state) } else { statelogger.Errorln("total diff failed") } @@ -385,10 +420,6 @@ func (sm *StateManager) AccumelateRewards(state *ethstate.State, block, parent * return nil } -func (sm *StateManager) Stop() { - sm.bc.Stop() -} - // Manifest will handle both creating notifications and generating bloom bin data func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf := NewBloomFilter(nil) @@ -398,7 +429,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.Ethereum.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.Reactor().Post("messages", state.Manifest().Messages) return bloomf } |