diff options
author | Jeffrey Wilcke <obscuren@users.noreply.github.com> | 2014-10-17 23:10:34 +0800 |
---|---|---|
committer | Jeffrey Wilcke <obscuren@users.noreply.github.com> | 2014-10-17 23:10:34 +0800 |
commit | df2b70853ff0764c4fcb181547eee3a66f7bda4a (patch) | |
tree | a70bf06660283bcc83f6a499cf9fed808d347a06 /ethchain | |
parent | 3b709852846317d3456f55f906afc327ba76d6df (diff) | |
parent | fa84e50ddb8e64d4cb92d58e235cfed13761f21e (diff) | |
download | go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar.gz go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar.bz2 go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar.lz go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar.xz go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.tar.zst go-tangerine-df2b70853ff0764c4fcb181547eee3a66f7bda4a.zip |
Merge pull request #58 from fjl/feature/event
Blocking event package
Diffstat (limited to 'ethchain')
-rw-r--r-- | ethchain/dagger.go | 7 | ||||
-rw-r--r-- | ethchain/events.go | 10 | ||||
-rw-r--r-- | ethchain/filter_test.go | 2 | ||||
-rw-r--r-- | ethchain/helper_test.go | 18 | ||||
-rw-r--r-- | ethchain/state_manager.go | 55 | ||||
-rw-r--r-- | ethchain/transaction_pool.go | 3 |
6 files changed, 47 insertions, 48 deletions
diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 916d7e9c8..2d2b5720f 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" ) @@ -16,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethreact.Event) []byte + Search(block *Block, stop <-chan struct{}) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool GetHashrate() int64 Turbo(bool) @@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) { pow.turbo = on } -func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { +func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty @@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { for { select { - case <-reactChan: + case <-stop: powlogger.Infoln("Breaking from mining") return nil default: diff --git a/ethchain/events.go b/ethchain/events.go new file mode 100644 index 000000000..05c21edfe --- /dev/null +++ b/ethchain/events.go @@ -0,0 +1,10 @@ +package ethchain + +type TxEvent struct { + Type int // TxPre || TxPost + Tx *Transaction +} + +type NewBlockEvent struct { + Block *Block +} diff --git a/ethchain/filter_test.go b/ethchain/filter_test.go index 6dce51b3b..e569b3774 100644 --- a/ethchain/filter_test.go +++ b/ethchain/filter_test.go @@ -3,5 +3,5 @@ package ethchain import "testing" func TestFilter(t *testing.T) { - filter := NewFilter() + NewFilter(NewTestManager()) } diff --git a/ethchain/helper_test.go b/ethchain/helper_test.go index 75d7771fc..2da01d8a6 100644 --- a/ethchain/helper_test.go +++ b/ethchain/helper_test.go @@ -6,16 +6,17 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethdb" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/event" ) // Implement our EthTest Manager type TestManager struct { stateManager *StateManager - reactor *ethreact.ReactorEngine + eventMux *event.TypeMux + db ethutil.Database txPool *TxPool blockChain *BlockChain Blocks []*Block @@ -49,8 +50,8 @@ func (tm *TestManager) StateManager() *StateManager { return tm.stateManager } -func (tm *TestManager) Reactor() *ethreact.ReactorEngine { - return tm.reactor +func (tm *TestManager) EventMux() *event.TypeMux { + return tm.eventMux } func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) { fmt.Println("Broadcast not implemented") @@ -63,7 +64,10 @@ func (tm *TestManager) KeyManager() *ethcrypto.KeyManager { return nil } -func (tm *TestManager) Db() ethutil.Database { return nil } +func (tm *TestManager) Db() ethutil.Database { + return tm.db +} + func NewTestManager() *TestManager { ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH") @@ -75,8 +79,8 @@ func NewTestManager() *TestManager { ethutil.Config.Db = db testManager := &TestManager{} - testManager.reactor = ethreact.New() - + testManager.eventMux = new(event.TypeMux) + testManager.db = db testManager.txPool = NewTxPool(testManager) testManager.blockChain = NewBlockChain(testManager) testManager.stateManager = NewStateManager(testManager) diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 589b99ac2..b71cbe8a1 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -11,11 +11,10 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) var statelogger = ethlog.NewLogger("STATE") @@ -37,7 +36,6 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool @@ -45,7 +43,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database - Eventer() *eventer.EventMachine + EventMux() *event.TypeMux } type StateManager struct { @@ -73,17 +71,15 @@ type StateManager struct { // 'Process' & canonical validation. lastAttemptedBlock *Block - // Quit chan - quit chan bool + events event.Subscription } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - eth: ethereum, - bc: ethereum.BlockChain(), - quit: make(chan bool), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager { func (self *StateManager) Start() { statelogger.Debugln("Starting state manager") - + self.events = self.eth.EventMux().Subscribe(Blocks(nil)) go self.updateThread() } func (self *StateManager) Stop() { statelogger.Debugln("Stopping state manager") - - close(self.quit) + self.events.Unsubscribe() } func (self *StateManager) updateThread() { - blockChan := self.eth.Eventer().Register("blocks") - -out: - for { - select { - case event := <-blockChan: - blocks := event.Data.(Blocks) - for _, block := range blocks { - err := self.Process(block, false) - if err != nil { - statelogger.Infoln(err) - statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - statelogger.Debugln(block) - break - } + for ev := range self.events.Chan() { + for _, block := range ev.(Blocks) { + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + break } - - case <-self.quit: - break out } } } @@ -202,7 +187,7 @@ done: } // Notify all subscribers - self.eth.Reactor().Post("newTx:post", tx) + self.eth.EventMux().Post(TxEvent{TxPost, tx}) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.eth.Reactor().Post("newBlock", block) + sm.eth.EventMux().Post(NewBlockEvent{block}) state.Manifest().Reset() } @@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.eth.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.EventMux().Post(state.Manifest().Messages) return bloomf } diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index da6c3d6ba..0676af3a3 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -24,6 +24,7 @@ type TxMsgTy byte const ( TxPre = iota TxPost + minGasPrice = 1000000 ) @@ -160,7 +161,7 @@ out: txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash()) // Notify the subscribers - pool.Ethereum.Reactor().Post("newTx:pre", tx) + pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx}) } case <-pool.quit: break out |