diff options
author | Felix Lange <fjl@twurst.com> | 2014-10-14 07:58:31 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2014-10-17 00:50:48 +0800 |
commit | 36cdab206849c7e363e0b9911553098c3e8ca644 (patch) | |
tree | 5cf4093f0531b435e7dc4092953a60903d837429 /ethereum.go | |
parent | 690690489610352d43f8547744b6c9486ad5affa (diff) | |
download | go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar.gz go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar.bz2 go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar.lz go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar.xz go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.tar.zst go-tangerine-36cdab206849c7e363e0b9911553098c3e8ca644.zip |
all: use (blocking) event package instead of ethreact
Diffstat (limited to 'ethereum.go')
-rw-r--r-- | ethereum.go | 74 |
1 files changed, 27 insertions, 47 deletions
diff --git a/ethereum.go b/ethereum.go index 204f30bec..750ca8f03 100644 --- a/ethereum.go +++ b/ethereum.go @@ -17,12 +17,11 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) const ( @@ -60,7 +59,7 @@ type Ethereum struct { // The block pool blockPool *BlockPool // Eventer - eventer *eventer.EventMachine + eventMux *event.TypeMux // Peers peers *list.List // Nonce @@ -85,8 +84,6 @@ type Ethereum struct { listening bool - reactor *ethreact.ReactorEngine - RpcServer *ethrpc.JsonRpcServer keyManager *ethcrypto.KeyManager @@ -129,8 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager isUpToDate: true, filters: make(map[int]*ethchain.Filter), } - ethereum.reactor = ethreact.New() - ethereum.eventer = eventer.New() + ethereum.eventMux = event.NewTypeMux() ethereum.blockPool = NewBlockPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum) @@ -143,10 +139,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethreact.ReactorEngine { - return s.reactor -} - func (s *Ethereum) KeyManager() *ethcrypto.KeyManager { return s.keyManager } @@ -169,8 +161,8 @@ func (s *Ethereum) TxPool() *ethchain.TxPool { func (s *Ethereum) BlockPool() *BlockPool { return s.blockPool } -func (s *Ethereum) Eventer() *eventer.EventMachine { - return s.eventer +func (s *Ethereum) EventMux() *event.TypeMux { + return s.eventMux } func (self *Ethereum) Db() ethutil.Database { return self.db @@ -376,7 +368,7 @@ func (s *Ethereum) removePeerElement(e *list.Element) { s.peers.Remove(e) - s.reactor.Post("peerList", s.peers) + s.eventMux.Post(PeerListEvent{s.peers}) } func (s *Ethereum) RemovePeer(p *Peer) { @@ -400,7 +392,6 @@ func (s *Ethereum) reapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { - s.reactor.Start() s.blockPool.Start() s.stateManager.Start() @@ -524,8 +515,7 @@ func (s *Ethereum) Stop() { } s.txPool.Stop() s.stateManager.Stop() - s.reactor.Flush() - s.reactor.Stop() + s.eventMux.Stop() s.blockPool.Stop() ethlogger.Infoln("Server stopped") @@ -584,10 +574,10 @@ out: select { case <-upToDateTimer.C: if self.IsUpToDate() && !self.isUpToDate { - self.reactor.Post("chainSync", false) + self.eventMux.Post(ChainSyncEvent{false}) self.isUpToDate = true } else if !self.IsUpToDate() && self.isUpToDate { - self.reactor.Post("chainSync", true) + self.eventMux.Post(ChainSyncEvent{true}) self.isUpToDate = false } case <-self.quit: @@ -623,40 +613,30 @@ func (self *Ethereum) GetFilter(id int) *ethchain.Filter { } func (self *Ethereum) filterLoop() { - blockChan := make(chan ethreact.Event, 5) - messageChan := make(chan ethreact.Event, 5) // Subscribe to events - reactor := self.Reactor() - reactor.Subscribe("newBlock", blockChan) - reactor.Subscribe("messages", messageChan) -out: - for { - select { - case <-self.quit: - break out - case block := <-blockChan: - if block, ok := block.Resource.(*ethchain.Block); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(block) - } + events := self.eventMux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil)) + for event := range events.Chan() { + switch event := event.(type) { + case ethchain.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) } - self.filterMu.RUnlock() } - case msg := <-messageChan: - if messages, ok := msg.Resource.(ethstate.Messages); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(messages) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } + self.filterMu.RUnlock() + + case ethstate.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) } } - self.filterMu.RUnlock() } + self.filterMu.RUnlock() } } } |